|
此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Integration 7.0.4! |
AMQP 1.0 支持
从 7.0 版本开始,Spring Integration 提供了支持 RabbitMQ AMQP 1.0 的通道适配器。
这些通道适配器基于 org.springframework.amqp:spring-rabbitmq-client 库构建。
Spring AMQP 文档提供了关于 RabbitMQ AMQP 1.0 支持的更多详细信息。
AMQP 1.0 出站通道适配器
AmqpClientMessageHandler是AbstractReplyProducingMessageHandler的实现,可根据setRequiresReply()配置作为单向通道适配器或出站网关使用。
该通道适配器的实例需要AMQP 1.0协议的AsyncAmqpTemplate实现,例如上述spring-rabbitmq-client库中的RabbitAmqpTemplate。
此消息处理器默认是异步的;因此,发布错误应通过请求消息中的errorChannel头或在应用上下文中的全局默认errorChannel来处理。
The exchange 用于发布消息(以及可选的 routingKey)与用于发布的 queue 互斥。
如果未提供任一选项,则 AsyncAmqpTemplate 实现必须确保这些目标部分具有某些默认值;否则,消息将因未送达而被拒绝。
默认情况下,MessageConverter 是一个 org.springframework.amqp.support.converter.SimpleMessageConverter,用于处理 String、Serializable 实例和字节数组。
此外,默认的 AmqpHeaderMapper 是一个 DefaultAmqpHeaderMapper.outboundMapper()。
此消息头映射器还用于将 AMQP 消息属性映射回回复中的消息头。
在网关模式下,可以传入 replyPayloadType 来转换回复消息体。
然而,MessageConverter 必须是 SmartMessageConverter 的实现类,例如 JacksonJsonMessageConverter。
此外,作为 replyPayloadType 的互斥选项,可以将 returnMessage 标志设置为 true,以将整个 org.springframework.amqp.core.Message 实例作为回复消息负载返回。
以下示例演示如何将 AmqpClientMessageHandler 配置为简单的 @ServiceActivator:
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
-
Java
@Bean
IntegrationFlow sendFlow(RabbitAmqpTemplate rabbitTemplate) {
return f -> f
.handle(AmqpClient.outboundAdapter(rabbitTemplate)
.exchange("e1")
.routingKeyExpression("'k1'"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
integrationFlow {
handle(AmqpClient.outboundAdapter(rabbitTemplate)
.apply {
exchange("e1")
routingKeyExpression("'k1'")
}
)
}
@Bean
sendFlow() {
integrationFlow {
handle(AmqpClient.outboundAdapter(rabbitTemplate)
.with {
exchange 'e1'
routingKeyExpression '''k1'''
}
)
}
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendChannel")
AmqpClientMessageHandler amqpClientMessageHandler(RabbitAmqpTemplate rabbitTemplate) {
AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
messageHandler.setExchangeExpressionString("headers[exchange]");
messageHandler.setRoutingKeyExpressionString("headers[routingKey]");
return messageHandler;
}
AmqpClientMessageHandler 的网关变体可能如下:
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
-
Java
@Bean
IntegrationFlow requestReplyOutboundFlow(RabbitAmqpTemplate rabbitTemplate) {
return f -> f
.handle(AmqpClient.outboundGateway(rabbitTemplate)
.queueFunction(m -> "requestReply"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
integrationFlow {
handle(AmqpClient.outboundGateway(rabbitTemplate)
.queueFunction { "requestReply" }
)
}
@Bean
sendFlow() {
integrationFlow {
handle(AmqpClient.outboundGateway(rabbitTemplate)
.with {
queueFunction { 'requestReply' }
}
)
}
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendAndReceiveChannel")
AmqpClientMessageHandler amqpClientGateway(RabbitAmqpTemplate rabbitTemplate) {
AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
messageHandler.setRequiresReply(true);
messageHandler.setReplyPayloadType(String.class);
messageHandler.setMessageConverter(new JacksonJsonMessageConverter());
messageHandler.setQueue("q1");
return messageHandler;
}
AMQP 1.0 消息驱动通道适配器
AmqpClientMessageProducer 是一个 MessageProducerSupport 实现,作为消息驱动通道适配器,用于通过 RabbitMQ AMQP 1.0 协议从队列中消费消息。
它需要一个 AmqpConnectionFactory 以及至少一个要消费的队列。
其内部逻辑基于 RabbitAmqpListenerContainer 和 IntegrationRabbitAmqpMessageListener,将转换后的已消费 AMQP 消息中继到 outputChannel。
部分 RabbitAmqpListenerContainer 配置选项作为设置器从 AmqpClientMessageProducer 公开。
默认情况下,MessageConverter 是一个 org.springframework.amqp.support.converter.SimpleMessageConverter,用于处理 String、Serializable 实例和字节数组。
此外,默认的 AmqpHeaderMapper 是一个 DefaultAmqpHeaderMapper.inboundMapper()。
可以将 messageConverter 选项设置为 null,以完全跳过转换(包括头部映射),并将接收到的 AMQP 消息作为要生成的 Spring 消息的有效载荷返回。
此外,AmqpClientMessageProducer 实现了 Pausable 契约,并委托给相应的 RabbitAmqpListenerContainer API。
当 AmqpClientMessageProducer.setBatchSize() > 1 时,此通道适配器以批处理模式工作。
在这种情况下,接收到的消息会被收集,直到达到批次大小或 batchReceiveTimeout 周期耗尽。
所有批处理的 AMQP 消息随后被转换为 Spring 消息,并生成一个结果列表作为包装消息的有效载荷,发送到 outputChannel。
批处理模式通过一次性结算所有批处理的消息,带来了一定的性能提升。
当 autoSettle 标志设置为 false 时,AcknowledgmentCallback 实例将作为 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 消息头提供,用于对接收到的消息或整个批次进行结算决策。
以下示例演示如何将 AmqpClientMessageProducer 配置为简单的入站端点:
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
-
Java
@Bean
IntegrationFlow receiveFlow(AmqpConnectionFactory connectionFactory) {
return IntegrationFlow.from(AmqpClient.inboundChannelAdapter(connectionFactory, "q1"))
.channel(c -> c.queue("receiveChannel"))
.get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, "q1")) {
channel("inputChannel")
}
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, 'q1')) {
channel 'inputChannel'
}
}
@Bean
AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory,
QueueChannel inputChannel) {
AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q3");
amqpClientMessageProducer.setOutputChannel(inputChannel);
amqpClientMessageProducer.setBatchSize(2);
return amqpClientMessageProducer;
}
AMQP 1.0 入站网关
The AmqpClientInboundGateway 是一个用于通过 RabbitMQ AMQP 1.0 协议接收请求并生成回复的 MessagingGatewaySupport 实现。
它与上述提到的 AmqpClientMessageProducer 类似,并共享许多 RabbitAmqpListenerContainer 配置选项。
此外,为了生成 AMQP 1.0 回复,AmqpClientInboundGateway 在内部使用了 RabbitAmqpTemplate。
为了实现自动回复与请求的关联,必须在请求消息中提供 replyTo 属性。
例如,RabbitAmqpTemplate.sendAndReceive() 依赖于 RabbitMQ AMQP 1.0 库中的 RpcClient,该库会生成一个独占且自动删除的队列。
或者,可以将回复地址设置为 replyExchange(以及可选的 replyRoutingKey)或 replyQueue(但不可同时设置两者),这些设置位于 AmqpClientInboundGateway 上,并委托给 RabbitAmqpTemplate 默认选项处理。
可以使用 messageId 或 correlationId 请求消息属性来关联回复。
如果缺失,RabbitAmqpTemplate.sendAndReceive() 中的 RpcClient 将自动生成一个。
AmqpClientInboundGateway 能够将此类关联键映射回回复消息。
以下示例演示如何将 AmqpClientInboundGateway 配置为简单的入站网关:
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
-
Java
@Bean
IntegrationFlow amqpClientInboundGatewayFlow(AmqpConnectionFactory connectionFactory) {
return IntegrationFlow.from(AmqpClient.inboundGateway(connectionFactory, "requestReply"))
.channel(c -> c.queue("inputChannel"))
.get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
integrationFlow(AmqpClient.inboundGateway(connectionFactory, "requestReply")) {
channel { queue("inputChannel") }
}
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
integrationFlow(AmqpClient.inboundGateway(connectionFactory, 'requestReply')) {
channel { queue 'inputChannel' }
}
}
@Bean
AmqpClientInboundGateway amqpClientInboundGateway(AmqpConnectionFactory connectionFactory) {
AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "requestReply");
amqpClientInboundGateway.setRequestChannelName("inputChannel");
return amqpClientInboundGateway;
}