如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

MQTT 支持

Spring Integration 提供了入站和出站通道适配器,以支持消息队列遥测传输(MQTT)协议。spring-doc.cadn.net.cn

您需要将以下依赖项包含到您的项目中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:6.4.10"

当前实现使用了 Eclipse Paho MQTT Client 库。spring-doc.cadn.net.cn

本章节的 XML 配置及大部分内容均关于 MQTT v3.1 协议支持及其对应的 Paho Client。 有关相应协议支持的详情,请参阅 MQTT v5 支持 段落。

Both adapters' configuration is achieved using the DefaultMqttPahoClientFactory. Refer to the Paho documentation for more information about configuration options.spring-doc.cadn.net.cn

我们建议配置一个 MqttConnectOptions 对象并将其注入到工厂中,而不是在工厂本身上设置(已弃用)的选项。

入站(消息驱动)通道适配器

入站通道适配器由MqttPahoMessageDrivenChannelAdapter实现。 为方便起见,您可以使用命名空间对其进行配置。 最小配置可能如下所示:spring-doc.cadn.net.cn

<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>

以下列表显示了可用的属性:spring-doc.cadn.net.cn

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    topics="bar,baz"  (3)
    qos="1,2"  (4)
    converter="myConverter"  (5)
    client-factory="clientFactory"  (6)
    send-timeout="123"  (7)
    error-channel="errors"  (8)
    recovery-interval="10000"  (9)
    manual-acks="false" (10)
    channel="out" />
1 客户端 ID。
2 代理服务器URL。
3 接收消息的主题的逗号分隔列表。
4 QoS 值的逗号分隔列表。 它可以是应用于所有主题的单值,也可以是每个主题的对应值(此时两个列表的长度必须相同)。
5 An MqttMessageConverter (optional)。 默认情况下,DefaultPahoMessageConverter 会生成一个带有以下标题的 String 负载的消息:
  • mqtt_topic:接收该消息的主题spring-doc.cadn.net.cn

  • mqtt_duplicate:如果消息是重复的,则为 truespring-doc.cadn.net.cn

  • mqtt_qos:服务质量 您可以通过将其声明为 <bean/> 并将 payloadAsBytes 属性设置为 true,来配置 DefaultPahoMessageConverter 以在负载中返回原始 byte[]spring-doc.cadn.net.cn

6 客户端工厂。
7 The send() 超时。 它仅适用于通道可能会阻塞的情况(例如,当前已满的有界 QueueChannel)。
8 错误通道。 下游异常会被发送到此通道(如果已提供),在 ErrorMessage 中。 负载是一个包含失败消息和原因的 MessagingException
9 恢复间隔。 它控制适配器在失败后尝试重新连接的间隔时间。 默认值为 10000ms(十秒)。
10 确认模式;设置为 true 以启用手动确认。
从 4.1 版本开始,您可以省略 URL。 相反,您可以在 serverURIs 属性的 DefaultMqttPahoClientFactory 中提供服务器 URI。 这样做可以实现例如连接到高可用(HA)集群的功能。

从版本 4.2.2 开始,当适配器成功订阅主题时,会发布一个MqttSubscribedEvent。 当连接或订阅失败时,会发布MqttConnectionFailedEvent个事件。 这些事件可以由实现ApplicationListener的 bean 接收。spring-doc.cadn.net.cn

此外,一个名为 recoveryInterval 的新属性控制适配器在失败后尝试重新连接的间隔时间。 其默认值为 10000ms(十秒)。spring-doc.cadn.net.cn

在 4.2.3 版本之前,当适配器停止时,客户端总是取消订阅。 这是不正确的,因为如果客户端 QoS 大于 0,我们需要保持订阅活动,以便在适配器停止期间到达的消息能在下次启动时交付。 这也需要将客户端工厂的 cleanSession 属性设置为 false。 其默认值为 truespring-doc.cadn.net.cn

从版本 4.2.3 开始,如果 cleanSession 属性的值为 false,则适配器默认不会取消订阅。spring-doc.cadn.net.cn

此行为可通过在工厂上设置 consumerCloseAction 属性来覆盖。 它可以具有以下值:UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN。 后者(默认值)仅在 cleanSession 属性为 true 时取消订阅。spring-doc.cadn.net.cn

