如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

消息端点

本章的第一部分涵盖了一些背景理论,并揭示了驱动 Spring Integration 各种消息组件的基础 API 的诸多细节。 如果您希望深入理解幕后运作机制,这些信息将非常有帮助。 不过,如果您希望快速上手并使用基于简化命名空间的配置来配置各种元素,不妨先跳转到 端点命名空间支持spring-doc.cadn.net.cn

如概述中所述,消息端点负责将各种消息组件连接到通道。 在接下来的几章中,我们将介绍多种消耗消息的不同组件。 其中一些还能够发送回复消息。 发送消息非常简单。 如前面在消息通道中所示,您可以将消息发送到消息通道。 然而,接收消息则稍微复杂一些。 主要原因是存在两种类型的消费者:轮询消费者事件驱动消费者spring-doc.cadn.net.cn

两者之中,事件驱动的消费者要简单得多。 由于无需管理和调度单独的轮询线程,它们本质上是带有回调方法的监听器。 当连接到 Spring Integration 的可订阅消息通道时,这种简单的选项效果极佳。 然而,当连接到缓冲型、可轮询的消息通道时,必须有某个组件来调度和管理轮询线程。 Spring Integration 提供了两种不同的端点实现来适配这两种类型的消费者。 因此,消费者本身只需实现回调接口。 当需要轮询时,端点充当消费者实例的容器。 其好处类似于使用容器托管消息驱动 Bean,但由于这些消费者是在 ApplicationContext 内运行的由 Spring 管理的对象,它更类似于 Spring 自身的 MessageListener 容器。spring-doc.cadn.net.cn

消息处理器

Spring Integration 的 MessageHandler 接口由框架内的许多组件实现。 换句话说,这不属于公共 API 的一部分,您通常不会直接实现 MessageHandler。 尽管如此,它被消息消费者用于实际处理已消费的消息,因此了解此策略接口有助于理解消费者的整体角色。 该接口的定义如下:spring-doc.cadn.net.cn

public interface MessageHandler {

    void handleMessage(Message<?> message);

}

尽管该接口非常简单,但它为后续章节中介绍的大多数组件(如路由器、转换器、拆分器、聚合器、服务激活器等)奠定了基础。 这些组件对它们处理的消息执行的功能各不相同,但实际接收消息的要求是相同的,在轮询和事件驱动行为之间的选择也是相同的。 Spring Integration 提供了两个端点实现来托管这些基于回调的处理程序,并允许将它们连接到消息通道。spring-doc.cadn.net.cn

事件驱动消费者

由于它是两者中更简单的一种,我们首先介绍事件驱动的消费者端点。 您可能还记得,SubscribableChannel 接口提供了一个 subscribe() 方法,并且该方法接受一个 MessageHandler 参数(如 SubscribableChannel 所示)。 以下列表展示了 subscribe 方法的定义:spring-doc.cadn.net.cn

subscribableChannel.subscribe(messageHandler);

由于订阅到通道的处理器无需主动轮询该通道,因此这是一种事件驱动的消费者。Spring Integration 提供的实现接受 SubscribableChannelMessageHandler,如下示例所示:spring-doc.cadn.net.cn

SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);

EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);

轮询消费者

Spring Integration 还提供了 PollingConsumer,其实例化方式相同,但通道必须实现 PollableChannel,如下示例所示:spring-doc.cadn.net.cn

PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
有关轮询消费者的更多信息,请参阅 通道适配器通道适配器

轮询消费者还有许多其他配置选项。 以下示例展示了如何设置触发器:spring-doc.cadn.net.cn

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));

PeriodicTrigger 通常使用简单间隔(Duration)定义,但也支持 initialDelay 属性和布尔型 fixedRate 属性(默认为 false — 即无固定延迟)。 以下示例同时设置了这两个属性:spring-doc.cadn.net.cn

PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);

上述示例中三个设置的结果是一个触发器,它会等待五秒,然后每秒触发一次。spring-doc.cadn.net.cn

CronTrigger 需要有效的 cron 表达式。 请参阅 Javadoc 了解详情。 以下示例设置新的 CronTriggerspring-doc.cadn.net.cn

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");

