如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

严格消息排序

本节描述传入和传出消息的消息排序。spring-doc.cadn.net.cn

传入

如果您需要严格排序传入的消息,必须将传入监听器容器的 prefetchCount 属性配置为 1。 这是因为,如果消息失败并重新投递,它将在现有的预取消息之后到达。 自 Spring AMQP 2.0 版本起,prefetchCount 的默认值已设置为 250,以提升性能。 严格排序要求会以降低性能为代价。spring-doc.cadn.net.cn

出站

考虑以下集成流程:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

假设我们向网关发送消息 ABC。 虽然消息 ABC 很可能按顺序发送,但无法保证这一点。 这是因为模板会为每次发送操作从缓存中“借用”一个通道,且无法保证每条消息都使用相同的通道。 一种解决方案是在拆分器之前启动事务,但在 RabbitMQ 中事务成本较高,可能会使性能降低数百倍。spring-doc.cadn.net.cn

为了更高效地解决此问题,从 5.1 版本开始,Spring Integration 提供了 BoundRabbitChannelAdvice,它是一个 HandleMessageAdvice。 请参阅 处理消息建议。 当在拆分器之前应用时,它确保所有下游操作都在同一通道上执行,并且(可选)可以等待直到接收到所有已发送消息的发布者确认(如果连接工厂配置为支持确认)。 以下示例展示了如何使用 BoundRabbitChannelAdvicespring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

请注意,相同的 RabbitTemplate(实现了 RabbitOperations)在通知和出站适配器中被使用。 该通知在模板的 invoke 方法内运行下游流程,以便所有操作都在同一通道上执行。 如果提供了可选的超时时间,当流程完成后,通知将调用 waitForConfirmsOrDie 方法;如果在指定时间内未收到确认,该方法将抛出异常。spring-doc.cadn.net.cn

下游流程中不得出现线程移交(QueueChannelExecutorChannel 及其他情况)。