若要恢复到4.2.3之前的行为,请使用UNSUBSCRIBE_ALWAYSspring-doc.cadn.net.cn

从版本 5.0 开始,topicqosretained属性被映射到.RECEIVED_…​个响应头(MqttHeaders.RECEIVED_TOPICMqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINED),以避免无意中将它们传播到出站消息中(默认情况下,出站消息使用MqttHeaders.TOPICMqttHeaders.QOSMqttHeaders.RETAINED响应头)。spring-doc.cadn.net.cn

在运行时添加和移除主题

从版本 4.1 开始,您可以编程方式更改适配器订阅的主题。 Spring Integration 提供了 addTopic()removeTopic() 方法。 添加主题时,您可以选择指定 QoS(默认值:1)。 您还可以通过向 <control-bus/> 发送带有适当负载的相应消息来修改主题,例如:"myMqttAdapter.addTopic('foo', 1)"spring-doc.cadn.net.cn

停止和启动适配器不会影响主题列表(它不会恢复到配置中的原始设置)。 这些更改在应用程序上下文的生命周期之外不会被保留。 新的应用程序上下文将恢复到配置的设置。spring-doc.cadn.net.cn

在适配器停止(或从代理断开连接)时更改主题,将在下次建立连接时生效。spring-doc.cadn.net.cn

手动确认

从版本 5.3 开始,您可以将 manualAcks 属性设置为 true。 通常用于异步确认交付。 当设置为 true 时,会将头信息 (IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK) 添加到消息中,其值为一个 SimpleAcknowledgment。 您必须调用 acknowledge() 方法以完成交付。 有关更多信息,请参阅 IMqttClientsetManualAcks()messageArrivedComplete() 的 Javadocs。 为方便起见,提供了一个头信息访问器:spring-doc.cadn.net.cn

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

从版本 5.2.11 开始,当消息转换器抛出异常或在 MqttMessage 转换中返回 null 时,MqttPahoMessageDrivenChannelAdapter 会将 ErrorMessage 发送到 errorChannel(如果已提供)。 否则,此转换错误将被重新抛出到 MQTT 客户端回调中。spring-doc.cadn.net.cn

使用 Java 配置进行配置

以下 Spring Boot 应用示例展示了如何使用 Java 配置来配置入站适配器:spring-doc.cadn.net.cn

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用提供了使用 Java DSL 配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlow.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                        "testClient", "topic1", "topic2"))
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

出站通道适配器

出站通道适配器由 MqttPahoMessageHandler 实现,该适配器被封装在 ConsumerEndpoint 中。 为便于配置,您可以使用命名空间对其进行配置。spring-doc.cadn.net.cn

从版本 4.1 开始,适配器支持异步发送操作,无需等待交付确认即可继续执行。 您可以发射应用程序事件,以便在需要时让应用程序确认交付。spring-doc.cadn.net.cn

以下清单显示了出站通道适配器可用的属性:spring-doc.cadn.net.cn

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    converter="myConverter"  (3)
    client-factory="clientFactory"  (4)
    default-qos="1"  (5)
    qos-expression="" (6)
    default-retained="true"  (7)
    retained-expression="" (8)
    default-topic="bar"  (9)
    topic-expression="" (10)
    async="false"  (11)
    async-events="false"  (12)
    channel="target" />
1 客户端 ID。
2 代理服务器URL。
3 An MqttMessageConverter (可选)。 默认值 DefaultPahoMessageConverter 识别以下标头:
4 客户端工厂。
5 默认的服务质量。 如果未找到mqtt_qos标头或qos-expression返回null,则使用此值。 如果您提供了自定义converter,则不使用此值。
6 用于评估以确定 qos 的表达式。 默认值为 headers[mqtt_qos]
7 保留标志的默认值。 如果未找到 mqtt_retained 标头,则使用此值。 如果提供了自定义 converter,则不使用此值。
8 用于评估以确定保留的布尔值的表达式。 默认值为 headers[mqtt_retained]
9 发送消息的默认主题(如果未找到 mqtt_topic 标头时使用)。
10 用于确定目标主题的表达式。 默认值为 headers['mqtt_topic']
11 当值为true时,调用方不会阻塞。 相反,在发送消息时会等待交付确认。 默认值为false(发送操作会阻塞直到交付确认)。
12 asyncasync-events 均为 true 时,会发出一个 MqttMessageSentEvent(参见 事件)。 它包含消息、主题、由客户端库生成的 messageIdclientId 以及 clientInstance(每次客户端连接时递增)。 当客户端库确认交付后,会发出一个 MqttMessageDeliveredEvent。 它包含 messageIdclientIdclientInstance,使得交付能够与 send() 进行关联。 任何 ApplicationListener 或入站通道适配器均可接收这些事件。 请注意,MqttMessageDeliveredEvent 可能在 MqttMessageSentEvent 之前被接收。 默认值为 false
从版本 4.1 开始,URL 可以省略。 取而代之的是,可以在 serverURIs 属性的 DefaultMqttPahoClientFactory 中提供服务器 URI。 这使得例如连接到高可用(HA)集群成为可能。

