如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

Scatter-Gather

自 Spring Integration 4.1 版本起,提供了 散列-聚合 企业集成模式的实现。 它是一个复合端点,目标是向接收者发送消息并聚集结果。 正如在 企业集成模式 中所指出的那样,在“最佳报价”场景中,我们需要从多个提供商那里获取信息,并决定哪个提供商为请求的商品提供了最优条件。spring-doc.cadn.net.cn

之前,模式可以通过独立的组件进行配置。 此增强带来了更加便捷的配置方式。spring-doc.cadn.net.cn

The ScatterGatherHandler 是一个请求-回复端点,结合了一个 PublishSubscribeChannel(或一个 RecipientListRouter) 和一个 AggregatingMessageHandler。 请求消息被发送到 scatter 通道,并且 ScatterGatherHandler 等待聚合器将回复发送到 outputChannelspring-doc.cadn.net.cn

功能

The Scatter-Gather 模式建议两种场景:“拍卖”和“分配”。 在两种情况下,aggregation 函数是相同的,并提供了 AggregatingMessageHandler 可用的所有选项。 实际上,ScatterGatherHandler 只需要一个 AggregatingMessageHandler 作为构造函数参数即可。 有关更多信息,请参见 聚合器spring-doc.cadn.net.cn

拍卖

拍卖 Scatter-Gather 变体使用“发布 - 订阅”逻辑处理请求消息,其中“散射”通道是一个带有 apply-sequence="true"PublishSubscribeChannel。 然而,该通道可以是任何 MessageChannel 实现(例如 ContentEnricher 中的 request-channel 的情况——参见 内容增强器)。 不过,在这种情况下,您应为 aggregation 函数创建自己的自定义 correlationStrategyspring-doc.cadn.net.cn

分发

分发 Scatter-Gather 变体基于 RecipientListRouter(参见 RecipientListRouter),并包含 RecipientListRouter 的所有可用选项。 这是第二个 ScatterGatherHandler 构造函数参数。 如果您希望仅依赖 recipient-list-routeraggregator 的默认 correlationStrategy,则应指定 apply-sequence="true"。 否则,您应为 aggregator 提供自定义的 correlationStrategy。 与 PublishSubscribeChannel 变体(拍卖变体)不同,拥有 recipient-list-router selector 选项可以根据消息筛选目标提供商。 使用 apply-sequence="true" 时,将提供默认的 sequenceSize,并且 aggregator 可以正确释放组。 分发选项与拍卖选项互斥。spring-doc.cadn.net.cn

The applySequence=true 是必需的,仅适用于基于 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 构造函数配置的纯 Java 配置,因为框架无法修改外部提供的组件。 为了方便起见,从版本 6.0 开始,Scatter-Gather 的 XML 和 Java DSL 将 applySequence 设置为 true。

对于拍卖和分配变体,请求(散列)消息会被丰富添加带有gatherResultChannel头信息以等待来自aggregator的回复消息。spring-doc.cadn.net.cn

默认情况下,所有提供商应该将他们的结果发送到replyChannel头(通常通过省略最终端点中的output-channel来实现)。 但是,还提供了gatherChannel选项,允许提供商将他们的回复发送到该通道以进行聚合。spring-doc.cadn.net.cn

配置散射 - 汇聚端点

以下示例展示了bean定义的Java配置代码段Scatter-Gather:spring-doc.cadn.net.cn

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

在前面的例子中,我们配置了RecipientListRouterdistributor bean 为 applySequence="true" 并且设置了接收者通道的列表。
下一个 bean 是一个 AggregatingMessageHandler。最后,我们将这两个 bean 注入到 ScatterGatherHandler bean 定义中,并将其标记为 @ServiceActivator 以将散列收集组件连接到集成流中。spring-doc.cadn.net.cn

以下示例展示了如何通过使用XML命名空间来配置<scatter-gather>端点:spring-doc.cadn.net.cn

<scatter-gather
		id=""  (1)
		auto-startup=""  (2)
		input-channel=""  (3)
		output-channel=""  (4)
		scatter-channel=""  (5)
		gather-channel=""  (6)
		order=""  (7)
		phase=""  (8)
		send-timeout=""  (9)
		gather-timeout=""  (10)
		requires-reply="" > (11)
			<scatterer/>  (12)
			<gatherer/>  (13)
