如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

配置消息通道

要创建一个消息通道实例,您可以通过以下方式之一使用<channel/>元素进行XML配置或使用DirectChannel实例进行Java配置:spring-doc.cadn.net.cn

@Bean
public MessageChannel exampleChannel() {
    return new DirectChannel();
}
<int:channel id="exampleChannel"/>

当您使用<channel/>元素而没有任何子元素时,会创建一个DirectChannel实例(一个SubscribableChannel)。spring-doc.cadn.net.cn

要创建发布/订阅频道,请使用<publish-subscribe-channel/>元素(在Java中为PublishSubscribeChannel),如下所示:spring-doc.cadn.net.cn

@Bean
public MessageChannel exampleChannel() {
    return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>

您也可以提供多种<queue/>子元素,以创建任何可轮询的通道类型(如消息通道实现中所述)。 以下各节展示了每种通道类型的示例。spring-doc.cadn.net.cn

DirectChannel配置

如先前所述,DirectChannel 是默认类型。 以下列出了如何定义一个:spring-doc.cadn.net.cn

@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
<int:channel id="directChannel"/>

默认通道采用轮询负载均衡,并启用了故障转移(详见 DirectChannel 以获取更多详细信息)。 要禁用其中一个或两个功能,请添加一个 <dispatcher/> 子元素(即 DirectChannelLoadBalancingStrategy 构造函数),并按以下方式配置属性:spring-doc.cadn.net.cn

@Bean
public MessageChannel failFastChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(false);
    return channel;
}

@Bean
public MessageChannel failFastChannel() {
    return new DirectChannel(null);
}
<int:channel id="failFastChannel">
    <int:dispatcher failover="false"/>
</channel>

<int:channel id="channelWithFixedOrderSequenceFailover">
    <int:dispatcher load-balancer="none"/>
</int:channel>

从版本 6.3 开始,所有基于 UnicastingDispatcherMessageChannel 实现都可以配置为使用 Predicate<Exception> failoverStrategy,而不是简单的 failover 选项。 该谓词根据当前组件抛出的异常,决定是否故障转移到下一个 MessageHandler。 更复杂的错误分析应使用 ErrorMessageExceptionTypeRouter 来完成。spring-doc.cadn.net.cn

数据类型通道配置

有时候,消费者只能处理特定类型的消息负载,这就要求你确保输入消息的类型。 最初的想法可能是使用消息过滤器。但是,消息过滤器所能做的只是过滤掉不符合消费者需求的消息。 另一种方法是使用内容路由,并将不符合数据类型的message导向特定的transformer进行转换和转换为所需的数据类型。 这可以实现目标,但更简单的方法是应用数据类型通道模式。你可以为每种具体的payload数据类型使用单独的数据类型通道。spring-doc.cadn.net.cn

若要创建一个仅接受包含特定有效负载类型消息的数据类型通道,请在通道的 datatype 属性中提供数据类型的全限定类名,如下例所示:spring-doc.cadn.net.cn

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(Number.class);
    return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>

注意,类型检查会通过任何可以分配给通道数据类型的类型。 换句话说,在前面的示例中,numberChannel 将接受载荷为 java.lang.Integerjava.lang.Double 的消息。 多个类型可以通过逗号分隔列表提供,如下一个示例所示:spring-doc.cadn.net.cn

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(String.class, Number.class);
    return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>

那么,在前面的示例中,'numberChannel' 只接受数据类型为 java.lang.Number 的消息。
但是,如果消息的有效载荷不是所需的数据类型会发生什么情况呢?
这取决于你是否定义了一个名为 integrationConversionService 的 bean,并且该 bean 是 Spring 转换服务的一个实例。
如果没有,则会立即抛出一个 Exception 异常。
但是,如果你已经定义了 integrationConversionService bean,则会尝试将消息的有效载荷转换为可接受的数据类型。
spring-doc.cadn.net.cn

您可以注册自定义转换器。 例如,假设您向我们上面配置的'numberChannel'发送了一个String负载的消息。 您可以按照以下方式处理该消息:spring-doc.cadn.net.cn

MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));

通常,这将是一个完全合法的操作。 然而,由于我们使用了 Datatype Channel,此类操作的结果将生成类似以下的异常:spring-doc.cadn.net.cn

Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…

由于我们要求负载类型为Number,但发送的是String,因此发生了异常。
所以我们需要一些方法将String转换为Number。 为此,我们可以实现类似于以下示例的转换器:spring-doc.cadn.net.cn

public static class StringToIntegerConverter implements Converter<String, Integer> {
    public Integer convert(String source) {
        return Integer.parseInt(source);
    }
}

然后我们可以将其注册为转换器,使用集成转换服务,如下例所示:spring-doc.cadn.net.cn

@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
    return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>

<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>

StringToIntegerConverter类中,当该类被标记有@Component注解进行自动扫描时。spring-doc.cadn.net.cn

当解析 'converter' 元素时,如果尚未定义该元素,则会创建 integrationConversionService bean。 有了这个转换器后,send 操作现在将成功执行,因为数据类型通道使用该转换器将 String 载荷转换为 Integerspring-doc.cadn.net.cn

对于有关负载类型转换的更多信息,请参阅 负载类型转换spring-doc.cadn.net.cn

从版本4.0开始,integrationConversionService通过DefaultDatatypeChannelMessageConverter调用,后者会在应用上下文中查找转换服务。spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

要使用不同的转换技术,您可以在频道上指定message-converter属性。此属性必须引用一个MessageConverter实现。spring-doc.cadn.net.cn

仅使用fromMessage方法。spring-doc.cadn.net.cn

该方法为转换器提供了访问消息头的权限(如果转换需要来自头部的信息,例如content-type)。spring-doc.cadn.net.cn

此方法只能返回已转换的有效负载或完整的Message对象。如果是后者,则转换器必须小心地从入站消息中复制所有头部信息。spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

您可以声明一个类型为MessageConverter、ID为datatypeChannelMessageConverter<bean/>,并且该转换器将被所有具有datatype类型的通道使用。spring-doc.cadn.net.cn

QueueChannel配置

要创建一个QueueChannel,使用<queue/>子元素。 您可以按照以下方式指定通道的容量:spring-doc.cadn.net.cn

@Bean
public PollableChannel queueChannel() {
    return new QueueChannel(25);
}
<int:channel id="queueChannel">
    <queue capacity="25"/>
</int:channel>
如果没有为这个 <queue/> 子元素的 'capacity' 属性提供值,生成的队列将是无界的。 为了避免内存耗尽等问题,我们强烈建议您为有界队列设置一个显式的值。

持久的QueueChannel配置

由于一个QueueChannel提供了缓冲消息的能力,但默认情况下仅在内存中进行,因此在系统故障的情况下可能会丢失消息。 为了缓解这种风险,一个QueueChannel可以由MessageGroupStore策略接口的持久性实现来支持。 有关MessageGroupStoreMessageStore的更多细节,请参阅消息存储spring-doc.cadn.net.cn

capacity 属性在使用了 message-store 属性时不允许。

当一个QueueChannel接收到一个Message时,它会将消息添加到消息存储中。 当从一个QueueChannel中轮询出一个Message时,该消息将被移除出消息存储。spring-doc.cadn.net.cn

默认情况下,一个QueueChannel将其消息存储在一个内存队列中,这可能会导致之前提到的消息丢失的情况。 然而,Spring Integration 提供了持久化存储方式,例如JdbcChannelMessageStorespring-doc.cadn.net.cn

您可以为任何QueueChannel配置消息存储,通过添加message-store属性,如下例所示:spring-doc.cadn.net.cn

<int:channel id="dbBackedChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

(见下方的Java/Kotlin 配置选项示例。)spring-doc.cadn.net.cn

Spring Integration JDBC 模块还提供了针对多种流行数据库的模式数据定义语言(DDL)。 这些模式位于该模块的 org.springframework.integration.jdbc.store.channel 包中 (spring-integration-jdbc)。spring-doc.cadn.net.cn