使用 Java 配置进行配置

以下 Spring Boot 应用程序展示了如何使用 Java 配置来配置出站适配器:spring-doc.cadn.net.cn

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序提供了一个示例,展示如何使用 Java DSL 配置出站适配器:spring-doc.cadn.net.cn

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

       @Bean
       public IntegrationFlow mqttOutboundFlow() {
           return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

事件

某些应用程序事件由适配器发布。spring-doc.cadn.net.cn

  • MqttConnectionFailedEvent - 当连接失败或后续连接丢失时,两个适配器都会发布此事件。 对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时也会发出此事件,此时丢失连接的 causenullspring-doc.cadn.net.cn

  • MqttMessageSentEvent - 当消息已发送时,如果以异步模式运行,则由出站适配器发布。spring-doc.cadn.net.cn

  • MqttMessageDeliveredEvent - 当客户端指示消息已送达时,若以异步模式运行,则由出站适配器发布。spring-doc.cadn.net.cn

  • MqttMessageNotDeliveredEvent - 当客户端指示消息未成功传递时,如果运行在异步模式下,由出站适配器发布。spring-doc.cadn.net.cn

  • MqttSubscribedEvent - 由入站适配器在订阅主题后发布。spring-doc.cadn.net.cn

这些事件可以通过ApplicationListener<MqttIntegrationEvent>或带有@EventListener的方法接收。spring-doc.cadn.net.cn

要确定事件的来源,请使用以下方法:您可以检查 bean 名称和/或连接选项(以访问服务器 URI 等)。spring-doc.cadn.net.cn

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();

MQTT v5 支持

从版本 5.5.5 开始,spring-integration-mqtt模块提供了针对 MQTT v5 协议的通道适配器实现。 org.eclipse.paho:org.eclipse.paho.mqttv5.client是一个optional依赖项,因此必须在目标项目中显式包含。spring-doc.cadn.net.cn

由于 MQTT v5 协议支持在 MQTT 消息中包含额外的任意属性,因此引入了 MqttHeaderMapper 实现以在发布和接收操作期间映射/反映射到标头。 默认情况下(通过 * 模式),它将所有接收到的 PUBLISH 帧属性(包括用户属性)进行映射。 在出站侧,它为 PUBLISH 帧映射此标头子集:contentTypemqtt_messageExpiryIntervalmqtt_responseTopicmqtt_correlationDataspring-doc.cadn.net.cn

MQTT v5 协议的出站通道适配器表现为一个 Mqttv5PahoMessageHandler。 它需要一个 clientId 以及 MQTT 代理 URL 或 MqttConnectionOptions 引用。 它支持 MqttClientPersistence 选项,可以是 async,并且在相应情况下可以发射 MqttIntegrationEvent 对象(参见 asyncEvents 选项)。 如果请求消息的负载是 org.eclipse.paho.mqttv5.common.MqttMessage,则通过内部 IMqttAsyncClient 直接发布。 如果负载是 byte[],则直接用作目标 MqttMessage 负载进行发布。 如果负载是一个 String,则将其转换为 byte[] 后进行发布。 其余用例委托给提供的 MessageConverter,该对象是应用程序上下文中的一个 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverter Bean。 注意:当请求的消息负载已经是 MqttMessage 时,提供的 HeaderMapper<MqttProperties> 不会被使用。 以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}
The org.springframework.integration.mqtt.support.MqttMessageConverter cannot be used with the Mqttv5PahoMessageHandler since its contract is aimed only for the MQTT v3 protocol.