</scatter-gather>
1 The id of the endpoint. The ScatterGatherHandler bean is registered with an alias of id + '.handler'. The RecipientListRouter bean is registered with an alias of id + '.scatterer'. The AggregatingMessageHandler bean is registered with an alias of id + '.gatherer'. Optional. (The BeanFactory generates a default id value.)
2 生命周期属性,表示端点是否应在应用程序上下文初始化时启动。 此外,ScatterGatherHandler还实现了Lifecycle并启停gatherEndpoint,如果提供了gather-channel,则会在内部创建gatherEndpoint。 此属性可选。 (默认值为true。)
3 在接收请求消息并处理它们的通道上进行接收。 必需。
4 发送聚合结果的通道。 可选。 (入站消息可以在replyChannel消息头中自己指定回复通道)。
5 用于向拍卖场景发送散射消息的频道。 可选。 与 <scatterer> 子元素互斥。
6 用于从每个提供商接收聚合回复的通道。 它用作散点消息中的 replyChannel 标头。 可选。 默认情况下,将创建 FixedSubscriberChannel
7 当多个处理器订阅到同一个 DirectChannel 时,此组件的顺序(用于负载均衡目的)。 可选。
8 指定端点应启动和停止的阶段。 启动顺序从低到高,关闭顺序从高到低。 默认情况下,此值为 Integer.MAX_VALUE,表示该容器尽可能晚地启动并尽快停止。 可选。
9 发送回复时等待的超时间隔 Messageoutput-channel。 默认情况下,send() 会阻塞一秒。 它仅适用于输出通道存在某些“发送”限制的情况——例如,具有固定“容量”且已满的 QueueChannel。 在这种情况下,将抛出 MessageDeliveryException。 对于 AbstractSubscribableChannel 实现,send-timeout 将被忽略。 对于 group-timeout(-expression),计划过期任务中的 MessageDeliveryException 会导致该任务被重新调度。 可选。
10 允许您指定 scatter-gather 在返回前等待回复消息的时长。 默认情况下,它等待 30 秒。 如果回复超时,将返回 'null'。 可选。
11 指定散合模式是否必须返回非空值。 该值默认为 true。 因此,当底层聚合器在 gather-timeout 后返回空值时,将抛出 ReplyRequiredException。 注意,如果 null 是可能的情况,则应指定 gather-timeout 以避免无限等待。
12 <recipient-list-router> 选项。 可选。 与 scatter-channel 属性互斥。
13 <aggregator> 个选项。 必填。

错误处理

由于 Scatter-Gather 是一个多请求 - 响应组件,错误处理具有一定的额外复杂性。 在某些情况下,如果 ReleaseStrategy 允许流程以少于请求数的响应完成,则最好直接捕获并忽略下游异常。 在其他情况下,当发生错误时,应考虑使用类似“补偿消息”的机制从子流程返回。spring-doc.cadn.net.cn

每个异步子流程都应配置一个 errorChannel 标头,以便从 MessagePublishingErrorHandler 正确发送错误消息。 否则,错误将发送至全局 errorChannel,并应用通用的错误处理逻辑。 有关异步错误处理的更多信息,请参阅 错误处理spring-doc.cadn.net.cn

同步流可以使用ExpressionEvaluatingRequestHandlerAdvice来忽略异常或返回补偿消息。 当一个异常从某个子流抛出到ScatterGatherHandler时,它会被重新抛出给上游。 这样,所有其他子流都将白费功夫,它们的回复将在ScatterGatherHandler中被忽略。 这在某些情况下可能是预期行为,但在大多数情况下,最好在特定的子流中处理错误,而不影响其他子流以及收集器的期望。spring-doc.cadn.net.cn

从版本 5.1.3 开始,ScatterGatherHandlererrorChannelName 选项提供。 它被填充到散射消息的 errorChannel 头中,在发生异步错误时使用,也可用于常规同步子流程中以直接发送错误消息。spring-doc.cadn.net.cn

下面的示例配置展示了如何通过返回补偿消息来实现异步错误处理:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

要生成正确的回复,我们必须从已发送至 scatterGatherErrorChannelMessagePublishingErrorHandlerMessagingExceptionfailedMessage 中复制标头(包括 replyChannelerrorChannel)。 这样,目标异常将返回给用于完成回复消息组的 ScatterGatherHandler 的收集器。 此类异常 payload 可以在收集器的 MessageGroupProcessor 中被过滤掉,或者在分散 - 聚集端点之后以其他方式在下游处理。spring-doc.cadn.net.cn

在将散射结果发送给收集器之前,ScatterGatherHandler 会恢复请求消息头,包括回复通道和错误通道(如果存在)。 通过这种方式,即使散射接收子流程中应用了异步交接,来自 AggregatingMessageHandler 的错误也将被传播给调用者。 为了成功操作,必须将 gatherResultChanneloriginalReplyChanneloriginalErrorChannel 头信息从散射接收子流程的回复中传回。 在这种情况下,必须为 ScatterGatherHandler 配置一个合理的有限值 gatherTimeout。 否则,默认情况下它将一直阻塞并等待来自收集器的回复。