对于最新稳定版本,请使用 Spring Integration 7.0.0spring-doc.cadn.net.cn

MQTT 支持

Spring Integration 提供进站和出站通道适配器,支持消息队列遥测传输(MQTT)协议。spring-doc.cadn.net.cn

你需要把这种依赖性纳入你的项目中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.2.11</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:6.2.11"
XML 配置以及本章大部分内容涉及 MQTT v3.1 协议支持及相应的 Paho 客户端。 有关相应协议支持,请参见MQTT v5支持段落。

两个适配器的配置均通过以下方式实现DefaultMqttPahoClientFactory. 有关配置选项的更多信息,请参阅Paho文档。spring-doc.cadn.net.cn

我们建议配置一个MqttConnectOptions并将其注入到工厂,而不是在工厂本身设置(已弃用的)选项。

入站(消息驱动)通道适配器

入站通道适配器由MqttPahoMessageDrivenChannelAdapter. 为了方便起见,你可以用命名空间来配置。 最小配置可能如下:spring-doc.cadn.net.cn

<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>

以下列表展示了可用的属性:spring-doc.cadn.net.cn

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    topics="bar,baz"  (3)
    qos="1,2"  (4)
    converter="myConverter"  (5)
    client-factory="clientFactory"  (6)
    send-timeout="123"  (7)
    error-channel="errors"  (8)
    recovery-interval="10000"  (9)
    manual-acks="false" (10)
    channel="out" />
1 客户ID。
2 经纪商网址。
3 一个逗号分隔的主题列表,该适配器接收消息来源。
4 一个逗号分隔的QoS值列表。 它可以是对所有主题应用的单一值,也可以是每个主题的值(此时列表长度必须相同)。
5 MqttMessage转换器(可选)。 默认情况下,默认默认PahoMessage转换器生成一个消息,满足字符串有效载荷头部如下:
6 客户工厂。
7 发送()超时。 它仅适用于通道可能阻塞(例如有界通道)队列通道目前已满)。
8 错误通道。 如果有提供,下游异常会发送到该信道,在错误消息. 有效载荷为消息异常其中包含失败的信息和原因。
9 恢复间歇。 它控制适配器在故障后尝试重新连接的间隔。 它默认为10000毫秒(十秒)
10 确认模式;手动确认时设置为真。
从4.1版本开始,你可以省略URL。 相反,你可以在服务器URI的属性DefaultMqttPahoClientFactory. 这样做可以实现例如连接到高可用性(HA)集群。

从版本4.2.2开始,一个MqttSubscribedEvent当适配器成功订阅主题时发布。MqttConnectionFailedEvent当连接或订阅失败时,事件会被发布。 这些事件可以被实现ApplicationListener.spring-doc.cadn.net.cn

还有一个新作品,叫做恢复间隔控制适配器在故障后尝试重新连接的间隔。 它默认为10000毫秒(十秒)spring-doc.cadn.net.cn

在4.2.3版本之前,客户端在适配器停止时总是取消订阅。 这是错误的,因为如果客户端QOS大于0,我们需要保持订阅有效,以便消息能够到达 当适配器停止时,会在下一次启动时交付。 这还需要设置清洁会谈客户工厂的财产为false. 它默认为true.spring-doc.cadn.net.cn

从4.2.3版本开始,适配器默认不会在清洁会谈财产是false.spring-doc.cadn.net.cn

通过设置 来覆盖该行为consumerCloseAction工厂的财产。 它可以有以下数值:UNSUBSCRIBE_ALWAYS,UNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN. 后者(默认)只有在清洁会谈财产是true.spring-doc.cadn.net.cn

要恢复到 4.2.3 之前的行为,请UNSUBSCRIBE_ALWAYS.spring-doc.cadn.net.cn

从5.0版本开始,主题,QoS(服务质量)保留属性映射为.收到_。。。头部(MqttHeaders.RECEIVED_TOPIC,MqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINED),以避免无意中传播到(默认情况下)使用MqttHeaders.TOPIC,MqttHeaders.QOSMqttHeaders。保留头。spring-doc.cadn.net.cn

运行时添加和删除主题

从4.1版本开始,你可以通过程序方式更改适配器订阅的主题。 Spring 集成提供了addTopic()removeTopic()方法。 添加主题时,你可以选择性地指定服务质量(默认:1)。 你也可以通过发送适当的消息给<控制-总线/>配备合适的有效载荷——例如:“myMqttAdapter.addTopic('foo', 1)”.spring-doc.cadn.net.cn

停止和启动适配器不会影响主题列表(配置中不会恢复到原始设置)。 这些更改不会被保留到应用上下文生命周期之外。 新的应用上下文会恢复到已配置的设置。spring-doc.cadn.net.cn

在适配器停止(或与中介断开连接)时更改主题,将在下一次连接建立时生效。spring-doc.cadn.net.cn

手动加速

