如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

轮询器

此部分描述了在Spring Integration中轮询的工作原理。spring-doc.cadn.net.cn

轮询消费者

当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下之一的实例:spring-doc.cadn.net.cn

实际实现取决于这些端点连接到的通道类型。 连接到实现 org.springframework.messaging.SubscribableChannel 接口的通道的通道适配器会生成 EventDrivenConsumer 的实例。 另一方面,连接到实现 org.springframework.messaging.PollableChannel 接口(例如 QueueChannel)的通道的通道适配器会生成 PollingConsumer 的实例。spring-doc.cadn.net.cn

轮询消费者让Spring Integration组件主动轮询消息,而不是以事件驱动的方式处理消息。spring-doc.cadn.net.cn

它们在许多消息场景中代表一个关键的横切关注点。 在 Spring Integration 中,轮询消费者基于同名模式,该模式由 Gregor Hohpe 和 Bobby Woolf 在其著作《企业集成模式》中进行描述。 您可以在本书的网站上找到该模式的描述。spring-doc.cadn.net.cn

对于轮询消费者配置的更多信息,请参阅消息端点spring-doc.cadn.net.cn

可轮询消息源

Spring 集成提供了一种轮询消费者模式的第二种变体。 当使用入站通道适配器时,这些适配器通常由一个 SourcePollingChannelAdapter 包裹。 例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器 会配置为 poller 以定期检索消息。 因此,当组件配置了 poller 时,所产生的实例类型之一为以下几种之一:spring-doc.cadn.net.cn

这表示轮询器在入站和出站消息场景中都有使用。 以下是一些使用轮询器的用例:spring-doc.cadn.net.cn

AOP 通知类可以应用于轮询器,在 advice-chain 中,例如使用事务通知来启动一个事务。 从版本 4.1 开始,提供了 PollSkipAdvice。 轮询器使用触发器来确定下一次轮询的时间。 可以使用 PollSkipAdvice 来抑制(跳过)一次轮询,可能是因为某些下游条件会阻止消息被处理。 要使用此通知,您需要为其提供 PollSkipStrategy 的实现。 从版本 4.2.5 开始,提供了 SimplePollSkipStrategy。 要使用它,您可以将其作为 Bean 实例添加到应用程序上下文中,将其注入到 PollSkipAdvice 中,并将其添加到轮询器的通知链中。 要跳过轮询,请调用 skipPolls()。 要恢复轮询,请调用 reset()。 版本 4.2 在此领域增加了更多灵活性。 请参阅 条件轮询器

此章节仅旨在提供对轮询消费者及其如何融入消息通道概念(参见消息通道) 和通道适配器(参见通道适配器)的高层次概述。 有关通用的消息端点以及特定的轮询消费者的信息,请参阅 消息端点spring-doc.cadn.net.cn

延迟确认可轮询消息源

自5.0.1版本起,某些模块提供了MessageSource实现,支持在下游流完成(或把消息交给另一个线程)时延迟确认。 目前这仅限于AmqpMessageSourceKafkaMessageSource模块。spring-doc.cadn.net.cn

使用这些消息源时,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 标头(请参阅 MessageHeaderAccessor API)将被添加到消息中。 当与可轮询消息源一起使用时,该标头的值是 AcknowledgmentCallback 的实例,如下例所示:spring-doc.cadn.net.cn

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

并非所有消息源(例如,KafkaMessageSource)都支持 REJECT 状态。 它被视为与 ACCEPT 相同。spring-doc.cadn.net.cn

应用程序可以在任何时间确认一条消息,如下例所示:spring-doc.cadn.net.cn

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果 MessageSource 连接到 SourcePollingChannelAdapter,当下游流程完成后轮询线程返回适配器时,适配器会检查确认是否已经完成;若未完成,则将其状态设置为 ACCEPT(如果流程抛出异常,则设置为 REJECT)。 状态值定义在 AcknowledgmentCallback.Status 枚举 中。spring-doc.cadn.net.cn

Spring Integration 提供了MessageSourcePollingTemplate来执行对MessageSource的即时轮询。
这也会负责在MessageHandler回调返回(或抛出异常)时为AcknowledgmentCallback设置ACCEPTREJECT。 以下示例展示了如何使用MessageSourcePollingTemplate进行轮询:spring-doc.cadn.net.cn

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在这两种情况下(SourcePollingChannelAdapterMessageSourcePollingTemplate),你都可以通过在回调中调用 noAutoAck() 来禁用自动确认/否定确认。 如果你将消息传递给另一个线程并希望稍后再进行确认,可能会这样做。 并非所有实现都支持此功能(例如,Apache Kafka 就不支持,因为偏移量提交必须在同一线程上执行)。spring-doc.cadn.net.cn

消息源的有条件轮询器

此部分介绍了如何使用条件性检查器。spring-doc.cadn.net.cn

背景

