入站通道适配器

以下列表展示了 AMQP 入站通道适配器的可能配置选项:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                               new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("aName");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}
<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp"                (1)
                                  channel="inboundChannel"        (2)
                                  queue-names="si.test.queue"     (3)
                                  acknowledge-mode="AUTO"         (4)
                                  advice-chain=""                 (5)
                                  channel-transacted=""           (6)
                                  concurrent-consumers=""         (7)
                                  connection-factory=""           (8)
                                  error-channel=""                (9)
                                  expose-listener-channel=""      (10)
                                  header-mapper=""                (11)
                                  mapped-request-headers=""       (12)
                                  listener-container=""           (13)
                                  message-converter=""            (14)
                                  message-properties-converter="" (15)
                                  phase=""                        (16)
                                  prefetch-count=""               (17)
                                  receive-timeout=""              (18)
                                  recovery-interval=""            (19)
                                  missing-queues-fatal=""         (20)
                                  shutdown-timeout=""             (21)
                                  task-executor=""                (22)
                                  transaction-attribute=""        (23)
                                  transaction-manager=""          (24)
                                  batch-size=""                   (25)
                                  consumers-per-queue             (26)
                                  batch-mode="MESSAGES"/>         (27)
1 此适配器的唯一 ID。 可选。
2 转换后的消息应发送到的消息通道。 必填项。
3 应消费消息的 AMQP 队列名称(逗号分隔列表)。 必需项。
4 MessageListenerContainer的确认模式。 当设置为MANUAL时,投递标签和通道将分别包含在消息头amqp_deliveryTagamqp_channel中。 用户应用程序负责确认。 NONE表示无需确认(autoAck)。 AUTO表示适配器容器在下游流程完成后进行确认。 可选(默认为 AUTO)。 请参阅 入站端点确认模式
5 用于处理与此入站通道适配器相关的横切行为的额外 AOP 通知。 可选。
6 用于指示由该组件创建的通道是否为事务性通道的标志。 如果为 true,则告知框架使用事务性通道,并根据操作结果(发送或接收)以提交或回滚结束所有操作;若出现异常信号,则触发回滚。 可选(默认为 false)。
7 指定要创建的并发消费者数量。 默认值为 1。 我们建议增加并发消费者数量,以扩展对来自队列的传入消息的消费能力。 但请注意,一旦注册多个消费者,任何顺序保证都将失效。 通常,对于低流量队列使用一个消费者。 当设置了 'consumers-per-queue' 时不允许此配置。 可选。
8 对 RabbitMQ ConnectionFactory 的 Bean 引用。 可选(默认为 connectionFactory)。
9 用于发送错误消息的消息通道。 可选。
10 是否将监听器通道(com.rabbitmq.client.Channel)暴露给已注册的 ChannelAwareMessageListener。 可选(默认为 true)。
11 接收 AMQP 消息时使用的 AmqpHeaderMapper 的引用。 可选。 默认情况下,仅标准 AMQP 属性(例如 contentType)会被复制到 Spring Integration MessageHeaders。 AMQP MessageProperties 内的任何用户自定义标头都不会由默认 DefaultAmqpHeaderMapper 复制到消息中。 如果提供了 request-header-names,则不允许此项。
12 以逗号分隔的 AMQP 头名称列表,这些头将从 AMQP 请求映射到 MessageHeaders。 仅当未提供 'header-mapper' 引用时,才能提供此内容。 该列表中的值也可以是用于匹配头名称的简单模式(例如 "*" 或 "thing1*, thing2" 或 "*something")。
13 引用用于接收 AMQP 消息的 AbstractMessageListenerContainer。 如果提供了此属性,则不应提供其他与监听器容器配置相关的属性。 换句话说,通过设置此引用,您必须对监听器容器的配置负全部责任。 唯一的例外是 MessageListener 本身。 由于这实际上是此通道适配器实现的核心职责,因此所引用的监听器容器不得拥有自己的 MessageListener。 可选。
14 接收 AMQP 消息时使用的 MessageConverter。 可选。
15 接收 AMQP 消息时使用的 MessagePropertiesConverter。 可选。
16 指定底层 AbstractMessageListenerContainer 应启动和停止的阶段。 启动顺序从最低到最高,关闭顺序则与此相反。 默认情况下,此值为 Integer.MAX_VALUE,表示该容器尽可能晚地启动并尽可能快地停止。 可选。
17 告知 AMQP 代理在单个请求中向每个消费者发送多少条消息。 通常,您可以将此值设置得较高以提高吞吐量。 该值应大于或等于事务大小(参见后文列表中的 batch-size 属性)。 可选(默认为 1)。
18 接收超时时间(毫秒)。 可选(默认为 1000)。
19 指定底层 AbstractMessageListenerContainer(以毫秒为单位)恢复尝试之间的间隔。 可选(默认为 5000)。
20 如果为'true'且代理上没有可用的任何队列,容器将在启动期间抛出致命异常并停止(在三次尝试被动声明队列后),如果在容器运行时删除了这些队列。 如果false,容器不会抛出异常,而是进入恢复模式,根据recovery-interval尝试重启。 可选(默认为true)。
21 在底层 AbstractMessageListenerContainer 停止后、强制关闭 AMQP 连接之前,等待工作线程的时间(以毫秒为单位)。 如果在收到关闭信号时有任何工作线程处于活动状态,只要它们能在此超时时间内完成处理,则允许其完成处理。 否则,连接将被关闭,消息将保持未确认状态(如果通道是事务性的)。 可选(默认为 5000)。
22 默认情况下,底层的 AbstractMessageListenerContainer 使用 SimpleAsyncTaskExecutor 实现,该实现为每个任务启动一个新线程并异步执行。 默认情况下,并发线程数量无限制。 请注意,此实现不会重用线程。 考虑使用线程池化的 TaskExecutor 实现作为替代方案。 可选(默认为 SimpleAsyncTaskExecutor)。
23 默认情况下,底层的 AbstractMessageListenerContainer 会创建 DefaultTransactionAttribute 的新实例(它采用 EJB 方式在运行时异常时回滚,但不在检查异常时回滚)。 可选(默认为 DefaultTransactionAttribute)。
24 将 Bean 引用设置到底层 AbstractMessageListenerContainer 上的外部 PlatformTransactionManager。 事务管理器与 channel-transacted 属性协同工作。 如果框架在发送或接收消息时已有事务在进行中,且 channelTransacted 标志为 true,则消息事务的提交或回滚将推迟到当前事务结束时执行。 如果 channelTransacted 标志为 false,则消息操作不适用任何事务语义(即自动确认)。 有关更多信息,请参阅 Spring AMQP 中的事务。 可选。
25 告知 SimpleMessageListenerContainer 在单个请求中处理多少条消息。 为了获得最佳效果,其值应小于或等于 prefetch-count 中设置的值。 当设置 'consumers-per-queue' 时不允许使用此选项。 可选(默认为 1)。
26 表示底层监听器容器应为 DirectMessageListenerContainer,而不是默认的 SimpleMessageListenerContainer。 有关更多信息,请参阅 Spring AMQP 参考手册
27 当容器的 consumerBatchEnabledtrue 时,决定适配器如何在消息负载中呈现批处理消息。 当设置为 MESSAGES(默认值)时,负载是一个 List<Message<?>>,其中每条消息的头部从传入的 AMQP Message 映射,而负载是转换后的 body。 当设置为 EXTRACT_PAYLOADS 时,负载是一个 List<?>,其中的元素从 AMQP Message 主体转换而来。 EXTRACT_PAYLOADS_WITH_HEADERSEXTRACT_PAYLOADS 类似,但此外,每条消息的头部会从 MessageProperties 映射到对应索引处的 List<Map<String, Object>;头部名称为 AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS
容器