上一个示例中定义的触发器的结果是每隔十秒触发一次,时间为周一至周五。spring-doc.cadn.net.cn

轮询端点的默认触发器是一个 PeriodicTrigger 实例,具有 1 秒的固定延迟周期。

除了触发器外,您还可以指定另外两个与轮询相关的配置属性:maxMessagesPerPollreceiveTimeout。 以下示例展示了如何设置这两个属性:spring-doc.cadn.net.cn

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);

maxMessagesPerPoll 属性指定在一次轮询操作中接收消息的最大数量。 这意味着轮询器会持续调用 receive() 而无需等待,直到返回 null 或达到最大值为止。 例如,如果轮询器的触发间隔为十秒,且 maxMessagesPerPoll 设置为 25,并且它正在轮询一个队列中有 100 条消息的通道,那么所有 100 条消息可以在 40 秒内被检索到。 它会抓取 25 条,等待十秒,再抓取接下来的 25 条,依此类推。 如果将 maxMessagesPerPoll 配置为负值,则 MessageSource.receive() 将在单个轮询周期内被调用,直到其返回 null。 从 5.5 版本开始,0 值具有特殊含义——完全跳过 MessageSource.receive() 调用,这可被视为暂停此轮询端点,直到稍后将 maxMessagesPerPoll 更改为非零值(例如通过控制总线)。spring-doc.cadn.net.cn

receiveTimeout 属性指定当轮询器调用接收操作且无可用消息时,应等待的时间长度。 例如,考虑两个表面上相似但实际截然不同的选项:第一个具有 5 秒的间隔触发器和 50 毫秒的接收超时,而第二个具有 50 毫秒的间隔触发器和 5 秒的接收超时。 第一个选项可能在通道接受消息后最多延迟 4950 毫秒才接收到该消息(如果该消息在其某次轮询调用返回后立即到达)。 另一方面,第二种配置永远不会错过超过 50 毫秒的消息。 区别在于,第二个选项需要线程进行等待。 然而,正因如此,它能够对到达的消息做出更快的响应。 这种被称为“长轮询”的技术可用于在轮询源上模拟事件驱动行为。spring-doc.cadn.net.cn

轮询消费者也可以委托给 Spring TaskExecutor,如下示例所示:spring-doc.cadn.net.cn

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);

此外,PollingConsumer 拥有一个名为 adviceChain 的属性。 该属性允许您指定用于处理包括事务在内的其他横切关注点的 AOP 通知的 List。 这些通知将应用于 doPoll() 方法周围。 有关更详细的信息,请参阅“端点命名空间支持”下关于 AOP 通知链和事务支持的章节。 另请参阅 @Poller 注解的 Javadoc 以及相应的 消息注解支持 章节。 Java DSL 还提供了一种带有相应 Pollers 工厂的 .poller() 端点配置选项。spring-doc.cadn.net.cn

前面的示例展示了依赖查找。 然而,请记住,这些消费者通常配置为 Spring Bean 定义。 实际上,Spring Integration 还提供了一个 FactoryBean,名为 ConsumerEndpointFactoryBean,它根据通道的类型创建相应的消费者类型。 此外,Spring Integration 完全支持 XML 命名空间,可以进一步隐藏这些细节。 本指南在介绍每种组件类型时,都会展示基于命名空间的配置。spring-doc.cadn.net.cn

许多 MessageHandler 实现可以生成回复消息。 如前所述,与接收消息相比,发送消息是微不足道的。 然而,何时以及发送多少回复消息取决于处理器类型。 例如,聚合器会等待一定数量的消息到达,并且通常被配置为拆分器的下游消费者,后者可以为每个处理的消息生成多个回复。 当使用命名空间配置时,您并不严格需要了解所有细节。 但是,了解其中一些组件共享一个共同的基类 AbstractReplyProducingMessageHandler,并且该类提供了一个 setOutputChannel(..) 方法,可能仍然值得。

端点命名空间支持

在本参考手册中,您可以找到针对端点元素(如 router、transformer、service-activator 等)的特定配置示例。 大多数这些元素支持 input-channel 属性,许多还支持 output-channel 属性。 解析后,这些端点元素会根据所引用的 input-channel 的类型生成 PollingConsumerEventDrivenConsumer 的实例:PollableChannelSubscribableChannel,分别对应。 当通道可轮询时,轮询行为基于端点元素的 poller 子元素及其属性。spring-doc.cadn.net.cn

