对于最新稳定版本,请使用 Spring Integration 7.0.0spring-doc.cadn.net.cn

消息通道实现

Spring Integration 提供了不同的消息通道实现。 以下章节简要介绍了每一项。spring-doc.cadn.net.cn

发布订阅频道

发布订阅频道实现广播 任意消息发送给所有订阅的处理员。 这通常用于发送事件消息,事件消息的主要作用是通知(与文档消息不同,文档消息通常由单个处理器处理)。 注意发布订阅频道仅供发送。 因为它会直接向用户广播,当它发送(消息)调用方法时,消费者无法轮询消息(它不实现Pollable频道因此 没有接收()方法)。 相反,任何订阅者本身都必须是消息处理器,以及订阅者的handleMessage(消息)方法依次被调用。spring-doc.cadn.net.cn

在3.0版本之前,调用发送方法发布订阅频道但该节目没有回归任何订阅者false. 与消息模板一个MessageDeliveryException被扔了。 从3.0版本开始,行为发生了变化,导致发送只要至少有最低订阅者(并且成功处理消息),则始终被视为成功。 这种行为可以通过设置min订阅者该属性默认为0.spring-doc.cadn.net.cn

如果你使用任务执行者,仅使用正确数量的订阅者来判定,因为消息的实际处理是异步完成的。

队列通道

队列通道实现会包裹队列。 与发布订阅频道队列通道具有点对点语义。 换句话说,即使频道有多个用户,也只有其中一个用户应该接收到任何消息被送到那个频道。 它提供了一个默认的无参数构造子(提供本质上无界的容量Integer.MAX价值)以及接受队列容量的构造器,如下列表所示:spring-doc.cadn.net.cn

public QueueChannel(int capacity)

尚未达到容量上限的信道会将其内部队列中存储消息,且发送(消息<?>)方法会立即返回,即使没有接收端准备好处理消息。 如果队列已满,发送方会阻塞,直到队列中有空位。 或者,如果你使用带有额外超时参数的发送方法,队列会阻塞,直到任一房间可用或超时时间结束,以先发生者为准。 类似地,a接收()如果队列中有消息可用,呼叫会立即返回;但如果队列为空,则接收呼叫可能会阻塞,直到消息可用或超时(如提供)过去。 无论哪种情况,都可以通过传递超时值为0强制立即返回,不论队列状态。 但请注意,这涉及到以下版本发送()接收()没有超时参数块无限期。spring-doc.cadn.net.cn

优先频道

队列通道强制执行先进先出(FIFO)排序,即优先频道是一种允许信道内根据优先级排序消息的替代实现。 默认情况下,优先级由优先权每个消息中的头部。 然而,对于自定义优先级确定逻辑,有一个类型的比较器Comparator<Message<?>>可以提供给优先频道构造 函数。spring-doc.cadn.net.cn

会合频道

会合频道实现了“直接切换”场景,发送方会屏蔽,直到另一方调用该通道接收()方法。 对方会封锁,直到发送方发送消息。 内部实现与队列通道,但其使用了一个同步队列(零容量实现阻塞队列). 这在发送端和接收端在不同线程中运行的情况效果良好,但异步地将消息丢弃在队列中则不合适。 换句话说,具有会合频道,发送方知道某个接收者已接受该消息,而当队列通道该消息将被存储在内部队列中,可能永远不会收到。spring-doc.cadn.net.cn

请记住,所有这些基于队列的通道默认都是在内存中存储消息。 当需要持久化时,你可以在“队列”元素中提供“消息存储”属性来引用持久化消息商店实现过程中,或者你也可以用由持久代理支持的本地通道替代,比如JMS支持的通道或通道适配器。 后者选项允许你利用任何 JMS 提供商在消息持久化上的实现,详见 JMS 支持。 然而,当不需要在队列中缓冲时,最简单的方法是依赖直达频道,将在下一节讨论。

会合频道也适用于实现请求-回复作。 发送方可以创建一个临时且匿名的实例会合频道然后在构建 时将其设置为“replyChannel”头部消息. 发完后消息发送方可以立即调用收到(可选地提供超时值)以便在等待回复时进行屏蔽消息. 这与 Spring Integration 内部许多请求-回复组件的实现非常相似。spring-doc.cadn.net.cn

直达频道

直达频道具有点对点语义,但除此之外更接近发布订阅频道比之前描述的任何基于队列的信道实现都要强大。 它实现了订阅频道接口而非Pollable频道接口,因此它直接向订阅者发送消息。 然而,作为点对点信道,它与发布订阅频道它将每个消息到单一订阅者消息处理器.spring-doc.cadn.net.cn

除了是最简单的点对点通道选项外,其最重要的特性之一是允许单线程在通道的“两侧”执行作。 例如,如果一个处理器订阅了直达频道,然后发送消息该通道触发调用该处理器的handleMessage(消息)在发送方线程中直接进行方法,在发送()方法调用可以返回。spring-doc.cadn.net.cn

提供这种行为的通道实现的主要动机是支持必须跨通道的交易,同时仍能利用通道提供的抽象和松耦合。 如果发送()调用是在事务范围内调用的,处理程序调用的结果(例如更新数据库记录)在决定该事务的最终结果(提交或回滚)中起作用。spring-doc.cadn.net.cn

自从......直达频道是最简单的选项,且不会增加调度和管理轮询器线程所需的额外开销,它是 Spring Integration 中的默认通道类型。基本思路是为应用程序定义通道,考虑哪些需要提供缓冲或限速输入,并修改它们为基于队列可投票频道. 同样,如果一个频道需要广播消息,它也不应该是直达频道而是发布订阅频道. 之后,我们会展示这些通道的配置方式。