Advice 个对象,在一个 advice-chain 上进行轮询,建议整个轮询任务(包括消息检索和处理)。 这些“环绕通知”方法无法访问任何轮询的上下文——只能访问轮询本身。 这适用于如使任务事务性或因某些外部条件跳过轮询等要求,正如之前所述。 如果我们希望根据 receive 部分的结果采取一些行动,或者希望根据条件调整轮询器呢?对于这些情况,Spring Integration 提供了“智能”轮询。spring-doc.cadn.net.cn

“智能”轮询

Version 5.3 引入了 ReceiveMessageAdvice 接口。 任何在 Advice 中实现此接口的 advice-chain 对象仅应用于 receive() 操作 - MessageSource.receive()PollableChannel.receive(timeout)。 因此,它们只能用于 SourcePollingChannelAdapterPollingConsumer。 此类实现以下方法:spring-doc.cadn.net.cn

  • beforeReceive(Object source) This method is called before the Object.receive() method. It lets you examine and reconfigure the source. Returning false cancels this poll (similar to the PollSkipAdvice mentioned earlier).spring-doc.cadn.net.cn

  • Message<?> afterReceive(Message<?> result, Object source) This method is called after the receive() method. Again, you can reconfigure the source or take any action (perhaps depending on the result, which can be null if there was no message created by the source). You can even return a different messagespring-doc.cadn.net.cn

线程安全

如果 Advice 会修改源数据,则不应将轮询器配置为使用 TaskExecutor。 如果 Advice 会修改源数据,此类修改不是线程安全的,可能会导致意外结果,特别是在高频轮询时。 如果您需要并发处理轮询结果,请考虑使用下游的 ExecutorChannel,而不是向轮询器添加执行器。spring-doc.cadn.net.cn

建议链排序

您应该了解在初始化过程中如何执行建议链。 Advice 个未实现 ReceiveMessageAdvice 的对象会应用于整个轮询过程,并在任何 ReceiveMessageAdvice 之前按顺序全部首先被调用。 然后,ReceiveMessageAdvice 个对象按顺序围绕源 receive() 方法被调用。 例如,如果您有 Advice 个对象 a, b, c, d,其中 bdReceiveMessageAdvice,则对象的适用顺序如下:a, c, b, d。 此外,如果源本身已是 Proxy,则 ReceiveMessageAdvice 会在任何现有的 Advice 对象之后被调用。 如果您希望更改顺序,必须自行配置代理。spring-doc.cadn.net.cn

SimpleActiveIdleReceiveMessageAdvice

此建议是ReceiveMessageAdvice的一个简单实现。 当与一个DynamicPeriodicTrigger一起使用时,它会根据上次轮询是否产生了消息来调整轮询频率。 轮询器还必须引用同一个DynamicPeriodicTriggerspring-doc.cadn.net.cn

重要提示:异步交接
SimpleActiveIdleReceiveMessageAdvice 根据 receive() 的结果来修改触发条件。 这仅在建议被调用时处于轮询线程的情况下生效。 如果轮询线程有 task-executor,则不会生效。 要在轮询结果之后使用异步操作,请稍后执行异步交接,或许可以通过使用 ExecutorChannel 来实现。

CompoundTriggerAdvice

此建议允许根据轮询是否返回消息来选择两种触发器之一。 考虑一个使用 CronTrigger 的轮询器。CronTrigger 实例是不可变的,因此构造后不能被修改。 考虑这样一个用例:我们希望每小时使用 cron 表达式触发一次轮询,但如果未收到消息,则每分钟进行一次轮询,并在检索到消息时重新使用 cron 表达式。spring-doc.cadn.net.cn

The advice (and poller) 使用一个 CompoundTrigger 来达到这个目的。 The trigger 的 primary 触发器可以是一个 CronTrigger。 当建议(advice)检测到没有接收到消息时,它会将次要触发器添加到 CompoundTrigger 中。 当 CompoundTrigger 实例的 nextExecutionTime 方法被调用时,如果存在次要触发器,则委托给次要触发器;否则,委托给主要触发器。spring-doc.cadn.net.cn

必须将.poller也引用同一个CompoundTriggerspring-doc.cadn.net.cn

以下示例展示了使用每分钟作为cron表达式备用的每小时配置:spring-doc.cadn.net.cn

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
重要提示:异步交接
CompoundTriggerAdvice 根据 receive() 的结果来修改触发条件。 这仅在建议被调用时处于轮询线程的情况下生效。 如果轮询线程有 task-executor,则不会生效。 要在轮询结果之后使用异步操作,请稍后执行异步交接,或许可以通过使用 ExecutorChannel 来实现。

仅 MessageSource 的提示

某些建议可能仅适用于MessageSource.receive(),而不适用于PollableChannel。 为此目的,仍然存在一个MessageSourceMutator接口(该接口扩展了ReceiveMessageAdvice)。 有关更多信息,请参阅Inbound Channel Adapters: Polling Multiple Servers and Directoriesspring-doc.cadn.net.cn