以下列出了 poller 的所有可用配置选项:spring-doc.cadn.net.cn

<int:poller cron=""                                  (1)
            default="false"                          (2)
            error-channel=""                         (3)
            fixed-delay=""                           (4)
            fixed-rate=""                            (5)
            initial-delay=""                         (6)
            id=""                                    (7)
            max-messages-per-poll=""                 (8)
            receive-timeout=""                       (9)
            ref=""                                   (10)
            task-executor=""                         (11)
            time-unit="MILLISECONDS"                 (12)
            trigger="">                              (13)
            <int:advice-chain />                     (14)
            <int:transactional />                    (15)
</int:poller>
1 提供使用 Cron 表达式配置轮询器的功能。 底层实现使用一个 org.springframework.scheduling.support.CronTrigger。 如果设置了此属性,则不得指定以下任何属性:fixed-delaytriggerfixed-rateref
2 通过将此属性设置为 true,您可以定义一个全局默认轮询器。 如果在应用程序上下文中定义了多个默认轮询器,则会抛出异常。 任何连接到 PollableChannelPollingConsumer)的端点,或任何未显式配置轮询器的 SourcePollingChannelAdapter,将使用全局默认轮询器。 其默认值为 false。 可选。
3 标识当此轮询器的调用发生故障时,错误消息发送到的通道。 要完全抑制异常,您可以提供对 nullChannel 的引用。 可选。
4 固定延迟触发器在底层使用 PeriodicTrigger。 数值可以是 time-unit,或者从版本 6.2 开始,也可以采用持续时间格式,例如 PT10SP1D。 如果设置了此属性,则不得指定以下任何属性:fixed-ratetriggercronref
5 固定频率触发器在底层使用 PeriodicTrigger。 数值单位为 time-unit,或可表示为持续时间格式(自 6.2 版本起),例如:PT10SP1D。 如果设置了此属性,则不得指定以下任何属性:fixed-delaytriggercronref
6 底层PeriodicTrigger的初始延迟(从 6.2 版本开始)。 数值可以是time-unit,也可以采用持续时间格式,例如PT10SP1D
7 引用轮询器底层 Bean 定义的 ID,其类型为 org.springframework.integration.scheduling.PollerMetadata。 除非是默认轮询器(default="true"),否则顶层 id 元素必须包含 id 属性。
8 请参阅配置入站通道适配器以获取更多信息。 如果未指定,则默认值取决于上下文。 如果您使用PollingConsumer,则该属性默认为-1。 但是,如果您使用SourcePollingChannelAdapter,则max-messages-per-poll属性默认为1。 可选。
9 值在底层类 PollerMetadata 上设置。 如果未指定,则默认为 1000(毫秒)。 可选。
10 对另一个顶级轮询器的 Bean 引用。 ref 属性不得出现在顶级 poller 元素上。 但是,如果设置了此属性,则不得指定以下任何属性:fixed-ratetriggercronfixed-delay
11 提供引用自定义任务执行器的能力。 有关更多信息,请参阅 TaskExecutor 支持。 可选。
12 此属性指定底层 org.springframework.scheduling.support.PeriodicTrigger 上的 java.util.concurrent.TimeUnit 枚举值。 因此,此属性只能与 fixed-delayfixed-rate 属性结合使用。 如果与 crontrigger 引用属性结合使用,将导致失败。 PeriodicTrigger 支持的最小粒度为毫秒。 因此,可用的选项仅为毫秒和秒。 如果未提供此值,则任何 fixed-delayfixed-rate 值将被解释为毫秒。 基本上,此枚举为基于秒的区间触发器值提供了便利。 对于每小时、每天和每月的设置,我们建议使用 cron 触发器。
13 引用任何实现了 org.springframework.scheduling.Trigger 接口的 Spring 配置的 Bean。 然而,如果设置了此属性,则不得指定以下任何属性:fixed-delayfixed-ratecronref。 可选。
14 允许指定额外的 AOP 通知来处理额外的横切关注点。 有关更多信息,请参阅 事务。 可选。
15 轮询器可以设置为事务性的。 有关更多信息,请参阅 AOP 通知链。 可选。

