|
对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
分散-聚集
从版本 4.1 开始,Spring Integration 提供了分散-聚集企业集成模式的实现。 它是一个复合端点,其目标是向收件人发送消息并聚合结果。 正如企业集成模式中所述,它是诸如“最佳报价”等场景的组件,在这些场景中,我们需要向多个提供商请求信息,并决定哪一个提供商为我们提供所请求项目的最佳条款。
以前,可以使用离散组件来配置模式。 这一增强带来了更便捷的配置。
这ScatterGatherHandler是一个请求-回复端点,它结合了PublishSubscribeChannel(或RecipientListRouter) 和AggregatingMessageHandler.
请求消息被发送到scatter通道,以及ScatterGatherHandler等待聚合器发送到outputChannel.
功能性
这Scatter-Gather模式建议两种场景:“拍卖”和“分发”。
在这两种情况下,aggregation函数相同,并提供了所有可用于AggregatingMessageHandler.
(实际上,ScatterGatherHandler只需要一个AggregatingMessageHandler作为构造函数参数。
有关详细信息,请参阅聚合器。
拍卖
拍卖Scatter-Gathervariant 对请求消息使用“发布-订阅”逻辑,其中“分散”通道是PublishSubscribeChannel跟apply-sequence="true".
但是,此通道可以是任何MessageChannel实现(与request-channel在ContentEnricher— 参见内容丰富器)。
但是,在这种情况下,您应该创建自己的自定义correlationStrategy对于aggregation功能。
分配
分布Scatter-Gather变体基于RecipientListRouter(参见RecipientListRouter)以及RecipientListRouter.
这是第二个ScatterGatherHandlerconstructor 参数。
如果只想依赖默认值correlationStrategy对于recipient-list-router和aggregator,则应指定apply-sequence="true".
否则,您应该提供自定义correlationStrategy对于aggregator.
与PublishSubscribeChannelvariant(拍卖变体),具有recipient-list-router selector选项允许根据消息过滤目标提供商。
跟apply-sequence="true",默认值sequenceSize,并且aggregator可以正确释放组。
分发选项与拍卖选项是互斥的。
这applySequence=true仅对于基于ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer)构造函数配置,因为框架不能改变外部提供的组件。
为方便起见,XML 和 Java DSLScatter-Gather集applySequence从 6.0 版开始设置为 true。 |
对于拍卖和分发变体,请求(分散)消息都使用gatherResultChannel标头来等待来自aggregator.
默认情况下,所有提供商都应将其结果发送到replyChannel标头(通常通过省略output-channel从最终端点)。
但是,gatherChannel选项,允许提供商将其回复发送到该渠道以进行聚合。
配置分散收集端点
以下示例显示了 Bean 定义的 Java 配置Scatter-Gather:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的示例中,我们将RecipientListRouter distributorbean 与applySequence="true"以及收件人渠道列表。
下一个 bean 是针对AggregatingMessageHandler.
最后,我们将这两个 bean 注入到ScatterGatherHandlerbean 定义,并将其标记为@ServiceActivator将分散-聚集组件连接到集成流中。
以下示例显示如何配置<scatter-gather>endpoint 使用 XML 命名空间:
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
| 1 | 终结点的 ID。
这ScatterGatherHandlerbean 的别名为id + '.handler'.
这RecipientListRouterbean 的别名为id + '.scatterer'.
这AggregatingMessageHandlerbean 的别名为id + '.gatherer'.
自选。
(这BeanFactory生成默认值id值。 |
| 2 | 生命周期属性指示是否应在应用程序上下文初始化期间启动终结点。
此外,ScatterGatherHandler还实现Lifecycle并启动和停止gatherEndpoint,如果gather-channel被提供。
自选。
(默认值为true.) |
| 3 | 接收请求消息以在ScatterGatherHandler.
必填。 |
| 4 | 该通道的ScatterGatherHandler发送聚合结果。
自选。
(传入消息可以在replyChannelmessage header) 的 Message 标头)。 |
| 5 | 要向其发送拍卖方案的分散消息的通道。
自选。
与<scatterer>子元素。 |
| 6 | 用于接收每个提供商对聚合的回复的通道。
它用作replyChannel标头。
自选。
默认情况下,FixedSubscriberChannel被创建。 |
| 7 | 当多个处理程序订阅相同的处理程序时,此组件的顺序DirectChannel(用于负载平衡目的)。
自选。 |
| 8 | 指定应启动和停止终结点的阶段。
启动顺序从最低到最高,关机顺序从最高到最低。
默认情况下,此值为Integer.MAX_VALUE,这意味着该容器尽可能晚地启动并尽快停止。
自选。 |
| 9 | 发送回复时等待的超时间隔Message到output-channel.
默认情况下,send()块一秒钟。
仅当输出通道有一些“发送”限制时,它才适用,例如,QueueChannel具有已满的固定“容量”。
在这种情况下,一个MessageDeliveryException被抛出。
这send-timeout被忽略AbstractSubscribableChannel实现。
为group-timeout(-expression)这MessageDeliveryException从计划过期任务中,将导致重新计划此任务。
自选。 |
| 10 | 用于指定分散收集在返回之前等待回复消息的时间。
默认情况下,它等待30秒。
如果回复超时,则返回“null”。
自选。 |
| 11 | 指定分散收集是否必须返回非空值。
此值为true默认情况下。
因此,一个ReplyRequiredException当底层聚合器在gather-timeout.
请注意,如果null是一种可能性,则gather-timeout应指定以避免无限期等待。 |
| 12 | 这<recipient-list-router>选项。
自选。
相互排斥scatter-channel属性。 |
| 13 | 这<aggregator>选项。
必填。 |
错误处理
由于 Scatter-Gather 是一个多请求-回复组件,因此错误处理具有一些额外的复杂性。
在某些情况下,如果ReleaseStrategy允许以少于请求的回复完成流程。
在其他情况下,当发生错误时,应考虑从子流返回“补偿消息”之类的内容。
每个异步子流都应配置errorChannel标头,用于从MessagePublishingErrorHandler.
否则,将错误发送到全局errorChannel使用常见的错误处理逻辑。
有关异步错误处理的更多信息,请参阅错误处理。
同步流可以使用ExpressionEvaluatingRequestHandlerAdvice忽略异常或返回补偿消息。
当异常从其中一个子流抛出到ScatterGatherHandler,它只是被重新抛向上游。
这样,所有其他子流都将毫无用处,并且它们的回复将在ScatterGatherHandler.
这有时可能是预期行为,但在大多数情况下,最好在不影响所有其他子流和收集器中的期望的情况下处理特定子流中的错误。
从 5.1.3 版本开始,ScatterGatherHandler随errorChannelName选择。
它填充到errorChannel标头,在发生异步错误时使用,或者可以在常规同步子流中用于直接发送错误消息。
下面的示例配置演示了通过返回补偿消息来处理异步错误:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了产生正确的回复,我们必须复制标题(包括replyChannel和errorChannel) 从failedMessage的MessagingException已发送到scatterGatherErrorChannel通过MessagePublishingErrorHandler.
这样,目标异常将返回给ScatterGatherHandler用于回复消息组完成。
这样的例外payload可以在MessageGroupProcessor收集器或以其他方式处理到下游,在分散收集端点之后。
在将散射结果发送给采集器之前,ScatterGatherHandler恢复请求消息标头,包括回复和错误通道(如果有)。
这样,来自AggregatingMessageHandler将传播给调用方,即使在分散的收件人子流中应用了异步切换。
为了成功运行,一个gatherResultChannel,originalReplyChannel和originalErrorChannel标头必须从分散的收件人子流传输回回复。
在这种情况下,一个合理的、有限的gatherTimeout必须为ScatterGatherHandler.
否则,默认情况下,它将被阻止,等待收集者的回复。 |