直达频道内部委派给消息调度器调用其订阅的消息处理程序,该调度器可以通过以下方式暴露负载均衡策略负载均衡器负载均衡器-参考属性(互斥)。负载均衡策略被消息调度器用来帮助确定当多个消息处理器订阅同一通道时,消息如何分配到消息处理器之间。为了方便起见,负载均衡器属性 暴露了指向 预先实现的值枚举负载均衡策略. 一个循环赛(轮流负载均衡于处理机之间)没有(对于想要明确禁用负载均衡的情况) 是唯一可用的数值。未来版本可能会添加其他策略实现。不过,自3.0版本起,你可以自行实现负载均衡策略并用以下方式注入负载均衡器-参考属性,应指向实现负载均衡策略,如下示例所示:spring-doc.cadn.net.cn

一个固定订阅频道订阅频道只支持一个消息处理器无法取消订阅的用户。这在没有其他用户参与且不需要信道拦截器时的高吞吐量性能场景中非常有用。spring-doc.cadn.net.cn

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

注意负载均衡器负载均衡器-参考属性是互斥的。spring-doc.cadn.net.cn

负载均衡还与布尔值结合使用备援切换财产。 如果备援切换值为真(默认值),当前一个处理器抛出异常时,调度器会退回到后续的处理程序(如有需要)。顺序由处理程序本身定义的可选顺序值决定,若无该值,则由处理程序订阅的顺序决定。spring-doc.cadn.net.cn

如果在某些情况下,调度器每次发生错误时都必须尝试调用第一个处理程序,然后在每次发生错误时都以相同的固定顺序回退,则不应提供负载均衡策略。换句话说,调度器仍然支持备援切换即使未启用负载均衡,也具有布尔属性。然而,在没有负载均衡的情况下,调用处理程序总是从第一个开始,按照它们的顺序。例如,当对主级、次级、三级等有明确定义时,这种方法效果很好。使用命名空间支持时,次序任意端点的属性决定了顺序。spring-doc.cadn.net.cn

请记住负载均衡和备援切换仅当一个信道拥有多个订阅消息处理程序时才适用。使用命名空间支持时,这意味着多个端点共享定义在输入通道属性。

从5.2版本开始,当备援切换成立时,当前处理程序的失败以及失败消息都会被记录在调试信息如果分别配置。spring-doc.cadn.net.cn

执行者频道

执行者频道是一个点对点信道,支持与 相同的调度器配置直达频道(负载均衡策略和备援切换布尔性质)。这两种调度通道类型的主要区别在于执行者频道代表到一个实例任务执行者执行调度。这意味着发送方法通常不会阻塞,但也意味着处理程序调用可能不会发生在发送方线程中。因此,它不支持跨发送端和接收端处理程序的事务。spring-doc.cadn.net.cn

发送方有时会阻挡。 例如,当使用 a任务执行者带有拒绝策略,会限制客户端(例如ThreadPoolExecutor.CallerRunsPolicy),发送端线程可以在线程池达到最大容量且执行者工作队列满时执行该方法。 由于这种情况只会以不可预测的方式发生,你不应依赖它来进行交易。

分区通道

从6.1版本开始,分区通道提供实现。 这是 的扩展摘要执行者通道代表点对点调度逻辑,实际消耗由发送到该信道的消息计算的分区键决定在特定线程上进行。 该信道类似于执行者频道但不同的是,使用相同分区键的消息总是在同一线程中处理,保持顺序。 它不需要外部接口任务执行者但可以通过自定义配置线程工厂(例如:Thread.ofVirtual().name(“partition-”, 0).factory()). 该工厂用于将单线程执行程序填充到消息调度器每个分区委托。 默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID消息头作为分区键使用。 该通道可以配置为简单的豆子:spring-doc.cadn.net.cn

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}

该频道将拥有3分区——专用线程;将使用partitionKey用头来确定消息将在哪个分区处理。 看分区通道欲了解更多信息,请参阅 Javadocs 类。spring-doc.cadn.net.cn

流信息频道

流信息频道org.reactivestreams.出版商实现“沉没”向内部发送消息reactor.core.publisher.Flux供下游反应型用户按需消费。 该信道实现既不是订阅频道,也不是Pollable频道,所以只有org.reactivestreams.Subscriber实例可以用来利用该通道的能量,以尊重反应流的背压特性。 另一方面,流信息频道实现 aReactiveStreamsSubscribeableChannel(可订阅频道)subscribeTo(Publisher<Message<?>>)契约允许接收来自响应式源发布者的事件,将响应式流桥接进集成流程。 为了实现整个积分流的完全反应行为,必须在流中所有端点之间放置这样的通道。spring-doc.cadn.net.cn

有关与反应流交互的更多信息,请参见反应流支持spring-doc.cadn.net.cn

有范围的通道

Spring Integration 1.0 提供了线程本地频道但自2.0版本起已移除。 现在处理相同要求的更通用方法是添加一个范围归因于频道。 属性的值可以是上下文中可用的作用域名称。 例如,在网页环境中,某些作用域可用,任何自定义作用域实现都可以在上下文中注册。 以下示例展示了对通道应用线程本地作用域,包括对该作用域本身的注册:spring-doc.cadn.net.cn

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

前述示例中定义的通道也委托到内部的队列,但通道绑定到当前线程,因此队列内容也同样绑定。 这样,发送到通道的线程以后可以接收到相同的消息,但其他线程无法访问。 虽然线程作用域通道很少被要求使用,但在某些情况下它们仍然有用直达频道实例被用于强制执行单一线程作,但任何回复消息都应发送到“终端”通道。 如果该终端通道是线程作用波,原始发送线程可以从终端通道收集回复。spring-doc.cadn.net.cn

现在,由于任何通道都可以有作用域,你可以在线程本地之外,定义自己的作用域。spring-doc.cadn.net.cn