|
对于最新稳定版本,请使用 Spring Integration 7.0.0! |
MQTT 支持
Spring Integration 提供进站和出站通道适配器,支持消息队列遥测传输(MQTT)协议。
你需要把这种依赖性纳入你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.2.11</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:6.2.11"
当前实现使用Eclipse Paho MQTT客户端库。
| XML 配置以及本章大部分内容涉及 MQTT v3.1 协议支持及相应的 Paho 客户端。 有关相应协议支持,请参见MQTT v5支持段落。 |
两个适配器的配置均通过以下方式实现DefaultMqttPahoClientFactory.
有关配置选项的更多信息,请参阅Paho文档。
我们建议配置一个MqttConnectOptions并将其注入到工厂,而不是在工厂本身设置(已弃用的)选项。 |
入站(消息驱动)通道适配器
入站通道适配器由MqttPahoMessageDrivenChannelAdapter.
为了方便起见,你可以用命名空间来配置。
最小配置可能如下:
<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="connectionOptions">
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
<property name="userName" value="${mqtt.username}"/>
<property name="password" value="${mqtt.password}"/>
</bean>
</property>
</bean>
<int-mqtt:message-driven-channel-adapter id="mqttInbound"
client-id="${mqtt.default.client.id}.src"
url="${mqtt.url}"
topics="sometopic"
client-factory="clientFactory"
channel="output"/>
以下列表展示了可用的属性:
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="foo" (1)
url="tcp://localhost:1883" (2)
topics="bar,baz" (3)
qos="1,2" (4)
converter="myConverter" (5)
client-factory="clientFactory" (6)
send-timeout="123" (7)
error-channel="errors" (8)
recovery-interval="10000" (9)
manual-acks="false" (10)
channel="out" />
| 1 | 客户ID。 |
| 2 | 经纪商网址。 |
| 3 | 一个逗号分隔的主题列表,该适配器接收消息来源。 |
| 4 | 一个逗号分隔的QoS值列表。 它可以是对所有主题应用的单一值,也可以是每个主题的值(此时列表长度必须相同)。 |
| 5 | 一MqttMessage转换器(可选)。
默认情况下,默认默认PahoMessage转换器生成一个消息,满足字符串有效载荷头部如下:
|
| 6 | 客户工厂。 |
| 7 | 这发送()超时。
它仅适用于通道可能阻塞(例如有界通道)队列通道目前已满)。 |
| 8 | 错误通道。
如果有提供,下游异常会发送到该信道,在错误消息.
有效载荷为消息异常其中包含失败的信息和原因。 |
| 9 | 恢复间歇。
它控制适配器在故障后尝试重新连接的间隔。
它默认为10000毫秒(十秒) |
| 10 | 确认模式;手动确认时设置为真。 |
从4.1版本开始,你可以省略URL。
相反,你可以在服务器URI的属性DefaultMqttPahoClientFactory.
这样做可以实现例如连接到高可用性(HA)集群。 |
从版本4.2.2开始,一个MqttSubscribedEvent当适配器成功订阅主题时发布。MqttConnectionFailedEvent当连接或订阅失败时,事件会被发布。
这些事件可以被实现ApplicationListener.
还有一个新作品,叫做恢复间隔控制适配器在故障后尝试重新连接的间隔。
它默认为10000毫秒(十秒)
|
在4.2.3版本之前,客户端在适配器停止时总是取消订阅。
这是错误的,因为如果客户端QOS大于0,我们需要保持订阅有效,以便消息能够到达
当适配器停止时,会在下一次启动时交付。
这还需要设置 从4.2.3版本开始,适配器默认不会在 通过设置 来覆盖该行为 要恢复到 4.2.3 之前的行为,请 |
|
从5.0版本开始, |
运行时添加和删除主题
从4.1版本开始,你可以通过程序方式更改适配器订阅的主题。
Spring 集成提供了addTopic()和removeTopic()方法。
添加主题时,你可以选择性地指定服务质量(默认:1)。
你也可以通过发送适当的消息给<控制-总线/>配备合适的有效载荷——例如:“myMqttAdapter.addTopic('foo', 1)”.
停止和启动适配器不会影响主题列表(配置中不会恢复到原始设置)。 这些更改不会被保留到应用上下文生命周期之外。 新的应用上下文会恢复到已配置的设置。
在适配器停止(或与中介断开连接)时更改主题,将在下一次连接建立时生效。
手动加速
从5.3版本开始,你可以设置manualAcks财产变真。
通常用于异步确认送达。
当设置为true, 头部 (IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK)被添加到消息中,其值为简单确认.
你必须引用确认()完成送达的方法。
请参见 JavadocsIMqttClient setManualAcks()和messageArrivedComplete()更多信息请见。
为方便起见,提供了一个头部访问器:
StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();
从版本开始5.2.11当消息转换器抛出异常或返回时,零来自MqttMessage转换,该MqttPahoMessageDrivenChannelAdapter发送一个错误消息进入errorChannel,如果提供了。
将该转换错误重新抛入MQTT客户端回调。
使用 Java 配置配置
以下 Spring Boot 应用程序展示了如何用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
"topic1", "topic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 配置
以下 Spring Boot 应用程序示例了如何用 Java DSL 配置入站适配器:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttInbound() {
return IntegrationFlow.from(
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
"testClient", "topic1", "topic2"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
出站通道适配器
出站通道适配器由MqttPahoMessageHandler,它被包裹在一个消费者端点.
为了方便起见,你可以用命名空间来配置。
从4.1版本开始,适配器支持异步发送作,避免在确认送达前阻塞。 你可以发送应用事件,使应用程序能够确认交付。
以下列表展示了出站信道适配器可用的属性:
<int-mqtt:outbound-channel-adapter id="withConverter"
client-id="foo" (1)
url="tcp://localhost:1883" (2)
converter="myConverter" (3)
client-factory="clientFactory" (4)
default-qos="1" (5)
qos-expression="" (6)
default-retained="true" (7)
retained-expression="" (8)
default-topic="bar" (9)
topic-expression="" (10)
async="false" (11)
async-events="false" (12)
channel="target" />
| 1 | 客户ID。 |
| 2 | 经纪商网址。 |
| 3 | 一MqttMessage转换器(可选)。
默认默认PahoMessage转换器识别以下头部:
|
| 4 | 客户工厂。 |
| 5 | 默认的服务质量。
如果不存在,则使用该词mqtt_qos查找头部或Qos表达式返回零.
如果你提供定制,则不会被使用转炉. |
| 6 | 一个用于计算qos的表达式。
默认为头[mqtt_qos]. |
| 7 | 保留旗帜的默认值。
如果不存在,则使用该词mqtt_retained找到了头部。
如果是自定义,则不使用转炉是提供的。 |
| 8 | 用于计算保留布尔值的表达式。
默认为头[mqtt_retained]. |
| 9 | 发送消息的默认主题(如果不是,则使用mqtt_topic找到了头部)。 |
| 10 | 一个用于评估以确定目标主题的表达式。
默认为标题['mqtt_topic']. |
| 11 | 什么时候true,来电者不会被阻挡。
相反,它会等待发送消息时收到送达确认。
默认为false(发送会阻塞直到确认送达)。 |
| 12 | 什么时候异步和异步事件两者皆为true一MqttMessageSentEvent被发射(见事件)。
它包含了信息、话题、以及messageId(信息ID由客户端库生成,客户标识,以及clientInstance(每次客户端连接时,数据会递增)。
当交付被客户端库确认时,MqttMessageDeliveredEvent发射。
它包含messageId(信息ID这客户标识,以及clientInstance使得传递与发送().
任何ApplicationListener或者事件入站通道适配器可以接收这些事件。
注意,对于MqttMessageDeliveredEvent在MqttMessageSentEvent.
默认为false. |
从4.1版本开始,URL可以省略。
相反,服务器URI可以提供在服务器URI的属性DefaultMqttPahoClientFactory.
例如,这使得连接到高可用性(HA)集群成为可能。 |
使用 Java 配置配置
以下 Spring Boot 应用程序展示了如何用 Java 配置配置出站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToMqtt("foo");
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
}
使用 Java DSL 配置
以下 Spring Boot 应用程序示例,展示了如何用 Java DSL 配置出站适配器:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
}
}
事件
某些应用事件由适配器发布。
-
MqttConnectionFailedEvent- 如果连接失败或连接中断,两个适配器都会发布。 对于MQTT v5 Paho客户端,当服务器执行正常断开连接时也会触发该事件,此时原因失去连接的关系为零. -
MqttMessageSentEvent- 如果以异步模式运行,则由出站适配器在发送消息时发布。 -
MqttMessageDeliveredEvent- 当客户端表示消息已送达(且以异步模式运行)时,由出站适配器发布。 -
MqttSubscribedEvent- 由订阅主题后由入站适配器发布。
这些事件可以通过以下方式接收ApplicationListener<MqttIntegrationEvent>或者@EventListener方法。
要确定事件的来源,请使用以下方法:你可以检查BEN名称和/或连接选项(访问服务器URI等)。
MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();
MQTT v5 支持
从版本5.5.5开始,Spring积分 MQTT模块为MQTT v5协议提供了通道适配器实现。
这org.eclipse.paho:org.eclipse.paho.mqttv5.client是自选依赖性,因此必须明确包含在目标项目中。
由于MQTT v5协议支持MQTT消息中的额外任意属性,MqttHeaderMapper已引入实现发布和接收作中映射至/从标题的映射。
默认情况下(通过模式)它映射所有接收到的*发布框架属性(包括用户属性)。
在出站端,它映射了该子集的头部发布框架:内容类型,mqtt_messageExpiryInterval,mqtt_responseTopic,mqtt_correlationData.
MQTT v5协议的出站通道适配器以Mqttv5PahoMessageHandler.
它需要客户标识以及MQTT代理URL或MqttConnectionOptions参考。
它支持一个MqttClientPersistence选项,可以是异步并且可以发射MqttIntegrationEvent在这种情况下,对象(参见异步事件选项)。
如果请求消息载荷是org.eclipse.paho.mqttv5.common.MqttMessage,它通过内部IMqttAsyncClient.
如果有效载荷为字节[]它对目标直接使用。MqttMessage发布有效载荷。
如果有效载荷是字符串它被转换为字节[]出版。
其余的使用场景则委托给提供的消息转换器即IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME 可配置复合消息转换器应用上下文中的豆子。
注:已提供HeaderMapper<MqttProperties>当请求的消息载荷已经是MqttMessage.
以下Java DSL配置示例演示了如何在集成流程中使用该通道适配器:
@Bean
public IntegrationFlow mqttOutFlow() {
Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
messageHandler.setHeaderMapper(mqttHeaderMapper);
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(true);
messageHandler.setConverter(mqttStringToBytesConverter());
return f -> f.handle(messageHandler);
}
这org.springframework.integration.mqtt.support.MqttMessageConverter不能与Mqttv5PahoMessageHandler因为其合同仅针对MQTT v3协议。 |
如果连接在启动时或运行时失败,Mqttv5PahoMessageHandler在下一条发送给该处理器的消息时尝试重新连接。
如果这次手动重新连接失败,连接例外会被抛回给呼叫者。
此时采用标准的Spring Integration错误处理程序,包括请求处理程序建议,例如重试或断路器。
更多信息请参见Mqttv5PahoMessageHandlerJavadocs 及其超类。
MQTT v5协议的入站通道适配器以mqttv5PahoMessageDrivenChannelAdapter.
它需要客户标识以及MQTT代理URL或MqttConnectionOptions参考资料,以及可供订阅和消费的主题。
它支持一个MqttClientPersistence选项,默认是内存中的。
预期有效载荷类型 (字节[]默认情况下,可以配置,并传播到所提供的智能消息转换器用于从字节[]在接受的MqttMessage.
如果manualAck选项被设置,然后IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK将头添加到消息中,作为 的实例生成简单确认.
这HeaderMapper<MqttProperties>用于映射发布将框架属性(包括用户属性)嵌入目标消息头部。
标准MqttMessage性质,例如QoS(服务质量),身份证,DUP,保留,加上接收的主题总是映射到报头。
看MqttHeaders更多信息请见。
以下Java DSL配置示例演示了如何在集成流程中使用该通道适配器:
@Bean
public IntegrationFlow mqttInFlow() {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
messageProducer.setPayloadType(String.class);
messageProducer.setMessageConverter(mqttStringToBytesConverter());
messageProducer.setManualAcks(true);
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
这org.springframework.integration.mqtt.support.MqttMessageConverter不能与mqttv5PahoMessageDrivenChannelAdapter因为其合同仅针对MQTT v3协议。 |
更多信息请参见mqttv5PahoMessageDrivenChannelAdapterJavadocs 及其超类。
建议拥有MqttConnectionOptions#setAutomaticReconnect(boolean)设为真以允许内部IMqttAsyncClient实例来处理重连。
否则,只有手动重启mqttv5PahoMessageDrivenChannelAdapter可以处理重连,例如通过MqttConnectionFailedEvent断开连接时处理。 |
共享MQTT客户端支持
如果多个集成都需要单个MQTT ClientID,则无法同时使用多个MQTT客户端实例,因为MQTT代理可能对每个ClientID的连接数量有限制(通常允许单个连接)。
对于让单个客户端被重复用于不同的通道适配器,一个org.springframework.integration.mqtt.core.ClientManager组件可以被使用并传递给任何需要的通道适配器。
它会管理MQTT连接生命周期,并在需要时自动重新连接。
另外,还有自定义连接选项和MqttClientPersistence可以像目前通道适配器组件一样提供给客户端管理器。
请注意,支持MQTT v5和v3通道适配器。
以下Java DSL配置示例演示如何在集成流程中使用该客户端管理器:
@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
connectionOptions.setConnectionTimeout(30000);
connectionOptions.setMaxReconnectDelay(1000);
connectionOptions.setAutomaticReconnect(true);
Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
clientManager.setPersistence(new MqttDefaultFilePersistence());
return clientManager;
}
@Bean
public IntegrationFlow mqttInFlowTopic1(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttInFlowTopic2(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttOutFlow(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}