示例

一个具有 1 秒间隔的简单基于间隔的轮询器可以配置如下:spring-doc.cadn.net.cn

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller fixed-rate="1000"/>
</int:transformer>

作为不使用 fixed-rate 属性的替代方案,您也可以使用 fixed-delay 属性。spring-doc.cadn.net.cn

对于基于 Cron 表达式的轮询器,请使用 cron 属性,如下示例所示:spring-doc.cadn.net.cn

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>

如果输入通道是 PollableChannel,则必须配置轮询器。 具体而言,如前所述,triggerPollingConsumer 类的一个必需属性。 因此,如果您在轮询消费者端点的配置中省略了 poller 子元素,可能会抛出异常。 如果您尝试在与不可轮询的通道相连的元素上配置轮询器,也可能抛出该异常。spring-doc.cadn.net.cn

也可以创建顶级轮询器,在这种情况下,只需要一个 ref 属性,如下例所示:spring-doc.cadn.net.cn

<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller ref="weekdayPoller"/>
</int:transformer>
ref 属性仅允许用于内部轮询器定义。 如果在顶层轮询器上定义此属性,将在应用程序上下文初始化期间抛出配置异常。

全局默认轮询器

为了进一步简化配置,您可以定义一个全局默认轮询器。 在 XML DSL 中,单个顶层轮询器组件可以将 default 属性设置为 true。 对于 Java 配置,在这种情况下必须声明一个名为 PollerMetadata.DEFAULT_POLLERPollerMetadata Bean。 在这种情况下,任何在其输入通道上具有 PollableChannel、在同一 ApplicationContext 内定义且未显式配置 poller 的端点都将使用该默认值。 以下示例展示了这样一个轮询器及其使用的转换器:spring-doc.cadn.net.cn

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
    return IntegrationFlow.from(MessageChannels.queue("pollable"))
                           .transform(transformer) // No 'poller' attribute because there is a default global poller
                           .channel("output")
                           .get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

@Bean
public QueueChannel pollable() {
   return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
    ...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
    PollerMetadata()
        .also {
            it.maxMessagesPerPoll = 5
            it.trigger = PeriodicTrigger(3000)
        }

@Bean
fun convertFlow() =
    integrationFlow(MessageChannels.queue("pollable")) {
    	transform(transformer) // No 'poller' attribute because there is a default global poller
    	channel("output")
    }
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>

<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
                 ref="transformer"
                 output-channel="output"/>

事务支持

Spring Integration 还为轮询器(pollers)提供了事务支持,以便每个接收和转发操作都可以作为原子工作单元执行。 要为轮询器配置事务,请添加 <transactional/> 子元素。 以下示例展示了可用的属性:spring-doc.cadn.net.cn

<int:poller fixed-delay="1000">
    <int:transactional transaction-manager="txManager"
                       propagation="REQUIRED"
                       isolation="REPEATABLE_READ"
                       timeout="10000"
                       read-only="false"/>
</int:poller>

有关更多信息,请参阅 轮询事务支持spring-doc.cadn.net.cn

AOP 通知链

由于 Spring 事务支持依赖于使用 TransactionInterceptor(AOP 通知)的代理机制来处理由轮询器启动的消息流的事务行为,因此您有时需要提供额外的通知来处理与轮询器相关的其他横切行为。 为此,poller 定义了一个 advice-chain 元素,允许您在实现 MethodInterceptor 接口的类中添加更多通知。 以下示例展示了如何为 poller 定义 advice-chainspring-doc.cadn.net.cn

<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
		method="good" output-channel="output">
	<int:poller max-messages-per-poll="1" fixed-rate="10000">
		 <int:advice-chain>
			<ref bean="adviceA" />
			<beans:bean class="org.something.SampleAdvice" />
			<ref bean="txAdvice" />
		</int:advice-chain>
	</int:poller>
</int:service-activator>

有关如何实现 MethodInterceptor 接口的更多信息,请参阅 Spring Framework Reference Guide 中的 AOP 部分。 建议链也可以应用于没有事务配置的轮询器,从而增强由轮询器启动的消息流的行为。spring-doc.cadn.net.cn

在使用建议链时,不能指定 <transactional/> 子元素。 相反,请声明一个 <tx:advice/> bean 并将其添加到 <advice-chain/> 中。 有关完整的配置详情,请参阅 轮询器事务支持

TaskExecutor 支持

轮询线程可由 Spring 的 TaskExecutor 抽象的任何实例执行。这为端点或一组端点启用并发。自 Spring 3 起。0,核心 Spring 框架拥有一个 task 命名空间,其 <executor/> 元素支持创建简单的线程池执行器。该元素接受用于常见并发设置的属性,例如 pool-size(池大小)和 queue-capacity(队列容量)。配置线程池执行器可以显著影响端点在负载下的性能表现。这些设置适用于每个端点,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点所订阅通道的预期流量)。要为配置了 XML 命名空间支持的轮询端点启用并发功能,请在其 task-executor 元素上提供 <poller/> 引用,然后提供以下示例中显示的一个或多个属性:spring-doc.cadn.net.cn

