|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
基于 AMQP 的消息通道
有两种可用的消息通道实现。
一种是点对点(point-to-point),另一种是发布-订阅(publish-subscribe)。
这两种通道都为底层的 AmqpTemplate 和 SimpleMessageListenerContainer 提供了广泛的配置属性(如本章前面所示,针对通道适配器和网关)。
不过,此处展示的示例仅包含最小化配置。
请查阅 XML 架构以查看可用的属性。
点对点通道可能如下所示:
<int-amqp:channel id="p2pChannel"/>
在底层,前面的示例会导致声明一个名为 si.p2pChannel 的 Queue,并且该通道会向该 Queue 发送消息(技术上是通过向无名称的直接交换发送,其路由键与此 Queue 的名称匹配)。
该通道还会在该 Queue 上注册一个消费者。
如果您希望通道是“可轮询”的而不是基于消息驱动的,请提供值为 false 的 message-driven 标志,如下例所示:
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
发布-订阅通道可能如下所示:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
在底层,上述示例会导致声明一个名为 si.fanout.pubSubChannel 的扇出(fanout)交换机,并且该通道会向此扇出交换机发送消息。
该通道还会声明一个由服务器命名的、排他的、自动删除的、非持久的 Queue,并将其绑定到扇出交换机,同时在该 Queue 上注册一个消费者以接收消息。
发布 - 订阅通道没有“可轮询”选项。
它必须是消息驱动的。
从版本 4.1 开始,基于 AMQP 的消息通道(结合 channel-transacted)支持
template-channel-transacted,以区分 transactional 的 AbstractMessageListenerContainer 配置和
RabbitTemplate 的配置。
请注意,此前 channel-transacted 默认是 true。
现在,默认情况下,AbstractMessageListenerContainer 为 false。
在 4.3 版本之前,基于 AMQP 的通道仅支持负载和头部为 Serializable 的消息。
整个消息会被转换(序列化)并发送到 RabbitMQ。
现在,您可以将 extract-payload 属性(或使用 Java 配置时为 setExtractPayload())设置为 true。
当此标志为 true 时,消息负载会被转换,且头部会被映射,其方式类似于使用通道适配器时的行为。
这种安排使得基于 AMQP 的通道可以与不可序列化的负载一起使用(例如配合其他消息转换器,如 Jackson2JsonMessageConverter)。
有关默认映射头部的更多信息,请参阅 AMQP 消息头部。
您可以通过提供使用 outbound-header-mapper 和 inbound-header-mapper 属性的自定义映射器来修改映射关系。
您现在还可以指定一个 default-delivery-mode,当不存在 amqp_deliveryMode 头部时使用它来设置投递模式。
默认情况下,Spring AMQP MessageProperties 使用 PERSISTENT 投递模式。
| 与其他持久化支持的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。 它们并非用于将工作分发给其他对等应用程序。 为此目的,请使用通道适配器。 |
从版本 5.0 开始,可轮询通道现在会将轮询线程阻塞指定的 receiveTimeout(默认值为 1 秒)。
此前,与其他 PollableChannel 实现不同,如果无可用消息,无论接收超时时间如何,线程都会立即返回给调度器。
与使用 basicGet() 检索消息(无超时)相比,阻塞操作的成本略高,因为每条消息都需要创建一个消费者来接收。
若要恢复之前的行为,请将轮询器的 receiveTimeout 设置为 0。 |
使用 Java 配置进行配置
以下示例展示了如何使用 Java 配置来配置通道:
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
使用 Java DSL 进行配置
以下示例展示了如何使用 Java DSL 配置通道:
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}