如果在启动时或运行时连接失败,Mqttv5PahoMessageHandler将尝试在下一个消息发送到该处理器时重新连接。 如果此手动重连失败,连接异常将被抛回给调用者。 在这种情况下,将应用标准的 Spring Integration 错误处理流程,包括请求处理器建议(例如重试或熔断器)。spring-doc.cadn.net.cn

Mqttv5PahoMessageHandler javadocs 及其父类中查看更多详细信息。spring-doc.cadn.net.cn

MQTT v5 协议的入站通道适配器表示为 Mqttv5PahoMessageDrivenChannelAdapter。 它需要一个 clientId 以及 MQTT 代理 URL 或 MqttConnectionOptions 引用,加上要订阅和消费的主题。 它支持一个 MqttClientPersistence 选项,默认情况下是内存中的。 预期的 payloadType(默认为 byte[])可以配置,并且会传播到提供的 SmartMessageConverter,用于将接收到的 MqttMessagebyte[] 进行转换。 如果设置了 manualAck 选项,则会在消息中添加一个 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 头,以便作为 SimpleAcknowledgment 的实例生成。 HeaderMapper<MqttProperties> 用于将 PUBLISH 帧属性(包括用户属性)映射到目标消息头中。 标准的 MqttMessage 属性,如 qosiddupretained,以及接收到的主题,始终会被映射到头中。 有关更多信息,请参阅 MqttHeadersspring-doc.cadn.net.cn

从 6.3 版本开始,Mqttv5PahoMessageDrivenChannelAdapter 提供了基于 MqttSubscription 的构造函数,用于细粒度配置,而不是简单的主题名称。 当提供这些订阅时,通道适配器的 qos 选项将无法使用,因为这种 qos 模式是 MqttSubscription API 的一部分。spring-doc.cadn.net.cn

以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
The org.springframework.integration.mqtt.support.MqttMessageConverter cannot be used with the Mqttv5PahoMessageDrivenChannelAdapter since its contract is aimed only for the MQTT v3 protocol.

Mqttv5PahoMessageDrivenChannelAdapter javadocs 及其父类中查看更多详细信息。spring-doc.cadn.net.cn

建议将 MqttConnectionOptions#setAutomaticReconnect(boolean) 设置为 true,以便让内部的 IMqttAsyncClient 实例处理重连。 否则,只有手动重启 Mqttv5PahoMessageDrivenChannelAdapter 才能处理重连,例如通过 MqttConnectionFailedEvent 在断开连接时进行处理。

共享 MQTT 客户端支持

如果多个集成需要单个 MQTT ClientID,则不能使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个 ClientID 的连接数有限制(通常只允许一个连接)。 为了在多个通道适配器中重用单个客户端,可以使用 org.springframework.integration.mqtt.core.ClientManager 组件并将其传递给任何需要的通道适配器。 它将管理 MQTT 连接的生命周期,并在需要时自动重连。 此外,也可以像当前为通道适配器组件所做的那样,向客户端管理器提供自定义连接选项和 MqttClientPersistencespring-doc.cadn.net.cn

请注意,同时支持 MQTT v5 和 v3 通道适配器。spring-doc.cadn.net.cn

以下 Java DSL 配置示例演示了如何在集成流中使用此客户端管理器:spring-doc.cadn.net.cn

@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
    connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
    connectionOptions.setConnectionTimeout(30000);
    connectionOptions.setMaxReconnectDelay(1000);
    connectionOptions.setAutomaticReconnect(true);
    Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
    clientManager.setPersistence(new MqttDefaultFilePersistence());
    return clientManager;
}

@Bean
public IntegrationFlow mqttInFlowTopic1(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttInFlowTopic2(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttOutFlow(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}
从版本 6.4 开始,现在可以在运行时使用相应的 ClientManagerIntegrationFlowContext 添加多个 MqttPahoMessageDrivenChannelAdapterMqttv5PahoMessageDrivenChannelAdapter 实例。
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
                                  String topic, MessageChannel channel) {
    flowContext
        .registration(
            IntegrationFlow
                .from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
                .channel(channel)
                .get())
        .register();
}