如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

聚合器和重组器

一个 Aggregator 在概念上是 Splitter 的相反操作。 它将一系列单独的消息聚合为单条消息,因此必然更加复杂。 默认情况下,聚合器返回的消息包含来自传入消息的负载集合。 同样的规则也适用于 Resequencer。 以下示例展示了拆分 - 聚合模式的经典案例:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split()方法将列表拆分为单独的消息,并将它们发送到ExecutorChannelresequence()方法根据消息头中找到的序列详细信息对消息进行重新排序。 aggregate()方法收集这些消息。spring-doc.cadn.net.cn

然而,您可以通过指定发布策略和相关策略等来更改默认行为。 考虑以下示例:spring-doc.cadn.net.cn

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

前述示例将具有 myCorrelationKey 个标头的消息进行关联,并在至少累积十条消息后释放这些消息。spring-doc.cadn.net.cn

resequence() EIP 方法提供了类似的 Lambda 配置。spring-doc.cadn.net.cn