AMQP 1.0 支持

从 7.0 版本开始,Spring Integration 提供了支持 RabbitMQ AMQP 1.0 的通道适配器。 这些通道适配器基于 org.springframework.amqp:spring-rabbitmq-client 库构建。spring-doc.cadn.net.cn

Spring AMQP 文档提供了关于 RabbitMQ AMQP 1.0 支持的更多详细信息。spring-doc.cadn.net.cn

AMQP 1.0 出站通道适配器

AmqpClientMessageHandlerAbstractReplyProducingMessageHandler的实现,可根据setRequiresReply()配置作为单向通道适配器或出站网关使用。 该通道适配器的实例需要AMQP 1.0协议的AsyncAmqpTemplate实现,例如上述spring-rabbitmq-client库中的RabbitAmqpTemplate。 此消息处理器默认是异步的;因此,发布错误应通过请求消息中的errorChannel头或在应用上下文中的全局默认errorChannel来处理。spring-doc.cadn.net.cn

The exchange 用于发布消息(以及可选的 routingKey)与用于发布的 queue 互斥。 如果未提供任一选项,则 AsyncAmqpTemplate 实现必须确保这些目标部分具有某些默认值;否则,消息将因未送达而被拒绝。spring-doc.cadn.net.cn

默认情况下,MessageConverter 是一个 org.springframework.amqp.support.converter.SimpleMessageConverter,用于处理 String、Serializable 实例和字节数组。 此外,默认的 AmqpHeaderMapper 是一个 DefaultAmqpHeaderMapper.outboundMapper()。 此消息头映射器还用于将 AMQP 消息属性映射回回复中的消息头。spring-doc.cadn.net.cn

在网关模式下,可以传入 replyPayloadType 来转换回复消息体。 然而,MessageConverter 必须是 SmartMessageConverter 的实现类,例如 JacksonJsonMessageConverter。 此外,作为 replyPayloadType 的互斥选项,可以将 returnMessage 标志设置为 true,以将整个 org.springframework.amqp.core.Message 实例作为回复消息负载返回。spring-doc.cadn.net.cn

以下示例演示如何将 AmqpClientMessageHandler 配置为简单的 @ServiceActivatorspring-doc.cadn.net.cn

