如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

线程屏障

有时,我们需要暂停消息流线程,直到发生某些其他异步事件。 例如,考虑一个向 RabbitMQ 发布消息的 HTTP 请求。 我们可能希望等待 RabbitMQ 代理发出消息已收到的确认后再回复用户。spring-doc.cadn.net.cn

在 4.2 版本中,Spring Integration 为此引入了 <barrier/> 组件。 底层的 MessageHandlerBarrierMessageHandler。 该类还实现了 MessageTriggerAction,其中传递给 trigger() 方法的消息会释放 handleRequestMessage() 方法(如果存在)中的相应线程。spring-doc.cadn.net.cn

暂停的线程与触发线程通过在对消息调用 CorrelationStrategy 时进行关联。 当消息发送到 input-channel 时,线程将暂停最多 requestTimeout 毫秒,等待相应的触发消息。 默认的相关策略使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。 当具有相同相关性的触发消息到达时,线程将被释放。 在释放后发送到 output-channel 的消息是使用 MessageGroupProcessor 构建的。 默认情况下,消息是两个有效负载的 Collection<?>,并且标头通过使用 DefaultAggregatingMessageGroupProcessor 进行合并。spring-doc.cadn.net.cn

如果首先(或在主线程超时后)调用了trigger()方法,它将暂停最多triggerTimeout以等待暂停消息到达。 如果您不希望挂起触发线程,可以考虑将其转交给TaskExecutor,以便其线程被挂起。
在 5.4 版本之前,请求消息和触发消息仅有一个 timeout 选项,但在某些使用场景中,为这些操作设置不同的超时时间更为合适。 因此,现已引入 requestTimeouttriggerTimeout 选项。

requires-reply 属性决定了如果在触发消息到达之前挂起的线程超时,应采取的操作。 默认情况下,其值为 false,这意味着端点返回 null,流程结束,线程返回给调用者。 当值为 true 时,将抛出 ReplyRequiredExceptionspring-doc.cadn.net.cn

您可以以编程方式调用 trigger() 方法(通过名称获取 Bean 引用,barrier.handler — 其中 barrier 是屏障端点的 Bean 名称)。 或者,您可以配置一个 <outbound-channel-adapter/> 来触发释放。spring-doc.cadn.net.cn

同一关联 ID 只能暂停一个线程。 同一关联 ID 可多次使用,但不可同时并发使用。 如果有第二个线程携带相同的关联 ID 到达,将抛出异常。

以下示例展示了如何使用自定义标头进行关联:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

取决于哪个消息先到达,发送消息到 in 的线程或发送消息到 release 的线程将等待最多十秒,直到另一条消息到达。 当消息被释放时,out 通道会收到一条消息,该消息结合了调用名为 myOutputProcessor 的自定义 MessageGroupProcessor Bean 的结果。 如果主线程超时且触发器稍后到达,您可以配置一个丢弃通道,将迟到的触发器发送到该通道。 如果请求消息未能及时到达,触发器消息也会被丢弃。spring-doc.cadn.net.cn

有关此组件的示例,请参阅 屏障示例应用程序spring-doc.cadn.net.cn