一个重要的功能是,对于任何事务性持久存储(例如JdbcChannelMessageStore),只要轮询器配置了事务,在事务成功完成之前,从存储中移除的消息不会被永久删除。否则,如果事务回滚,则消息Message不会丢失。

随着越来越多的与"NoSQL"数据存储相关的 Spring 项目提供对这些存储的底层支持,许多其他消息存储实现也已可用。 如果您找不到符合您特定需求的实现,也可以自行提供对 MessageGroupStore 接口的实现。spring-doc.cadn.net.cn

自从 4.0 版本起,我们建议如果可能的话,配置QueueChannel 实例使用 ChannelMessageStore。这些通常被优化用于这种用途,与通用的消息存储相比。 如果 ChannelMessageStoreChannelPriorityMessageStore,消息将按照优先级顺序进行 FIFO 接收。 优先级的概念由消息存储实现决定。 例如,以下示例展示了使用MongoDB Channel Message Store的 Java 配置:spring-doc.cadn.net.cn

@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
    return IntegrationFlow.from((Channels c) ->
            c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
            ....
            .get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
    integrationFlow {
        channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
    }
注意MessageGroupQueue类。 那是使用BlockingQueue操作的MessageGroupStore实现。

另一种自定义 QueueChannel 环境的方式是通过 <int:queue> 子元素或其特定构造函数中的 ref 属性提供。 该属性提供了对任何 java.util.Queue 实现的引用。 例如,可以按如下方式配置 Hazelcast 分布式 IQueuespring-doc.cadn.net.cn

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(new Config()
                                           .setProperty("hazelcast.logging.type", "log4j"));
}

@Bean
public PollableChannel distributedQueue() {
    return new QueueChannel(hazelcastInstance()
                              .getQueue("springIntegrationQueue"));
}

PublishSubscribeChannel配置

要创建一个PublishSubscribeChannel,可以使用<publish-subscribe-channel/> 元素。 在使用此元素时,还可以指定用于发布消息的task-executor(如果没有指定,则在发送者的线程中进行发布),如下所示:spring-doc.cadn.net.cn