<int:poller task-executor="pool" fixed-rate="1000"/>

<task:executor id="pool"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

如果您未提供 task-executor,消费者的处理器将在调用者的线程中被调用。 请注意,调用者通常是默认的 TaskScheduler(参见 配置任务调度器)。 您还应记住,task-executor 属性可以通过指定 Bean 名称来引用 Spring 的 TaskExecutor 接口的任何实现。 前面展示的 executor 元素是为了方便而提供的。spring-doc.cadn.net.cn

正如前面在轮询消费者的背景部分中提到的,您还可以通过以下方式配置轮询消费者以模拟事件驱动的行为。 通过设置较长的接收超时时间和较短的触发间隔,您可以确保即使在轮询消息源上也能对到达的消息做出非常及时的反应。 请注意,这仅适用于具有带超时的阻塞等待调用的源。 例如,文件轮询器不会阻塞。 每次receive()调用都会立即返回,其中可能包含新文件,也可能不包含。 因此,即使轮询器中包含一个很长的receive-timeout,该值在这种场景下也永远不会被使用。 另一方面,当使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会发挥作用。 下面的示例展示了轮询消费者如何近乎即时地接收消息:spring-doc.cadn.net.cn

<int:service-activator input-channel="someQueueChannel"
    output-channel="output">
    <int:poller receive-timeout="30000" fixed-rate="10"/>

</int:service-activator>

采用这种方法不会带来太多开销,因为内部而言,它不过是一个定时等待线程,其 CPU 资源消耗远低于(例如)一个疯狂运行的无限 while 循环。spring-doc.cadn.net.cn

在运行时更改轮询频率

当使用 fixed-delayfixed-rate 属性配置轮询器时,默认实现会使用 PeriodicTrigger 实例。 PeriodicTrigger 是 Spring 框架核心的一部分。 它仅接受间隔作为构造函数参数。 因此,无法在运行时更改它。spring-doc.cadn.net.cn

然而,您可以定义自己的 org.springframework.scheduling.Trigger 接口实现。 您甚至可以将 PeriodicTrigger 作为起点。 然后,您可以为间隔(周期)添加一个 setter,或者甚至在触发器本身中嵌入自己的节流逻辑。 period 属性用于每次调用 nextExecutionTime 时以安排下一次轮询。 要在轮询器中使用此自定义触发器,请在您的应用上下文中声明自定义触发器的 bean 定义,并通过使用引用自定义触发器 bean 实例的 trigger 属性将依赖项注入到您的轮询器配置中。 现在,您可以获取对触发器 bean 的引用,并在轮询之间更改轮询间隔。spring-doc.cadn.net.cn

例如,请参阅 Spring Integration 示例 项目。 该项目包含一个名为 dynamic-poller 的示例,它使用自定义触发器,并展示了在运行时更改轮询间隔的能力。spring-doc.cadn.net.cn

该示例提供了一个自定义触发器,它实现了 org.springframework.scheduling.Trigger 接口。 该示例的触发器基于 Spring 的 PeriodicTrigger 实现。 然而,自定义触发器的字段不是 final 的,并且属性具有显式的 getter 和 setter,使您可以在运行时动态更改轮询周期。spring-doc.cadn.net.cn