从5.3版本开始,你可以设置manualAcks财产变真。 通常用于异步确认送达。 当设置为true, 头部 (IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK)被添加到消息中,其值为简单确认. 你必须引用确认()完成送达的方法。 请参见 JavadocsIMqttClient setManualAcks()messageArrivedComplete()更多信息请见。 为方便起见,提供了一个头部访问器:spring-doc.cadn.net.cn

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

从版本开始5.2.11当消息转换器抛出异常或返回时,来自MqttMessage转换,该MqttPahoMessageDrivenChannelAdapter发送一个错误消息进入errorChannel,如果提供了。 将该转换错误重新抛入MQTT客户端回调。spring-doc.cadn.net.cn

使用 Java 配置配置

以下 Spring Boot 应用程序展示了如何用 Java 配置配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 配置

以下 Spring Boot 应用程序示例了如何用 Java DSL 配置入站适配器:spring-doc.cadn.net.cn

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlow.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                        "testClient", "topic1", "topic2"))
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

出站通道适配器

出站通道适配器由MqttPahoMessageHandler,它被包裹在一个消费者端点. 为了方便起见,你可以用命名空间来配置。spring-doc.cadn.net.cn

从4.1版本开始,适配器支持异步发送作,避免在确认送达前阻塞。 你可以发送应用事件,使应用程序能够确认交付。spring-doc.cadn.net.cn

以下列表展示了出站信道适配器可用的属性:spring-doc.cadn.net.cn

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    converter="myConverter"  (3)
    client-factory="clientFactory"  (4)
    default-qos="1"  (5)
    qos-expression="" (6)
    default-retained="true"  (7)
    retained-expression="" (8)
    default-topic="bar"  (9)
    topic-expression="" (10)
    async="false"  (11)
    async-events="false"  (12)
    channel="target" />