请注意,当使用 XML 配置外部容器时,无法使用 Spring AMQP 命名空间来定义该容器。 这是因为该命名空间要求至少包含一个 <listener/> 元素。 在此环境中,监听器位于适配器内部。 因此,您必须使用普通的 Spring <bean/> 定义来配置容器,如下示例所示:spring-doc.cadn.net.cn

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>
尽管 Spring Integration 的 JMS 和 AMQP 支持相似,但仍存在重要差异。 JMS 入站通道适配器在底层使用 JmsDestinationPollingSource,并期望配置一个轮询器(poller)。 AMQP 入站通道适配器使用 AbstractMessageListenerContainer,并且是消息驱动的。 在这方面,它更接近于 JMS 消息驱动通道适配器。

从版本 5.5 开始,AmqpInboundChannelAdapter可以配置为使用在内部调用重试操作时应用于RecoveryCallbackorg.springframework.amqp.rabbit.retry.MessageRecoverer策略。 有关更多信息,请参阅setMessageRecoverer()的 JavaDocs。spring-doc.cadn.net.cn

@Publisher 注解也可以与 @RabbitListener 结合使用:spring-doc.cadn.net.cn

@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

    @Bean
    QueueChannel fromRabbitViaPublisher() {
        return new QueueChannel();
    }

    @RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
    @Publisher("fromRabbitViaPublisher")
    @Payload("#args.payload.toUpperCase()")
    public void consumeForPublisher(String payload) {

    }

}

默认情况下,@Publisher AOP 拦截器处理方法调用的返回值。 然而,@RabbitListener 方法的返回值被视为 AMQP 回复消息。 因此,这种方法不能与 @Publisher 一起使用,所以对于这种组合,建议使用带有针对方法参数的相应 SpEL 表达式的 @Payload 注解。 有关 @Publisher 的更多信息,请参阅注解驱动配置部分。spring-doc.cadn.net.cn

当在监听器容器中使用独占或单活跃消费者时,建议将容器属性 forceStop 设置为 true。 这将防止一种竞态条件:在停止容器后,另一个消费者可能在该实例完全停止之前就开始消费消息。

批处理消息

有关批量消息的更多信息,请参阅 Spring AMQP 文档spring-doc.cadn.net.cn

要使用 Spring Integration 生成批量消息,只需将出站端点配置为 BatchingRabbitTemplatespring-doc.cadn.net.cn

在接收批量消息时,默认情况下,监听器容器会提取每个片段消息,并且适配器将为每个片段生成一个 Message<?>。 从 5.2 版本开始,如果容器的 deBatchingEnabled 属性设置为 false,则去分批操作将由适配器执行,并生成单个 Message<List<?>>,其负载为片段负载的列表(在适当的情况下经过转换)。spring-doc.cadn.net.cn

默认的 BatchingStrategySimpleBatchingStrategy,但可以在适配器上覆盖此设置。spring-doc.cadn.net.cn

在使用批处理进行需要重试的恢复操作时,必须使用 org.springframework.amqp.rabbit.retry.MessageBatchRecoverer