@Bean
public MessageChannel pubsubChannel() {
    return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

如果您在PublishSubscribeChannel下游提供重组器或聚合器,则可以将通道的'apply-sequence'属性设置为true。 这样做表示通道应在传递消息之前设置sequence-sizesequence-number消息头以及关联ID。 例如,如果有五个订阅者,则sequence-size将设置为5,并且消息的sequence-number头值范围将从15spring-doc.cadn.net.cn

除了 Executor,您还可以配置一个 ErrorHandler。 默认情况下,PublishSubscribeChannel 使用 MessagePublishingErrorHandler 实现将错误从 errorChannel 头发送到的 MessageChannel,或注入到全局 errorChannel 实例中。 如果未配置 Executor,则忽略 ErrorHandler,异常将直接抛给调用者的线程。spring-doc.cadn.net.cn

如果在下游提供一个ResequencerAggregator,紧跟着上游的PublishSubscribeChannel,您可以将通道上的'apply-sequence'属性设置为true。 这样表示该通道应当在传递消息之前设置序列大小和序列号消息标头以及关联ID。 例如,如果有五个订阅者,序列大小会被设置为5,而消息的序列号标头值将从15不等。spring-doc.cadn.net.cn

以下示例展示了如何将 apply-sequence 头设置为 truespring-doc.cadn.net.cn

@Bean
public MessageChannel pubsubChannel() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel();
    channel.setApplySequence(true);
    return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
The apply-sequence 值默认为 false,这样发布-订阅通道就可以将完全相同的消息实例发送到多个传出通道。 由于 Spring Integration 强制实现负载和头引用的不可变性,当该标志设置为 true 时,通道会创建新的 Message 实例,这些实例具有相同的负载引用但有不同的头值。

自 5.4.3 版本起,PublishSubscribeChannel 可以通过其 BroadcastingDispatcherrequireSubscribers 选项进行配置,以指示此通道在没有订阅者时不会默默地忽略消息。 当没有订阅者且此选项设置为 true 时,会抛出一个带有 Dispatcher has no subscribers 标签的 MessageDispatchingException 异常。spring-doc.cadn.net.cn

ExecutorChannel

要创建一个ExecutorChannel,添加一个<dispatcher>子元素并带有task-executor属性。 该属性的值可以引用上下文中任何TaskExecutor。 例如,这样做可以在向订阅处理程序发送消息时配置线程池进行调度。 正如前面提到的,这样做会打破发送方和接收方之间的单线程执行上下文,使得任何活跃的事务上下文不会被处理程序调用所共享(也就是说,处理程序可能会抛出Exception异常,但send的调用已经成功返回)。 以下示例展示了如何使用dispatcher元素并在task-executor属性中指定执行器:spring-doc.cadn.net.cn

@Bean
public MessageChannel executorChannel() {
    return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
    <int:dispatcher task-executor="someExecutor"/>
</int:channel>

load-balancerfailover选项同样适用于<dispatcher/>子元素,如前文DirectChannel配置中所描述。 相同的默认值同样适用。 因此,除非为其中一个或两个属性提供了显式配置,否则该通道具有启用了故障转移的轮询负载均衡策略,如下例所示:spring-doc.cadn.net.cn

<int:channel id="executorChannelWithoutFailover">
    <int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>

PriorityChannel配置

要创建一个PriorityChannel,可以使用<priority-queue/>子元素,如下例所示:spring-doc.cadn.net.cn

@Bean
public PollableChannel priorityChannel() {
    return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
    <int:priority-queue capacity="20"/>
</int:channel>

默认情况下,通道会咨询消息的priority标头。 但是,你可以提供一个自定义的Comparator引用。另外,请注意,PriorityChannel(和其他类型一样)也支持datatype属性。 就像QueueChannel一样,它还支持一个capacity属性。 以下示例演示了所有这些内容:spring-doc.cadn.net.cn

@Bean
public PollableChannel priorityChannel() {
    PriorityChannel channel = new PriorityChannel(20, widgetComparator());
    channel.setDatatypes(example.Widget.class);
    return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
    <int:priority-queue comparator="widgetComparator"
                    capacity="10"/>
</int:channel>

自 4.0 版本起,priority-channel 子元素支持 message-store 选项(在此情况下不允许使用 comparatorcapacity)。 消息存储必须是 PriorityCapableChannelMessageStore。 目前为 RedisJDBCMongoDB 提供了 PriorityCapableChannelMessageStore 的实现。 有关更多信息,请参阅 QueueChannel 配置消息存储。 您可以在 后备消息通道 中找到示例配置。spring-doc.cadn.net.cn

RendezvousChannel配置

当队列子元素为<rendezvous-queue>时,会创建一个RendezvousChannel。 它不提供任何额外的配置选项给前面描述的选项,且其队列不接受任何容量值,因为它是一个零容量的直接传递队列。 以下示例展示了如何声明一个RendezvousChannelspring-doc.cadn.net.cn

@Bean
public PollableChannel rendezvousChannel() {
    return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
    <int:rendezvous-queue/>
</int:channel>

作用域通道配置

任何通道都可以配置为scope属性,如下例所示:spring-doc.cadn.net.cn

<int:channel id="threadLocalChannel" scope="thread"/>

通道拦截器配置

消息通道也可能包含拦截器,如通道拦截器所述。
<interceptors/> 子元素可以添加到一个 <channel/> 中(或更具体的元素类型)。
您可以提供 ref 属性来引用任何实现 ChannelInterceptor 接口的 Spring 管理对象,如以下示例所示:spring-doc.cadn.net.cn

<int:channel id="exampleChannel">
    <int:interceptors>
        <ref bean="trafficMonitoringInterceptor"/>
    </int:interceptors>
</int:channel>

一般情况下,我们建议将拦截器实现定义在单独的位置,因为它们通常提供可以跨多个渠道复用的常用行为。spring-doc.cadn.net.cn

全局通道拦截器配置

通道拦截器提供了一种简洁且清晰的方式来为每个单独的通道应用横切行为。 如果相同的横切行为需要在多个通道上应用,为每个通道配置相同的一组拦截器将不是最有效的方式。 为了避免重复配置同时也能让拦截器应用于多个通道,Spring Integration 提供了全局拦截器。 考虑以下示例对:spring-doc.cadn.net.cn

<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
    <bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>

<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>

每个<channel-interceptor/>元素允许您定义一个全局拦截器,该拦截器应用于匹配pattern属性定义的所有通道。 在前面的情况下,全局拦截器将应用到'thing1'通道以及所有以'thing2'或'input'开头的其他通道(从版本5.0开始),但不包括以'thing3'开头的通道。spring-doc.cadn.net.cn

在模式中添加此语法可能会引起一个(尽管可能不太可能发生的问题)。 如果你有一个名为!thing1的bean,并且你在通道拦截器的pattern模式中包含了!thing1这个模式,那么它不再匹配。 现在的模式匹配所有不名为thing1的bean。 在这种情况下,你可以通过使用\来转义模式中的!。 模式\!thing1可以匹配一个名为!thing1的bean。

order 属性允许您在给定通道上存在多个拦截器时,管理此拦截器的注入位置。 例如,'inputChannel' 通道可以配置本地独立的拦截器(见下文),如下例所示:spring-doc.cadn.net.cn

<int:channel id="inputChannel">
  <int:interceptors>
    <int:wire-tap channel="logger"/>
  </int:interceptors>
</int:channel>

一个合理的问题是:"全局拦截器相对于其他在本地或通过其他全局拦截器定义配置的拦截器是如何注入的?" 当前实现提供了一种简单的机制来定义拦截器的执行顺序。 order属性中的正数确保拦截器在任何现有拦截器之后注入,而负数确保拦截器在现有拦截器之前注入。 这意味着,在前面的示例中,全局拦截器是在'wire-tap'拦截器(在本地配置)之后注入的(因为其order大于0)。 如果存在另一个具有匹配pattern的全局拦截器,其顺序将通过比较两个拦截器的order属性值来确定。 若要在现有拦截器之前注入全局拦截器,请将order属性设置为负值。spring-doc.cadn.net.cn

注意,orderpattern 属性都是可选的。 order 的默认值为 0,而 pattern 的默认值是 '*'(表示匹配所有频道)。

消息拦截

正如前面提到的,Spring 集成提供了一个简单的 wiretap 监听器拦截器。 您可以在<interceptors/>元素内的任何通道上配置一个 wiretap。 这样做特别适用于调试,可以与 Spring 集成的日志通道适配器结合使用,如下所示:spring-doc.cadn.net.cn

<int:channel id="in">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter' 也接受 'expression' 属性,以便您可以针对 'payload' 和 'headers' 变量评估 SpEL 表达式。 或者,若要记录完整的消息 toString() 结果,请将 'log-full-message' 属性的值提供为 true。 默认情况下,其值为 false,因此仅记录负载(payload)。 将其设置为 true 可在记录负载的同时启用所有头的记录。 'expression' 选项提供了最大的灵活性(例如 expression="payload.user.name")。

关于Spring框架中的wire tap和其他类似组件(消息发布配置)的一个常见误解是它们自动具有异步特性。
默认情况下,作为组件的wire tap并不是异步调用的。 相反,Spring Integration侧重于一种统一的方式来配置异步行为:消息通道。 在消息流中,使某些部分同步或异步的关键在于该部分配置的消息通道类型。 这就是消息通道抽象的一个主要好处。
从框架的最初设计开始,我们一直强调消息通道作为框架的一等公民的重要性及其价值。 它不仅仅是一个内部的、隐式的EIP模式实现。
它是完全对外部用户可配置的组件进行暴露。 因此,wire tap组件仅负责执行以下任务:spring-doc.cadn.net.cn

这实际上是一种桥接模式的变体,但它是封装在通道定义之内的(因此,在不中断流的情况下更容易启用和禁用)。也不同步桥接器,它基本上会 fork 另一个消息流。是同步还是异步流程?答案取决于'message channel' 'channelB'的类型。我们有以下选项:直接通道、可轮询通道和执行器通道。最后一段打破了线程边界,使得通过这样的通道进行通信是异步的,因为从该通道发送消息到其订阅处理程序的消息分派发生在与发送消息到该通道使用的线程不同的线程上。这就是会让您的线程流同步或异步的原因。它与框架内的其他组件(如消息发布器)保持一致,并通过避免您在编写线程安全代码之外还要预先考虑某个特定代码片段应实现为同步还是异步的方式,为您增加了统一性和简化性。两个代码片段(例如组件 A 和组件 B)通过消息通道进行实际连接,正是这种连接决定了它们的协作是同步的还是异步的。您甚至可能希望将来从同步模式切换到异步模式,而消息通道(message channel)让您能够迅速完成这一切换,且无需触碰任何代码。spring-doc.cadn.net.cn

关于 wire tap 的最后一个要点是,尽管在上面提供了不默认异步的理由,但仍需记住尽早将消息转交给接收方通常是很有益的。 因此,在 wire tap 的传出通道中使用异步通道选项是很常见的做法。 然而,默认情况下并不会强制执行这种异步行为。 如果我们这样做,会破坏许多用例,包括你可能不想打破事务边界。 或许你使用 wire tap 模式进行审计,而希望审计消息在原始事务中发送。 例如,你可以将 wire tap 连接到一个 JMS 出站通道适配器。 这样,你就可以同时获得以下好处:1) 可以在一个事务中发送一个 JMS 消息,2) 仍然可以实现“即发即弃”的行为,从而避免对主要消息流产生明显延迟。spring-doc.cadn.net.cn

从版本 4.0 开始,当拦截器(例如 WireTap)引用通道时,避免循环引用非常重要。 您需要将此类通道排除在当前拦截器所拦截的通道之外。 这可以通过适当的模式或编程方式实现。 如果您有一个自定义的 ChannelInterceptor 引用了 channel,请考虑实现 VetoCapableInterceptor。 这样,框架会根据提供的模式询问拦截器是否可以拦截每个候选通道。 您还可以在拦截器方法中添加运行时保护,以确保该通道不是由拦截器引用的通道。 WireTap 同时使用了这两种技术。

从版本 4.3 开始,WireTap具有额外的构造函数,这些构造函数接受一个channelName而不是一个 MessageChannel实例。这在 Java 配置和使用通道自动创建逻辑时非常方便。 目标MessageChannel bean 是在提供的channelName稍后解析的,在首次与拦截器交互时完成。spring-doc.cadn.net.cn

通道解析需要一个BeanFactory,因此wire tap实例必须是Spring管理的bean。

此晚期绑定的方法还允许通过使用Java DSL配置简化典型的探针模式,如下例所示:spring-doc.cadn.net.cn

@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

条件性线路监听

通过使用selectorselector-expression属性,可以对截获进行条件设置。
selector引用了一个MessageSelector bean,该bean可以在运行时决定消息是否应该发送到截获通道。
同样地,selector-expression是一个布尔SpEL表达式,其作用相同:如果该表达式的值为true(即为true),则消息会被发送到截获通道。spring-doc.cadn.net.cn

全局 Wire Tap 配置

可以将全局线程捕获配置为全局通道拦截器配置的一种特殊情形。
为此,配置一个顶层的wire-tap元素。
现在,除了正常的wire-tap命名空间支持外,还支持patternorder属性,并且它们的工作方式与channel-interceptor中的相同。
以下示例展示了如何配置全局线程捕获:spring-doc.cadn.net.cn

@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
    return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局线程截获提供了一种方便的方法,在不修改现有通道配置的情况下,对外部配置单通道线程截获。 为此,请将pattern属性设置为目标通道名称。 例如,您可以使用此技术来配置一个测试用例以验证通道上的消息。