1 客户ID。
2 经纪商网址。
3 MqttMessage转换器(可选)。 默认默认PahoMessage转换器识别以下头部:
4 客户工厂。
5 默认的服务质量。 如果不存在,则使用该词mqtt_qos查找头部或Qos表达式返回. 如果你提供定制,则不会被使用转炉.
6 一个用于计算qos的表达式。 默认为头[mqtt_qos].
7 保留旗帜的默认值。 如果不存在,则使用该词mqtt_retained找到了头部。 如果是自定义,则不使用转炉是提供的。
8 用于计算保留布尔值的表达式。 默认为头[mqtt_retained].
9 发送消息的默认主题(如果不是,则使用mqtt_topic找到了头部)。
10 一个用于评估以确定目标主题的表达式。 默认为标题['mqtt_topic'].
11 什么时候true,来电者不会被阻挡。 相反,它会等待发送消息时收到送达确认。 默认为false(发送会阻塞直到确认送达)。
12 什么时候异步异步事件两者皆为trueMqttMessageSentEvent被发射(见事件)。 它包含了信息、话题、以及messageId(信息ID由客户端库生成,客户标识,以及clientInstance(每次客户端连接时,数据会递增)。 当交付被客户端库确认时,MqttMessageDeliveredEvent发射。 它包含messageId(信息ID客户标识,以及clientInstance使得传递与发送(). 任何ApplicationListener或者事件入站通道适配器可以接收这些事件。 注意,对于MqttMessageDeliveredEventMqttMessageSentEvent. 默认为false.
从4.1版本开始,URL可以省略。 相反,服务器URI可以提供在服务器URI的属性DefaultMqttPahoClientFactory. 例如,这使得连接到高可用性(HA)集群成为可能。

使用 Java 配置配置

以下 Spring Boot 应用程序展示了如何用 Java 配置配置出站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

使用 Java DSL 配置

以下 Spring Boot 应用程序示例,展示了如何用 Java DSL 配置出站适配器:spring-doc.cadn.net.cn

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

   	@Bean
   	public IntegrationFlow mqttOutboundFlow() {
   	    return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

事件

某些应用事件由适配器发布。spring-doc.cadn.net.cn

  • MqttConnectionFailedEvent- 如果连接失败或连接中断,两个适配器都会发布。 对于MQTT v5 Paho客户端,当服务器执行正常断开连接时也会触发该事件,此时原因失去连接的关系为.spring-doc.cadn.net.cn

  • MqttMessageSentEvent- 如果以异步模式运行,则由出站适配器在发送消息时发布。spring-doc.cadn.net.cn

  • MqttMessageDeliveredEvent- 当客户端表示消息已送达(且以异步模式运行)时,由出站适配器发布。spring-doc.cadn.net.cn

  • MqttSubscribedEvent- 由订阅主题后由入站适配器发布。spring-doc.cadn.net.cn

这些事件可以通过以下方式接收ApplicationListener<MqttIntegrationEvent>或者@EventListener方法。spring-doc.cadn.net.cn

要确定事件的来源,请使用以下方法:你可以检查BEN名称和/或连接选项(访问服务器URI等)。spring-doc.cadn.net.cn

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();

MQTT v5 支持

从版本5.5.5开始,Spring积分 MQTT模块为MQTT v5协议提供了通道适配器实现。 这org.eclipse.paho:org.eclipse.paho.mqttv5.client自选依赖性,因此必须明确包含在目标项目中。spring-doc.cadn.net.cn

由于MQTT v5协议支持MQTT消息中的额外任意属性,MqttHeaderMapper已引入实现发布和接收作中映射至/从标题的映射。 默认情况下(通过模式)它映射所有接收到的*发布框架属性(包括用户属性)。 在出站端,它映射了该子集的头部发布框架:内容类型,mqtt_messageExpiryInterval,mqtt_responseTopic,mqtt_correlationData.spring-doc.cadn.net.cn

MQTT v5协议的出站通道适配器以Mqttv5PahoMessageHandler. 它需要客户标识以及MQTT代理URL或MqttConnectionOptions参考。 它支持一个MqttClientPersistence选项,可以是异步并且可以发射MqttIntegrationEvent在这种情况下,对象(参见异步事件选项)。 如果请求消息载荷是org.eclipse.paho.mqttv5.common.MqttMessage,它通过内部IMqttAsyncClient. 如果有效载荷为字节[]它对目标直接使用。MqttMessage发布有效载荷。 如果有效载荷是字符串它被转换为字节[]出版。 其余的使用场景则委托给提供的消息转换器IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME 可配置复合消息转换器应用上下文中的豆子。 注:已提供HeaderMapper<MqttProperties>当请求的消息载荷已经是MqttMessage. 以下Java DSL配置示例演示了如何在集成流程中使用该通道适配器:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter不能与Mqttv5PahoMessageHandler因为其合同仅针对MQTT v3协议。

如果连接在启动时或运行时失败,Mqttv5PahoMessageHandler在下一条发送给该处理器的消息时尝试重新连接。 如果这次手动重新连接失败,连接例外会被抛回给呼叫者。 此时采用标准的Spring Integration错误处理程序,包括请求处理程序建议,例如重试或断路器。spring-doc.cadn.net.cn

更多信息请参见Mqttv5PahoMessageHandlerJavadocs 及其超类。spring-doc.cadn.net.cn

MQTT v5协议的入站通道适配器以mqttv5PahoMessageDrivenChannelAdapter. 它需要客户标识以及MQTT代理URL或MqttConnectionOptions参考资料,以及可供订阅和消费的主题。 它支持一个MqttClientPersistence选项,默认是内存中的。 预期有效载荷类型 (字节[]默认情况下,可以配置,并传播到所提供的智能消息转换器用于从字节[]在接受的MqttMessage. 如果manualAck选项被设置,然后IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK将头添加到消息中,作为 的实例生成简单确认. 这HeaderMapper<MqttProperties>用于映射发布将框架属性(包括用户属性)嵌入目标消息头部。 标准MqttMessage性质,例如QoS(服务质量),身份证,DUP,保留,加上接收的主题总是映射到报头。 看MqttHeaders更多信息请见。spring-doc.cadn.net.cn

以下Java DSL配置示例演示了如何在集成流程中使用该通道适配器:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
org.springframework.integration.mqtt.support.MqttMessageConverter不能与mqttv5PahoMessageDrivenChannelAdapter因为其合同仅针对MQTT v3协议。

更多信息请参见mqttv5PahoMessageDrivenChannelAdapterJavadocs 及其超类。spring-doc.cadn.net.cn

建议拥有MqttConnectionOptions#setAutomaticReconnect(boolean)设为真以允许内部IMqttAsyncClient实例来处理重连。 否则,只有手动重启mqttv5PahoMessageDrivenChannelAdapter可以处理重连,例如通过MqttConnectionFailedEvent断开连接时处理。

共享MQTT客户端支持

如果多个集成都需要单个MQTT ClientID,则无法同时使用多个MQTT客户端实例,因为MQTT代理可能对每个ClientID的连接数量有限制(通常允许单个连接)。 对于让单个客户端被重复用于不同的通道适配器,一个org.springframework.integration.mqtt.core.ClientManager组件可以被使用并传递给任何需要的通道适配器。 它会管理MQTT连接生命周期,并在需要时自动重新连接。 另外,还有自定义连接选项和MqttClientPersistence可以像目前通道适配器组件一样提供给客户端管理器。spring-doc.cadn.net.cn

请注意,支持MQTT v5和v3通道适配器。spring-doc.cadn.net.cn

以下Java DSL配置示例演示如何在集成流程中使用该客户端管理器:spring-doc.cadn.net.cn

@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
    connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
    connectionOptions.setConnectionTimeout(30000);
    connectionOptions.setMaxReconnectDelay(1000);
    connectionOptions.setAutomaticReconnect(true);
    Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
    clientManager.setPersistence(new MqttDefaultFilePersistence());
    return clientManager;
}

@Bean
public IntegrationFlow mqttInFlowTopic1(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttInFlowTopic2(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttOutFlow(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}