|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
消息通道实现
Spring Integration 提供了不同的消息通道实现。 以下部分简要描述每一种。
PublishSubscribeChannel
The PublishSubscribeChannel 实现会将任何发送到它的 Message 广播到其所有订阅处理程序。
这通常用于发送事件消息,其主要作用是通知(与文档消息相反,文档消息通常旨在由单个处理程序处理)。
请注意,PublishSubscribeChannel 仅用于发送。
由于当其 send(Message) 方法被调用时,它会直接将其消息广播给其订阅者,消费者无法轮询消息(它不实现 PollableChannel,因此没有 receive() 方法)。
相反,任何订阅者本身必须是 MessageHandler,并且会调用订阅者的 handleMessage(Message) 方法。
在3.0版本之前,对没有订阅者的PublishSubscribeChannel调用send方法返回false。
当与MessagingTemplate一起使用时,会抛出一个MessageDeliveryException异常。
从3.0版本开始,行为已更改,只要至少存在最小数量的订阅者(并且成功处理消息),调用send方法总是被视为成功的。
可以通过设置minSubscribers属性来修改此行为,默认值为0。
如果使用了TaskExecutor,则仅根据正确的订阅者数量来确定这一点,因为消息的实际处理是异步进行的。 |
QueueChannel
The QueueChannel 实现包装了一个队列。
与 PublishSubscribeChannel 不同,QueueChannel 具有点对点语义。
换句话说,即使通道有多名消费者,也只能有一个消费者接收到发送到该通道的任何 Message 。
它提供了一个无参数构造函数(提供了一种本质上未定义容量的 Integer.MAX_VALUE ),以及一个接受队列容量作为参数的构造函数,如下所示:
public QueueChannel(int capacity)
一个尚未达到容量限制的通道会将其消息存储在其内部队列中,并且 send(Message<?>) 方法会立即返回,即使没有接收者准备好处理该消息。
如果队列已达到容量限制,发送方将阻塞,直到队列中有可用空间。
或者,如果您使用带有额外超时参数的 send 方法,则队列将阻塞,直到有可用空间或超时时间过去,以先发生者为准。
同样地,如果队列上有可用消息,receive() 调用会立即返回;但如果队列为空,则 receive 调用可能会阻塞,直到有可用消息或提供的超时时间过去(如果提供了的话)。
在任何一种情况下,都可以通过传递值为 0 的超时参数来强制立即返回,无论队列状态如何。
然而需要注意的是,对不带 timeout 参数的 send() 和 receive() 版本的调用将无限期阻塞。
PriorityChannel
whereas the QueueChannel 强制遵循先进先出(FIFO)顺序,PriorityChannel 是另一种实现方式,允许根据优先级在通道内对消息进行排序。
默认情况下,优先级由每个消息内的 priority 头部确定。然而,为了自定义优先级确定逻辑,可以向 PriorityChannel 构造函数提供类型为 Comparator<Message<?>> 的比较器。
RendezvousChannel
The RendezvousChannel 启用了“直接传递”场景,其中发送者会阻塞直到另一方调用通道的 receive() 方法。
另一方则会在发送者发送消息之前阻塞。
内部实现上,这种做法与 QueueChannel 非常相似,只是它使用了一个 SynchronousQueue(即零容量的 BlockingQueue 实现)。
在发送者和接收者运行在不同线程中但不适合异步地将消息存入队列的情况下,这种方法表现良好。
换句话说,在使用 RendezvousChannel 的情况下,发送者知道某个接收方已经接受了该消息,而使用 QueueChannel 时,则可能只是将消息存储到了内部队列并且永远无法被接收。
请记住,这些基于队列的通道默认仅在内存中存储消息。
当需要持久化时,您可以通过在 'queue' 元素内提供 'message-store' 属性来引用一个持久化的 MessageStore 实现,或者用一个由持久化代理支持的通道替换本地通道,例如 JMS 通道或适配器。
后一种选项可以让您利用任何 JMS 提供商的消息持久化实现,如在 JMS 支持 中所述。
然而,当队列中的缓冲不是必要的时候,最简单的方法是依赖于 DirectChannel ,这将在下一节中讨论。 |
The RendezvousChannel 也可以用于实现请求-响应操作。
发送方可以创建一个临时且匿名的 RendezvousChannel 实例,然后在构建一个 Message 时将其设置为 'replyChannel' 头部。
发送该 Message 后,发送方可以立即调用 receive(可选地提供超时值)来阻塞并等待响应 Message。
这与 Spring Integration 的许多请求-响应组件内部实现非常相似。
DirectChannel
该 DirectChannel 具有点对点的语义,但在其他方面更类似于 PublishSubscribeChannel 而不是之前描述的任何基于队列的通道实现。
它实现了 SubscribableChannel 接口而不是 PollableChannel 接口,因此直接将消息分发给订阅者。
然而,作为一个点对点通道,它与 PublishSubscribeChannel 不同之处在于,它会将每个 Message 发送到一个单一的已订阅的 MessageHandler。
除了是最简单的点对点通道选项之外,另一个最重要的特性是它可以使得单一线程在同一时间执行“通道两端”的操作。
例如,如果一个处理器订阅了 DirectChannel,那么向该通道发送 Message 会直接触发处理器的 handleMessage(Message) 方法,在发送者线程的方法返回之前。
提供具有此行为的通道实现的主要动机是支持必须跨越通道的事务,同时仍能受益于通道提供的抽象和松耦合。
如果 send() 调用在事务范围内被调用,则处理程序的调用结果(例如更新数据库记录)将在决定该事务的最终结果(提交或回滚)中发挥作用。
由于 DirectChannel 是最简单的选项,且不会为轮询器的线程调度和管理带来任何额外的开销,因此它是 Spring Integration 中的默认通道类型。
一般思路是定义应用程序的通道,考虑哪些通道需要提供缓冲或限制输入,然后将这些通道修改为基于队列的 PollableChannels。
同样地,如果一个通道需要广播消息,它就不应该是 DirectChannel,而应该是 PublishSubscribeChannel。
稍后,我们将展示如何配置每种类型的通道。 |
The DirectChannel 内部委托给一个消息调度器来调用其订阅的消息处理器,而该调度器可以通过 load-balancer 或 load-balancer-ref 个属性(互斥)来暴露负载均衡策略。负载均衡策略用于帮助消息分发器在多个消息处理器订阅同一个通道时,决定如何分配消息。作为方便,load-balancer 属性暴露了一个枚举值的集合,指向已存在的 LoadBalancingStrategy 实现。A round-robin(负载均衡在轮询中跨多个处理器分配)和 none(对于希望明确禁用负载均衡的情况)是唯一可用的值。其他策略实现可能在未来的版本中添加。然而,自从版本 3.0,您可以提供自己的 LoadBalancingStrategy 实现,并使用 load-balancer-ref 属性将其注入,该属性应指向一个实现了 LoadBalancingStrategy 的 Bean,如下示例所示:
一个FixedSubscriberChannel是一个仅支持单一SubscribableChannel订阅者且该订阅者无法取消订阅的MessageHandler。
这在没有其他订阅者参与且不需要通道拦截器的情况下,对于高吞吐量性能用例非常有用。
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
注意,load-balancer 和 load-balancer-ref 属性是互斥的。
负载均衡也与布尔类型的 failover 属性一起工作。
如果 failover 值为 true(默认值),则在前置处理器抛出异常时,分发器会回退到任何后续处理器(必要时)。
处理器的顺序由其自身定义的可选顺序值决定,或者如果没有此类值,则按照处理器订阅的顺序。
如果某些情况要求调度器每次都尝试调用第一个处理器,并且在每次发生错误时都按照相同的固定顺序进行回退,则不应提供负载均衡策略。
换句话说,即使未启用负载均衡,调度器仍然支持failover布尔属性。
然而,在没有负载均衡的情况下,处理器的调用始终从第一个开始,根据它们的顺序。
例如,当有一个明确的一级、二级、三级等定义时,这种方法效果很好。
当使用命名空间支持时,任何端点上的order属性决定了其顺序。
请注意,负载均衡和failover仅在通道有多个订阅的消息处理器时适用。
当使用命名空间支持时,这意味着多个端点共享同一个在input-channel属性中定义的通道引用。 |
自 5.2 版本起,当failover为真时,当前处理器的失败及其失败消息会被记录在debug或根据配置记录在info。
ExecutorChannel
ExecutorChannel 是一个点对点通道,支持与 DirectChannel 相同的调度配置(负载均衡策略和 failover 布尔属性)。
这两种调度通道类型的关键区别在于,ExecutorChannel 会委托给一个 TaskExecutor 的实例来执行调度。
这意味着 send 方法通常不会阻塞,但这同时也意味着处理器的调用可能不在发送者的线程中发生。
因此,它不支持跨越发送者和接收处理器的事务。
发送者有时会被阻塞。
例如,当使用 TaskExecutor 与限制策略(如 ThreadPoolExecutor.CallerRunsPolicy)进行客户端速率限制时,发送者的线程可以在线程池达到最大容量且执行者的任务队列已满的情况下随时执行方法。
由于这种情况只会在不可预测的方式下发生,你不应依赖它来进行事务处理。 |
PartitionedChannel
自 Spring 框架的 6.1 版本起,提供了一个 PartitionedChannel 实现。
这是对 AbstractExecutorChannel 的扩展,并代表了一种点到点分发逻辑,在这种逻辑中,实际消费会在一个特定的线程上进行处理,这个线程是由发送给此通道的消息中的分区键评估确定的。
该通道类似于上述提及的 ExecutorChannel,但不同之处在于具有相同分区键的消息总是由同一个线程处理,从而保留了消息的顺序。
它不需要外部的 TaskExecutor,但是可以配置自定义的 ThreadFactory(例如 Thread.ofVirtual().name("partition-", 0).factory())。
此工厂用于根据每个分区填充单线程执行器到一个 MessageDispatcher 代理中。
默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头被用作分区键。
该通道可以配置为简单的 Bean:
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
该通道将有3分区 - 专用线程;并将使用partitionKey头来确定消息将在哪个分区处理。
有关更多信息,请参见PartitionedChannel类的Javadoc。
FluxMessageChannel
FluxMessageChannel 是一个针对 "sinking" 的已发送消息的 org.reactivestreams.Publisher 实现,它将消息发送至内部 reactor.core.publisher.Flux,供下游响应式订阅者按需消费。
该通道实现既不是 SubscribableChannel,也不是 PollableChannel,因此只能使用 org.reactivestreams.Subscriber 实例从该通道进行消费,以遵循响应式流的背压特性。
另一方面,FluxMessageChannel 实现了 ReactiveStreamsSubscribableChannel,其 subscribeTo(Publisher<Message<?>>) 契约允许从响应式源发布者接收事件,从而将响应式流桥接到集成流程中。
为了实现整个集成流程的完全响应式行为,此类通道必须放置在流程中的所有端点之间。
见Reactive Streams 支持以获取有关与 Reactive Streams 交互的更多信息。
作用域通道
Spring Integration 1.0 提供了一个 ThreadLocalChannel 实现,但该实现已于 2.0 版本中移除。
现在处理相同需求的更通用方法是向一个 channel 添加一个 scope 属性。属性值可以是一个在上下文中可用的作用域名称。
例如,在 Web 环境中,某些作用域是可用的,并且任何自定义的作用域实现都可以注册到上下文中。下面的例子展示了如何将线程局部作用域应用到一个 channel 中,包括作用域本身的注册:
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
在前面示例中定义的通道内部也会委托给一个队列,但该通道绑定到了当前线程上,因此队列的内容也同样被绑定。
这样,在发送消息到通道的线程之后可以从终端通道接收这些相同的消息,但其他线程无法访问它们。
虽然按线程范围定义的通道很少需要使用,但在仅使用DirectChannel实例来确保单一操作线程的情况下,并且任何回复消息都应该发往一个“终端”通道时,这样的通道可以非常有用。如果这个终端通道是按线程范围定义的,那么原始发送线程可以从终端通道收集其回复。
现在,由于任何频道都可以进行范围限定,您除了可以使用线程局部变量之外还可以定义自己的范围。