|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
MQTT 支持
Spring Integration 提供了入站和出站通道适配器,以支持消息队列遥测传输(MQTT)协议。
您需要将以下依赖项包含到您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:6.4.10"
当前实现使用了 Eclipse Paho MQTT Client 库。
| 本章节的 XML 配置及大部分内容均关于 MQTT v3.1 协议支持及其对应的 Paho Client。 有关相应协议支持的详情,请参阅 MQTT v5 支持 段落。 |
Both adapters' configuration is achieved using the DefaultMqttPahoClientFactory.
Refer to the Paho documentation for more information about configuration options.
我们建议配置一个 MqttConnectOptions 对象并将其注入到工厂中,而不是在工厂本身上设置(已弃用)的选项。 |
入站(消息驱动)通道适配器
入站通道适配器由MqttPahoMessageDrivenChannelAdapter实现。
为方便起见,您可以使用命名空间对其进行配置。
最小配置可能如下所示:
<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="connectionOptions">
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
<property name="userName" value="${mqtt.username}"/>
<property name="password" value="${mqtt.password}"/>
</bean>
</property>
</bean>
<int-mqtt:message-driven-channel-adapter id="mqttInbound"
client-id="${mqtt.default.client.id}.src"
url="${mqtt.url}"
topics="sometopic"
client-factory="clientFactory"
channel="output"/>
以下列表显示了可用的属性:
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="foo" (1)
url="tcp://localhost:1883" (2)
topics="bar,baz" (3)
qos="1,2" (4)
converter="myConverter" (5)
client-factory="clientFactory" (6)
send-timeout="123" (7)
error-channel="errors" (8)
recovery-interval="10000" (9)
manual-acks="false" (10)
channel="out" />
| 1 | 客户端 ID。 |
| 2 | 代理服务器URL。 |
| 3 | 接收消息的主题的逗号分隔列表。 |
| 4 | QoS 值的逗号分隔列表。 它可以是应用于所有主题的单值,也可以是每个主题的对应值(此时两个列表的长度必须相同)。 |
| 5 | An MqttMessageConverter (optional)。
默认情况下,DefaultPahoMessageConverter 会生成一个带有以下标题的 String 负载的消息:
|
| 6 | 客户端工厂。 |
| 7 | The send() 超时。
它仅适用于通道可能会阻塞的情况(例如,当前已满的有界 QueueChannel)。 |
| 8 | 错误通道。
下游异常会被发送到此通道(如果已提供),在 ErrorMessage 中。
负载是一个包含失败消息和原因的 MessagingException。 |
| 9 | 恢复间隔。
它控制适配器在失败后尝试重新连接的间隔时间。
默认值为 10000ms(十秒)。 |
| 10 | 确认模式;设置为 true 以启用手动确认。 |
从 4.1 版本开始,您可以省略 URL。
相反,您可以在 serverURIs 属性的 DefaultMqttPahoClientFactory 中提供服务器 URI。
这样做可以实现例如连接到高可用(HA)集群的功能。 |
从版本 4.2.2 开始,当适配器成功订阅主题时,会发布一个MqttSubscribedEvent。
当连接或订阅失败时,会发布MqttConnectionFailedEvent个事件。
这些事件可以由实现ApplicationListener的 bean 接收。
此外,一个名为 recoveryInterval 的新属性控制适配器在失败后尝试重新连接的间隔时间。
其默认值为 10000ms(十秒)。
|
在 4.2.3 版本之前,当适配器停止时,客户端总是取消订阅。
这是不正确的,因为如果客户端 QoS 大于 0,我们需要保持订阅活动,以便在适配器停止期间到达的消息能在下次启动时交付。
这也需要将客户端工厂的 从版本 4.2.3 开始,如果 此行为可通过在工厂上设置 若要恢复到4.2.3之前的行为,请使用 |
|
从版本 5.0 开始, |
在运行时添加和移除主题
从版本 4.1 开始,您可以编程方式更改适配器订阅的主题。
Spring Integration 提供了 addTopic() 和 removeTopic() 方法。
添加主题时,您可以选择指定 QoS(默认值:1)。
您还可以通过向 <control-bus/> 发送带有适当负载的相应消息来修改主题,例如:"myMqttAdapter.addTopic('foo', 1)"。
停止和启动适配器不会影响主题列表(它不会恢复到配置中的原始设置)。 这些更改在应用程序上下文的生命周期之外不会被保留。 新的应用程序上下文将恢复到配置的设置。
在适配器停止(或从代理断开连接)时更改主题,将在下次建立连接时生效。
手动确认
从版本 5.3 开始,您可以将 manualAcks 属性设置为 true。
通常用于异步确认交付。
当设置为 true 时,会将头信息 (IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK) 添加到消息中,其值为一个 SimpleAcknowledgment。
您必须调用 acknowledge() 方法以完成交付。
有关更多信息,请参阅 IMqttClient、setManualAcks() 和 messageArrivedComplete() 的 Javadocs。
为方便起见,提供了一个头信息访问器:
StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();
从版本 5.2.11 开始,当消息转换器抛出异常或在 MqttMessage 转换中返回 null 时,MqttPahoMessageDrivenChannelAdapter 会将 ErrorMessage 发送到 errorChannel(如果已提供)。
否则,此转换错误将被重新抛出到 MQTT 客户端回调中。
使用 Java 配置进行配置
以下 Spring Boot 应用示例展示了如何使用 Java 配置来配置入站适配器:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
"topic1", "topic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用提供了使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttInbound() {
return IntegrationFlow.from(
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
"testClient", "topic1", "topic2"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
出站通道适配器
出站通道适配器由 MqttPahoMessageHandler 实现,该适配器被封装在 ConsumerEndpoint 中。
为便于配置,您可以使用命名空间对其进行配置。
从版本 4.1 开始,适配器支持异步发送操作,无需等待交付确认即可继续执行。 您可以发射应用程序事件,以便在需要时让应用程序确认交付。
以下清单显示了出站通道适配器可用的属性:
<int-mqtt:outbound-channel-adapter id="withConverter"
client-id="foo" (1)
url="tcp://localhost:1883" (2)
converter="myConverter" (3)
client-factory="clientFactory" (4)
default-qos="1" (5)
qos-expression="" (6)
default-retained="true" (7)
retained-expression="" (8)
default-topic="bar" (9)
topic-expression="" (10)
async="false" (11)
async-events="false" (12)
channel="target" />
| 1 | 客户端 ID。 |
| 2 | 代理服务器URL。 |
| 3 | An MqttMessageConverter (可选)。
默认值 DefaultPahoMessageConverter 识别以下标头:
|
| 4 | 客户端工厂。 |
| 5 | 默认的服务质量。
如果未找到mqtt_qos标头或qos-expression返回null,则使用此值。
如果您提供了自定义converter,则不使用此值。 |
| 6 | 用于评估以确定 qos 的表达式。
默认值为 headers[mqtt_qos]。 |
| 7 | 保留标志的默认值。
如果未找到 mqtt_retained 标头,则使用此值。
如果提供了自定义 converter,则不使用此值。 |
| 8 | 用于评估以确定保留的布尔值的表达式。
默认值为 headers[mqtt_retained]。 |
| 9 | 发送消息的默认主题(如果未找到 mqtt_topic 标头时使用)。 |
| 10 | 用于确定目标主题的表达式。
默认值为 headers['mqtt_topic']。 |
| 11 | 当值为true时,调用方不会阻塞。
相反,在发送消息时会等待交付确认。
默认值为false(发送操作会阻塞直到交付确认)。 |
| 12 | 当 async 和 async-events 均为 true 时,会发出一个 MqttMessageSentEvent(参见 事件)。
它包含消息、主题、由客户端库生成的 messageId、clientId 以及 clientInstance(每次客户端连接时递增)。
当客户端库确认交付后,会发出一个 MqttMessageDeliveredEvent。
它包含 messageId、clientId 和 clientInstance,使得交付能够与 send() 进行关联。
任何 ApplicationListener 或入站通道适配器均可接收这些事件。
请注意,MqttMessageDeliveredEvent 可能在 MqttMessageSentEvent 之前被接收。
默认值为 false。 |
从版本 4.1 开始,URL 可以省略。
取而代之的是,可以在 serverURIs 属性的 DefaultMqttPahoClientFactory 中提供服务器 URI。
这使得例如连接到高可用(HA)集群成为可能。 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置来配置出站适配器:
@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToMqtt("foo");
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序提供了一个示例,展示如何使用 Java DSL 配置出站适配器:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
}
}
事件
某些应用程序事件由适配器发布。
-
MqttConnectionFailedEvent- 当连接失败或后续连接丢失时,两个适配器都会发布此事件。 对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时也会发出此事件,此时丢失连接的cause为null。 -
MqttMessageSentEvent- 当消息已发送时,如果以异步模式运行,则由出站适配器发布。 -
MqttMessageDeliveredEvent- 当客户端指示消息已送达时,若以异步模式运行,则由出站适配器发布。 -
MqttMessageNotDeliveredEvent- 当客户端指示消息未成功传递时,如果运行在异步模式下,由出站适配器发布。 -
MqttSubscribedEvent- 由入站适配器在订阅主题后发布。
这些事件可以通过ApplicationListener<MqttIntegrationEvent>或带有@EventListener的方法接收。
要确定事件的来源,请使用以下方法:您可以检查 bean 名称和/或连接选项(以访问服务器 URI 等)。
MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();
MQTT v5 支持
从版本 5.5.5 开始,spring-integration-mqtt模块提供了针对 MQTT v5 协议的通道适配器实现。
org.eclipse.paho:org.eclipse.paho.mqttv5.client是一个optional依赖项,因此必须在目标项目中显式包含。
由于 MQTT v5 协议支持在 MQTT 消息中包含额外的任意属性,因此引入了 MqttHeaderMapper 实现以在发布和接收操作期间映射/反映射到标头。
默认情况下(通过 * 模式),它将所有接收到的 PUBLISH 帧属性(包括用户属性)进行映射。
在出站侧,它为 PUBLISH 帧映射此标头子集:contentType、mqtt_messageExpiryInterval、mqtt_responseTopic、mqtt_correlationData。
MQTT v5 协议的出站通道适配器表现为一个 Mqttv5PahoMessageHandler。
它需要一个 clientId 以及 MQTT 代理 URL 或 MqttConnectionOptions 引用。
它支持 MqttClientPersistence 选项,可以是 async,并且在相应情况下可以发射 MqttIntegrationEvent 对象(参见 asyncEvents 选项)。
如果请求消息的负载是 org.eclipse.paho.mqttv5.common.MqttMessage,则通过内部 IMqttAsyncClient 直接发布。
如果负载是 byte[],则直接用作目标 MqttMessage 负载进行发布。
如果负载是一个 String,则将其转换为 byte[] 后进行发布。
其余用例委托给提供的 MessageConverter,该对象是应用程序上下文中的一个 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverter Bean。
注意:当请求的消息负载已经是 MqttMessage 时,提供的 HeaderMapper<MqttProperties> 不会被使用。
以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:
@Bean
public IntegrationFlow mqttOutFlow() {
Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
messageHandler.setHeaderMapper(mqttHeaderMapper);
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(true);
messageHandler.setConverter(mqttStringToBytesConverter());
return f -> f.handle(messageHandler);
}
The org.springframework.integration.mqtt.support.MqttMessageConverter cannot be used with the Mqttv5PahoMessageHandler since its contract is aimed only for the MQTT v3 protocol. |
如果在启动时或运行时连接失败,Mqttv5PahoMessageHandler将尝试在下一个消息发送到该处理器时重新连接。
如果此手动重连失败,连接异常将被抛回给调用者。
在这种情况下,将应用标准的 Spring Integration 错误处理流程,包括请求处理器建议(例如重试或熔断器)。
在 Mqttv5PahoMessageHandler javadocs 及其父类中查看更多详细信息。
MQTT v5 协议的入站通道适配器表示为 Mqttv5PahoMessageDrivenChannelAdapter。
它需要一个 clientId 以及 MQTT 代理 URL 或 MqttConnectionOptions 引用,加上要订阅和消费的主题。
它支持一个 MqttClientPersistence 选项,默认情况下是内存中的。
预期的 payloadType(默认为 byte[])可以配置,并且会传播到提供的 SmartMessageConverter,用于将接收到的 MqttMessage 的 byte[] 进行转换。
如果设置了 manualAck 选项,则会在消息中添加一个 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 头,以便作为 SimpleAcknowledgment 的实例生成。
HeaderMapper<MqttProperties> 用于将 PUBLISH 帧属性(包括用户属性)映射到目标消息头中。
标准的 MqttMessage 属性,如 qos、id、dup、retained,以及接收到的主题,始终会被映射到头中。
有关更多信息,请参阅 MqttHeaders。
从 6.3 版本开始,Mqttv5PahoMessageDrivenChannelAdapter 提供了基于 MqttSubscription 的构造函数,用于细粒度配置,而不是简单的主题名称。
当提供这些订阅时,通道适配器的 qos 选项将无法使用,因为这种 qos 模式是 MqttSubscription API 的一部分。
以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:
@Bean
public IntegrationFlow mqttInFlow() {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
messageProducer.setPayloadType(String.class);
messageProducer.setMessageConverter(mqttStringToBytesConverter());
messageProducer.setManualAcks(true);
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
The org.springframework.integration.mqtt.support.MqttMessageConverter cannot be used with the Mqttv5PahoMessageDrivenChannelAdapter since its contract is aimed only for the MQTT v3 protocol. |
在 Mqttv5PahoMessageDrivenChannelAdapter javadocs 及其父类中查看更多详细信息。
建议将 MqttConnectionOptions#setAutomaticReconnect(boolean) 设置为 true,以便让内部的 IMqttAsyncClient 实例处理重连。
否则,只有手动重启 Mqttv5PahoMessageDrivenChannelAdapter 才能处理重连,例如通过 MqttConnectionFailedEvent 在断开连接时进行处理。 |
共享 MQTT 客户端支持
如果多个集成需要单个 MQTT ClientID,则不能使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个 ClientID 的连接数有限制(通常只允许一个连接)。
为了在多个通道适配器中重用单个客户端,可以使用 org.springframework.integration.mqtt.core.ClientManager 组件并将其传递给任何需要的通道适配器。
它将管理 MQTT 连接的生命周期,并在需要时自动重连。
此外,也可以像当前为通道适配器组件所做的那样,向客户端管理器提供自定义连接选项和 MqttClientPersistence。
请注意,同时支持 MQTT v5 和 v3 通道适配器。
以下 Java DSL 配置示例演示了如何在集成流中使用此客户端管理器:
@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
connectionOptions.setConnectionTimeout(30000);
connectionOptions.setMaxReconnectDelay(1000);
connectionOptions.setAutomaticReconnect(true);
Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
clientManager.setPersistence(new MqttDefaultFilePersistence());
return clientManager;
}
@Bean
public IntegrationFlow mqttInFlowTopic1(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttInFlowTopic2(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttOutFlow(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}
从版本 6.4 开始,现在可以在运行时使用相应的 ClientManager 到 IntegrationFlowContext 添加多个 MqttPahoMessageDrivenChannelAdapter 和 Mqttv5PahoMessageDrivenChannelAdapter 实例。 |
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
String topic, MessageChannel channel) {
flowContext
.registration(
IntegrationFlow
.from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
.channel(channel)
.get())
.register();
}