如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

拆分器

splitter 是一个组件,其作用是将消息分割成几个部分,并发送这些结果消息独立进行处理。 非常经常地,它们在包含聚合器的管道中作为上游生产者。spring-doc.cadn.net.cn

编程模型

performing splitting 的 API 包含一个基类,AbstractMessageSplitter。 它是一个 MessageHandler 实现,封装了分隔器共有的功能,例如在生成的消息中填充适当的消息头(CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER)。 这些填充使得跟踪消息及其处理结果成为可能(通常情况下,这些头部信息会被复制到各种转换端点生成的消息中)。 然后可以使用这些值,例如由一个 组合消息处理器spring-doc.cadn.net.cn

The following example shows an excerpt from AbstractMessageSplitter:spring-doc.cadn.net.cn

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

要在一个应用程序中实现特定的拆分器,您可以扩展AbstractMessageSplitter并实现splitMessage方法,该方法包含拆分消息的逻辑。 返回值可以是以下之一:spring-doc.cadn.net.cn

  • 一个Collection、一个消息数组或一个Iterable(或Iterator),该值会迭代消息。 在这种情况下,消息会在CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER填充之后被发送作为消息。 采用这种方法可以让你获得更多的控制权——例如,在拆分过程中填充自定义消息头。spring-doc.cadn.net.cn

  • 一个Collection或非消息对象的数组,或者一个Iterable(或Iterator),该值会迭代非消息对象。 它的工作方式类似于前一种情况,除了每个集合元素都会作为消息负载使用。 通过这种方式,您可以在不考虑消息系统的情况下专注于领域对象,并生成更易于测试的代码。spring-doc.cadn.net.cn

  • a Message 或非消息对象(但不是集合或数组)。 它与之前的案例类似,只是发送一条消息。spring-doc.cadn.net.cn

在 Spring Integration 中,任何 POJO 都可以实现拆分算法,前提是该类定义了一个接受单个参数并返回结果的方法。 在此情况下,方法的返回值按前文所述进行解释。 输入参数可以是 Message 或一个简单的 POJO。 在后一种情况下,拆分器将接收传入消息的负载(payload)。 我们推荐这种方案,因为它使代码与 Spring Integration API 解耦,并且通常更易于测试。spring-doc.cadn.net.cn

迭代器

从版本 4.1 开始,AbstractMessageSplitter 支持将 value 拆分为 Iterator 类型。 注意,在出现 Iterator(或 Iterable)的情况下,我们无法访问底层项目的数量,且 SEQUENCE_SIZE 头被设置为 0。 这意味着 <aggregator> 的默认 SequenceSizeReleaseStrategy 将无法工作,来自 splitterCORRELATION_ID 的分组也不会被释放;它将保持为 incomplete。 在这种情况下,您应使用适当的自定义 ReleaseStrategy,或结合 send-partial-result-on-expirygroup-timeout 或一个 MessageGroupStoreReaper 来依赖处理。spring-doc.cadn.net.cn

从版本 5.0 开始,AbstractMessageSplitter 提供了 protected obtainSizeIfPossible() 方法来允许确定 IterableIterator 对象的大小(如果可能的话)。 例如,XPathMessageSplitter 可以确定其底层 NodeList 对象的大小。 并且从版本 5.0.9 开始,这个方法也正确地返回了 com.fasterxml.jackson.core.TreeNode 的大小。spring-doc.cadn.net.cn

一个Iterator对象可以在不需要在内存中构建整个集合的情况下避免分片的需要。 例如,当基础项是从某些外部系统(例如,数据库或FTP MGET)通过迭代或流来填充时。spring-doc.cadn.net.cn

流与 Flux

从5.0版本开始,AbstractMessageSplitter支持Java Stream和Reactive Streams Publisher类型来划分valuespring-doc.cadn.net.cn

spring-doc.cadn.net.cn

在这种情况下,目标Iterator是基于它们的迭代功能构建的。spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

此外,如果拆分器的输出通道是ReactiveStreamsSubscribableChannel的一个实例,则AbstractMessageSplitter会生成一个Flux结果而不是Iterator,并且输出通道将订阅这个Flux以基于下游流的需求进行回压拆分。spring-doc.cadn.net.cn

从版本 5.2 开始,拆分器支持使用 discardChannel 选项来发送那些拆分函数返回空容器(集合、数组、流、Flux 等)的请求消息。 在这种情况下,没有项目可供迭代以发送到 outputChannelnull 的拆分结果将保留为流程结束指示符。spring-doc.cadn.net.cn

使用 Java、Groovy 和 Kotlin DSL 配置 Splitter

基于Message及其可迭代载荷的简单拆分器示例(使用DSL配置):spring-doc.cadn.net.cn

@Bean
public IntegrationFlow someFlow() {
    return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
    integrationFlow {
        split<Message<*>> { it.payload }
    }
@Bean
someFlow() {
    integrationFlow {
        splitWith {
		    expectedType Message<?>
		    function { it.payload }
        }
    }
}

查看有关 DSL 的更多信息,请参见相应的章节:spring-doc.cadn.net.cn

使用 XML 配置拆分器

可以使用XML配置一个分隔符,如下所示:spring-doc.cadn.net.cn

<int:channel id="inputChannel"/>

<int:splitter id="splitter"           (1)
  ref="splitterBean"                  (2)
  method="split"                      (3)
  input-channel="inputChannel"        (4)
  output-channel="outputChannel"      (5)
  discard-channel="discardChannel" /> (6)

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 分隔符的ID是可选的。
2 对应用程序上下文中定义的 bean 的引用。 该 bean 必须实现拆分逻辑,如前文所述。 可选。 如果未提供对 bean 的引用,则假设到达 input-channel 的消息负载是 java.util.Collection 的实现,并将默认拆分逻辑应用于该集合,将每个单独元素封装为消息并发送至 output-channel
3 实现拆分逻辑的方法(在bean上定义)。 可选的。
4 分流器的输入通道。 必需。
5 发送结果的通道(拆分器将结果发送到该通道)。 可选的(因为传入的消息本身可以指定回复通道)。
6 在拆分结果为空的情况下,请求消息被发送到的通道。 可选(如果结果为 null,它们将停止)。

我们推荐使用一个ref属性,如果自定义分割器实现可以在其他<splitter>定义中引用。 然而,如果自定义分割器处理器实现应该局限于单个<splitter>的定义中,你可以配置一个内部bean定义,如下例所示:spring-doc.cadn.net.cn

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
在同一<int:splitter> 配置中同时使用 ref 属性和内部处理器定义是不允许的,因为这会导致条件模糊,并引发异常。
如果 ref 属性引用了一个扩展了 AbstractMessageProducingHandler 的 bean(例如框架本身提供的 splitters),配置将会优化,直接注入输出通道到处理器中。 在这种情况下,每个 ref 必须是一个单独的 bean 实例(或者 prototype-作用域的 bean)或使用内部的 <bean/> 配置类型。 然而,这种优化仅适用于您在 splitters XML 定义中没有提供任何特定于 splitter 的属性的情况。 如果您不小心从多个 bean 引用了同一个消息处理器,则会收到配置异常。

使用注解配置拆分器

The @Splitter annotation is applicable to methods that expect either the Message type or the message payload type, and the return values of the method should be a Collection of any type. If the returned values are not actual Message objects, each item is wrapped in a Message as the payload of the Message. Each resulting Message is sent to the designated output channel for the endpoint on which the @Splitter is defined.spring-doc.cadn.net.cn

以下示例展示了如何使用@Splitter注解来配置一个splitter:spring-doc.cadn.net.cn

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems()
}