需要注意的是,由于 Trigger 方法的值为 nextExecutionTime(),对动态触发器的任何更改在下次轮询之前不会生效,这基于现有配置。 无法强制触发器在其当前配置的下次执行时间之前触发。

有效载荷类型转换

在整个参考手册中,您还可以看到各种端点的特定配置和实现示例,这些端点接受消息或任意 Object 作为输入参数。 在 Object 的情况下,此类参数被映射到消息负载、负载的一部分或标头(当使用 Spring 表达式语言时)。 然而,端点方法的输入参数类型有时与负载或其部分的类型不匹配。 在这种情况下,我们需要执行类型转换。 Spring Integration 提供了一种便捷的方式,在其名为 integrationConversionService 的转换服务 Bean 实例内注册类型转换器(通过使用 Spring ConversionService)。 一旦通过 Spring Integration 基础设施定义了第一个转换器,该 Bean 便会自动创建。 To register a converter, you can implement org.springframework.core.convert.converter.Converter, org.springframework.core.convert.converter.GenericConverter, or org.springframework.core.convert.converter.ConverterFactoryspring-doc.cadn.net.cn

Converter 接口的实现是最简单的,它用于将单一类型转换为另一种类型。 对于更复杂的场景,例如转换到类层次结构,您可以实现 GenericConverter 接口,并可能同时实现 ConditionalConverter 接口。 这些接口使您能够完全访问 fromto 类型描述符,从而实现复杂的转换。 例如,如果您有一个名为 Something 的抽象类作为转换目标(如参数类型、通道数据类型等),并且有两个具体实现类 Thing1Thing,您希望根据输入类型选择其中一个进行转换,那么 GenericConverter 将是合适的选择。 有关更多信息,请参阅这些接口的 Javadoc:spring-doc.cadn.net.cn

实现转换器后,您可以使用便捷的命名空间支持将其注册,如下例所示:spring-doc.cadn.net.cn

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>

或者,您可以使用内部 Bean,如下示例所示:spring-doc.cadn.net.cn

<int:converter>
    <bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>

从 Spring Integration 4.0 开始,您可以使用注解来创建前述配置,如下示例所示:spring-doc.cadn.net.cn

@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

	public Number convert(Boolean source) {
		return source ? 1 : 0;
	}

}

或者,您可以使用 @Configuration 注解,如下示例所示:spring-doc.cadn.net.cn

@Configuration
@EnableIntegration
public class ContextConfiguration {

	@Bean
	@IntegrationConverter
	public SerializingConverter serializingConverter() {
		return new SerializingConverter();
	}

}

在配置应用上下文时,Spring 框架允许您添加一个 conversionService Bean(请参阅 配置 ConversionService 章节)。 该服务在需要时用于在 Bean 创建和配置期间执行适当的转换。spring-doc.cadn.net.cn

相比之下,integrationConversionService 用于运行时转换。 这些用途截然不同。 旨在用于绑定 Bean 构造函数参数和属性的转换器,如果在运行时用于 Spring Integration 表达式求值(针对数据类型通道、负载类型转换器中的消息等),可能会产生意外结果。spring-doc.cadn.net.cn

然而,如果您确实希望将 Spring conversionService 用作 Spring Integration integrationConversionService,您可以在应用上下文中配置一个别名,如下例所示:spring-doc.cadn.net.cn

<alias name="conversionService" alias="integrationConversionService"/>

在这种情况下,conversionService 提供的转换器可用于 Spring Integration 运行时转换。spring-doc.cadn.net.cn

内容类型转换

从版本 5.0 开始,默认情况下,方法调用机制基于 org.springframework.messaging.handler.invocation.InvocableHandlerMethod 基础设施。 其 HandlerMethodArgumentResolver 实现(例如 PayloadArgumentResolverMessageMethodArgumentResolver)可以使用 MessageConverter 抽象将传入的 payload 转换为目标方法参数类型。 转换可以基于 contentType 消息头进行。 为此,Spring Integration 提供了 ConfigurableCompositeMessageConverter,它将委托给已注册的转换器列表进行调用,直到其中一个返回非 null 结果。 默认情况下,此转换器按严格顺序提供:spring-doc.cadn.net.cn

