入站通道适配器
以下列表展示了 AMQP 入站通道适配器的可能配置选项:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("aName");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
<int-amqp:inbound-channel-adapter
id="inboundAmqp" (1)
channel="inboundChannel" (2)
queue-names="si.test.queue" (3)
acknowledge-mode="AUTO" (4)
advice-chain="" (5)
channel-transacted="" (6)
concurrent-consumers="" (7)
connection-factory="" (8)
error-channel="" (9)
expose-listener-channel="" (10)
header-mapper="" (11)
mapped-request-headers="" (12)
listener-container="" (13)
message-converter="" (14)
message-properties-converter="" (15)
phase="" (16)
prefetch-count="" (17)
receive-timeout="" (18)
recovery-interval="" (19)
missing-queues-fatal="" (20)
shutdown-timeout="" (21)
task-executor="" (22)
transaction-attribute="" (23)
transaction-manager="" (24)
batch-size="" (25)
consumers-per-queue (26)
batch-mode="MESSAGES"/> (27)
| 1 | 此适配器的唯一 ID。 可选。 |
| 2 | 转换后的消息应发送到的消息通道。 必填项。 |
| 3 | 应消费消息的 AMQP 队列名称(逗号分隔列表)。 必需项。 |
| 4 | MessageListenerContainer的确认模式。
当设置为MANUAL时,投递标签和通道将分别包含在消息头amqp_deliveryTag和amqp_channel中。
用户应用程序负责确认。
NONE表示无需确认(autoAck)。
AUTO表示适配器容器在下游流程完成后进行确认。
可选(默认为 AUTO)。
请参阅 入站端点确认模式。 |
| 5 | 用于处理与此入站通道适配器相关的横切行为的额外 AOP 通知。 可选。 |
| 6 | 用于指示由该组件创建的通道是否为事务性通道的标志。 如果为 true,则告知框架使用事务性通道,并根据操作结果(发送或接收)以提交或回滚结束所有操作;若出现异常信号,则触发回滚。 可选(默认为 false)。 |
| 7 | 指定要创建的并发消费者数量。
默认值为 1。
我们建议增加并发消费者数量,以扩展对来自队列的传入消息的消费能力。
但请注意,一旦注册多个消费者,任何顺序保证都将失效。
通常,对于低流量队列使用一个消费者。
当设置了 'consumers-per-queue' 时不允许此配置。
可选。 |
| 8 | 对 RabbitMQ ConnectionFactory 的 Bean 引用。
可选(默认为 connectionFactory)。 |
| 9 | 用于发送错误消息的消息通道。 可选。 |
| 10 | 是否将监听器通道(com.rabbitmq.client.Channel)暴露给已注册的 ChannelAwareMessageListener。
可选(默认为 true)。 |
| 11 | 接收 AMQP 消息时使用的 AmqpHeaderMapper 的引用。
可选。
默认情况下,仅标准 AMQP 属性(例如 contentType)会被复制到 Spring Integration MessageHeaders。
AMQP MessageProperties 内的任何用户自定义标头都不会由默认 DefaultAmqpHeaderMapper 复制到消息中。
如果提供了 request-header-names,则不允许此项。 |
| 12 | 以逗号分隔的 AMQP 头名称列表,这些头将从 AMQP 请求映射到 MessageHeaders。
仅当未提供 'header-mapper' 引用时,才能提供此内容。
该列表中的值也可以是用于匹配头名称的简单模式(例如 "*" 或 "thing1*, thing2" 或 "*something")。 |
| 13 | 引用用于接收 AMQP 消息的 AbstractMessageListenerContainer。
如果提供了此属性,则不应提供其他与监听器容器配置相关的属性。
换句话说,通过设置此引用,您必须对监听器容器的配置负全部责任。
唯一的例外是 MessageListener 本身。
由于这实际上是此通道适配器实现的核心职责,因此所引用的监听器容器不得拥有自己的 MessageListener。
可选。 |
| 14 | 接收 AMQP 消息时使用的 MessageConverter。
可选。 |
| 15 | 接收 AMQP 消息时使用的 MessagePropertiesConverter。
可选。 |
| 16 | 指定底层 AbstractMessageListenerContainer 应启动和停止的阶段。
启动顺序从最低到最高,关闭顺序则与此相反。
默认情况下,此值为 Integer.MAX_VALUE,表示该容器尽可能晚地启动并尽可能快地停止。
可选。 |
| 17 | 告知 AMQP 代理在单个请求中向每个消费者发送多少条消息。
通常,您可以将此值设置得较高以提高吞吐量。
该值应大于或等于事务大小(参见后文列表中的 batch-size 属性)。
可选(默认为 1)。 |
| 18 | 接收超时时间(毫秒)。
可选(默认为 1000)。 |
| 19 | 指定底层 AbstractMessageListenerContainer(以毫秒为单位)恢复尝试之间的间隔。
可选(默认为 5000)。 |
| 20 | 如果为'true'且代理上没有可用的任何队列,容器将在启动期间抛出致命异常并停止(在三次尝试被动声明队列后),如果在容器运行时删除了这些队列。
如果false,容器不会抛出异常,而是进入恢复模式,根据recovery-interval尝试重启。
可选(默认为true)。 |
| 21 | 在底层 AbstractMessageListenerContainer 停止后、强制关闭 AMQP 连接之前,等待工作线程的时间(以毫秒为单位)。
如果在收到关闭信号时有任何工作线程处于活动状态,只要它们能在此超时时间内完成处理,则允许其完成处理。
否则,连接将被关闭,消息将保持未确认状态(如果通道是事务性的)。
可选(默认为 5000)。 |
| 22 | 默认情况下,底层的 AbstractMessageListenerContainer 使用 SimpleAsyncTaskExecutor 实现,该实现为每个任务启动一个新线程并异步执行。
默认情况下,并发线程数量无限制。
请注意,此实现不会重用线程。
考虑使用线程池化的 TaskExecutor 实现作为替代方案。
可选(默认为 SimpleAsyncTaskExecutor)。 |
| 23 | 默认情况下,底层的 AbstractMessageListenerContainer 会创建 DefaultTransactionAttribute 的新实例(它采用 EJB 方式在运行时异常时回滚,但不在检查异常时回滚)。
可选(默认为 DefaultTransactionAttribute)。 |
| 24 | 将 Bean 引用设置到底层 AbstractMessageListenerContainer 上的外部 PlatformTransactionManager。
事务管理器与 channel-transacted 属性协同工作。
如果框架在发送或接收消息时已有事务在进行中,且 channelTransacted 标志为 true,则消息事务的提交或回滚将推迟到当前事务结束时执行。
如果 channelTransacted 标志为 false,则消息操作不适用任何事务语义(即自动确认)。
有关更多信息,请参阅 Spring AMQP 中的事务。
可选。 |
| 25 | 告知 SimpleMessageListenerContainer 在单个请求中处理多少条消息。
为了获得最佳效果,其值应小于或等于 prefetch-count 中设置的值。
当设置 'consumers-per-queue' 时不允许使用此选项。
可选(默认为 1)。 |
| 26 | 表示底层监听器容器应为 DirectMessageListenerContainer,而不是默认的 SimpleMessageListenerContainer。
有关更多信息,请参阅 Spring AMQP 参考手册。 |
| 27 | 当容器的 consumerBatchEnabled 为 true 时,决定适配器如何在消息负载中呈现批处理消息。
当设置为 MESSAGES(默认值)时,负载是一个 List<Message<?>>,其中每条消息的头部从传入的 AMQP Message 映射,而负载是转换后的 body。
当设置为 EXTRACT_PAYLOADS 时,负载是一个 List<?>,其中的元素从 AMQP Message 主体转换而来。
EXTRACT_PAYLOADS_WITH_HEADERS 与 EXTRACT_PAYLOADS 类似,但此外,每条消息的头部会从 MessageProperties 映射到对应索引处的 List<Map<String, Object>;头部名称为 AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS。 |
|
容器
请注意,当使用 XML 配置外部容器时,无法使用 Spring AMQP 命名空间来定义该容器。
这是因为该命名空间要求至少包含一个
|
尽管 Spring Integration 的 JMS 和 AMQP 支持相似,但仍存在重要差异。
JMS 入站通道适配器在底层使用 JmsDestinationPollingSource,并期望配置一个轮询器(poller)。
AMQP 入站通道适配器使用 AbstractMessageListenerContainer,并且是消息驱动的。
在这方面,它更接近于 JMS 消息驱动通道适配器。 |
从版本 5.5 开始,AmqpInboundChannelAdapter可以配置为使用在内部调用重试操作时应用于RecoveryCallback的org.springframework.amqp.rabbit.retry.MessageRecoverer策略。
有关更多信息,请参阅setMessageRecoverer()的 JavaDocs。
@Publisher 注解也可以与 @RabbitListener 结合使用:
@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {
@Bean
QueueChannel fromRabbitViaPublisher() {
return new QueueChannel();
}
@RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
@Publisher("fromRabbitViaPublisher")
@Payload("#args.payload.toUpperCase()")
public void consumeForPublisher(String payload) {
}
}
默认情况下,@Publisher AOP 拦截器处理方法调用的返回值。
然而,@RabbitListener 方法的返回值被视为 AMQP 回复消息。
因此,这种方法不能与 @Publisher 一起使用,所以对于这种组合,建议使用带有针对方法参数的相应 SpEL 表达式的 @Payload 注解。
有关 @Publisher 的更多信息,请参阅注解驱动配置部分。
当在监听器容器中使用独占或单活跃消费者时,建议将容器属性 forceStop 设置为 true。
这将防止一种竞态条件:在停止容器后,另一个消费者可能在该实例完全停止之前就开始消费消息。 |
批处理消息
有关批量消息的更多信息,请参阅 Spring AMQP 文档。
要使用 Spring Integration 生成批量消息,只需将出站端点配置为 BatchingRabbitTemplate。
在接收批量消息时,默认情况下,监听器容器会提取每个片段消息,并且适配器将为每个片段生成一个 Message<?>。
从 5.2 版本开始,如果容器的 deBatchingEnabled 属性设置为 false,则去分批操作将由适配器执行,并生成单个 Message<List<?>>,其负载为片段负载的列表(在适当的情况下经过转换)。
默认的 BatchingStrategy 是 SimpleBatchingStrategy,但可以在适配器上覆盖此设置。
在使用批处理进行需要重试的恢复操作时,必须使用 org.springframework.amqp.rabbit.retry.MessageBatchRecoverer。 |