@Bean
IntegrationFlow sendFlow(RabbitAmqpTemplate rabbitTemplate) {
    return f -> f
            .handle(AmqpClient.outboundAdapter(rabbitTemplate)
                    .exchange("e1")
                    .routingKeyExpression("'k1'"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
    integrationFlow {
                handle(AmqpClient.outboundAdapter(rabbitTemplate)
    		            .apply {
    		                exchange("e1")
                            routingKeyExpression("'k1'")
    		            }
    		    )
    }
@Bean
sendFlow() {
    integrationFlow {
        handle(AmqpClient.outboundAdapter(rabbitTemplate)
                .with {
                     exchange 'e1'
                     routingKeyExpression '''k1'''
                }
        )
    }
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendChannel")
AmqpClientMessageHandler amqpClientMessageHandler(RabbitAmqpTemplate rabbitTemplate) {
    AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
    messageHandler.setExchangeExpressionString("headers[exchange]");
    messageHandler.setRoutingKeyExpressionString("headers[routingKey]");
    return messageHandler;
}

AmqpClientMessageHandler 的网关变体可能如下:spring-doc.cadn.net.cn

@Bean
IntegrationFlow requestReplyOutboundFlow(RabbitAmqpTemplate rabbitTemplate) {
    return f -> f
            .handle(AmqpClient.outboundGateway(rabbitTemplate)
                    .queueFunction(m -> "requestReply"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
    integrationFlow {
                handle(AmqpClient.outboundGateway(rabbitTemplate)
    		            .queueFunction { "requestReply" }
                )
    }
@Bean
sendFlow() {
    integrationFlow {
        handle(AmqpClient.outboundGateway(rabbitTemplate)
                .with {
                     queueFunction { 'requestReply' }
                }
        )
    }
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendAndReceiveChannel")
AmqpClientMessageHandler amqpClientGateway(RabbitAmqpTemplate rabbitTemplate) {
    AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
    messageHandler.setRequiresReply(true);
    messageHandler.setReplyPayloadType(String.class);
    messageHandler.setMessageConverter(new JacksonJsonMessageConverter());
    messageHandler.setQueue("q1");
    return messageHandler;
}

AMQP 1.0 消息驱动通道适配器

AmqpClientMessageProducer 是一个 MessageProducerSupport 实现,作为消息驱动通道适配器,用于通过 RabbitMQ AMQP 1.0 协议从队列中消费消息。 它需要一个 AmqpConnectionFactory 以及至少一个要消费的队列。 其内部逻辑基于 RabbitAmqpListenerContainerIntegrationRabbitAmqpMessageListener,将转换后的已消费 AMQP 消息中继到 outputChannel。 部分 RabbitAmqpListenerContainer 配置选项作为设置器从 AmqpClientMessageProducer 公开。spring-doc.cadn.net.cn

默认情况下,MessageConverter 是一个 org.springframework.amqp.support.converter.SimpleMessageConverter,用于处理 String、Serializable 实例和字节数组。 此外,默认的 AmqpHeaderMapper 是一个 DefaultAmqpHeaderMapper.inboundMapper()。 可以将 messageConverter 选项设置为 null,以完全跳过转换(包括头部映射),并将接收到的 AMQP 消息作为要生成的 Spring 消息的有效载荷返回。spring-doc.cadn.net.cn

此外,AmqpClientMessageProducer 实现了 Pausable 契约,并委托给相应的 RabbitAmqpListenerContainer API。spring-doc.cadn.net.cn

AmqpClientMessageProducer.setBatchSize() > 1 时,此通道适配器以批处理模式工作。 在这种情况下,接收到的消息会被收集,直到达到批次大小或 batchReceiveTimeout 周期耗尽。 所有批处理的 AMQP 消息随后被转换为 Spring 消息,并生成一个结果列表作为包装消息的有效载荷,发送到 outputChannel。 批处理模式通过一次性结算所有批处理的消息,带来了一定的性能提升。spring-doc.cadn.net.cn

autoSettle 标志设置为 false 时,AcknowledgmentCallback 实例将作为 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 消息头提供,用于对接收到的消息或整个批次进行结算决策。spring-doc.cadn.net.cn

以下示例演示如何将 AmqpClientMessageProducer 配置为简单的入站端点:spring-doc.cadn.net.cn

@Bean
IntegrationFlow receiveFlow(AmqpConnectionFactory connectionFactory) {
    return IntegrationFlow.from(AmqpClient.inboundChannelAdapter(connectionFactory, "q1"))
            .channel(c -> c.queue("receiveChannel"))
            .get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
        integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, "q1")) {
            channel("inputChannel")
        }
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
    integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, 'q1')) {
        channel 'inputChannel'
    }
}
@Bean
AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory,
        QueueChannel inputChannel) {

    AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q3");
    amqpClientMessageProducer.setOutputChannel(inputChannel);
    amqpClientMessageProducer.setBatchSize(2);
    return amqpClientMessageProducer;
}

AMQP 1.0 入站网关

The AmqpClientInboundGateway 是一个用于通过 RabbitMQ AMQP 1.0 协议接收请求并生成回复的 MessagingGatewaySupport 实现。 它与上述提到的 AmqpClientMessageProducer 类似,并共享许多 RabbitAmqpListenerContainer 配置选项。 此外,为了生成 AMQP 1.0 回复,AmqpClientInboundGateway 在内部使用了 RabbitAmqpTemplatespring-doc.cadn.net.cn

为了实现自动回复与请求的关联,必须在请求消息中提供 replyTo 属性。 例如,RabbitAmqpTemplate.sendAndReceive() 依赖于 RabbitMQ AMQP 1.0 库中的 RpcClient,该库会生成一个独占且自动删除的队列。 或者,可以将回复地址设置为 replyExchange(以及可选的 replyRoutingKey)或 replyQueue(但不可同时设置两者),这些设置位于 AmqpClientInboundGateway 上,并委托给 RabbitAmqpTemplate 默认选项处理。 可以使用 messageIdcorrelationId 请求消息属性来关联回复。 如果缺失,RabbitAmqpTemplate.sendAndReceive() 中的 RpcClient 将自动生成一个。 AmqpClientInboundGateway 能够将此类关联键映射回回复消息。spring-doc.cadn.net.cn

以下示例演示如何将 AmqpClientInboundGateway 配置为简单的入站网关:spring-doc.cadn.net.cn

@Bean
IntegrationFlow amqpClientInboundGatewayFlow(AmqpConnectionFactory connectionFactory) {
    return IntegrationFlow.from(AmqpClient.inboundGateway(connectionFactory, "requestReply"))
            .channel(c -> c.queue("inputChannel"))
            .get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
        integrationFlow(AmqpClient.inboundGateway(connectionFactory, "requestReply")) {
            channel { queue("inputChannel") }
        }
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
    integrationFlow(AmqpClient.inboundGateway(connectionFactory, 'requestReply')) {
        channel { queue 'inputChannel' }
    }
}
@Bean
AmqpClientInboundGateway amqpClientInboundGateway(AmqpConnectionFactory connectionFactory) {
    AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "requestReply");
    amqpClientInboundGateway.setRequestChannelName("inputChannel");
    return amqpClientInboundGateway;
}