请参阅 Javadoc(在前面的列表中链接),以了解其用途以及转换时合适的 contentType 值的更多信息。 使用 ConfigurableCompositeMessageConverter 是因为它可以与任何其他 MessageConverter 实现一起提供,包括或不包括前面提到的默认转换器。 它还可以作为适当的 Bean 注册到应用程序上下文中,从而覆盖默认转换器,如下例所示:spring-doc.cadn.net.cn

@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
    List<MessageConverter> converters =
        Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
                 new JavaSerializationMessageConverter());
    return new ConfigurableCompositeMessageConverter(converters);
}

这两个新的转换器在默认值之前注册到组合器中。 您也可以不使用 ConfigurableCompositeMessageConverter,而是通过注册一个名称为 integrationArgumentResolverMessageConverter 的 Bean(通过设置 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME 属性)来提供您自己的 MessageConverterspring-doc.cadn.net.cn

在使用 SpEL 方法调用时,不可用基于 MessageConverter 的转换(包括 contentType 个头部)。 在这种情况下,仅可使用上述 有效载荷类型转换中提到的常规类到类的转换。

异步轮询

如果您希望轮询是异步的,poller 可以可选地指定一个 task-executor 属性,该属性指向任何现有 TaskExecutor Bean 的实例(Spring 3.0 通过 task 命名空间提供了便捷的配置方式)。 然而,在使用 TaskExecutor 配置 poller 时,您必须理解某些事项。spring-doc.cadn.net.cn

问题在于存在两个配置:轮询器和 TaskExecutor。 它们必须相互协调。 否则,可能会导致人为的内存泄漏。spring-doc.cadn.net.cn

考虑以下配置:spring-doc.cadn.net.cn

<int:channel id="publishChannel">
    <int:queue />
</int:channel>

<int:service-activator input-channel="publishChannel" ref="myService">
	<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="20" />

上述配置展示了一个未调优的配置。spring-doc.cadn.net.cn

默认情况下,任务执行器拥有一个无界的任务队列。 即使所有线程都被阻塞(正在等待新消息到达或超时结束),轮询器仍会持续调度新任务。 假设有 20 个线程在执行任务,且超时时间为 5 秒,则任务执行速率为每秒 4 个。 然而,新任务的调度速率为每秒 20 个,因此任务执行器内部的队列在进程空闲时以每秒 16 个的速度增长,从而导致内存泄漏。spring-doc.cadn.net.cn

处理此问题的方法之一是将任务执行器的queue-capacity属性设置为该值。 即使设置为0也是一个合理的值。 您也可以通过设置任务执行器的rejection-policy属性(例如设置为DISCARD)来指定无法入队的消息应如何处理,从而对其进行管理。 换句话说,在配置TaskExecutor时,您必须了解某些细节。 有关该主题的更多详细信息,请参阅Spring参考手册中的"任务执行与调度"章节。spring-doc.cadn.net.cn

端点内部 Bean

许多端点是复合 Bean。 这包括所有消费者和所有轮询的入站通道适配器。 消费者(轮询或事件驱动)委托给 MessageHandler。 轮询适配器通过委托给 MessageSource 来获取消息。 通常,获取对委托 Bean 的引用很有用,也许是为了在运行时更改配置或进行测试。 这些 Bean 可以通过已知名称从 ApplicationContext 中获取。 MessageHandler 实例会注册到应用程序上下文中,其 Bean ID 类似于 someConsumer.handler(其中 'consumer' 是端点的 id 属性的值)。 MessageSource 实例会注册为类似的 Bean ID somePolledAdapter.source,其中 'somePolledAdapter' 是适配器的 ID。spring-doc.cadn.net.cn

上述内容仅适用于框架组件本身。 您可以改用内部 Bean 定义,如下示例所示:spring-doc.cadn.net.cn

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="foo">
    <beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>

该 Bean 被视为任何已声明的内部 Bean,不会注册到应用上下文中。 如果您希望以其他方式访问此 Bean,请在顶层使用 id 声明它,并改用 ref 属性。 有关更多信息,请参阅 Spring 文档spring-doc.cadn.net.cn