如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

STOMP 支持

Spring Integration 4.2 版本引入了 STOMP(面向简单文本的消息协议)客户端支持。 它基于 Spring Framework 消息模块中 stomp 包的架构、基础设施和 API。 Spring Integration 使用了大量 Spring STOMP 组件(例如 StompSessionStompClientSupport)。 更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework STOMP 支持 章节。spring-doc.cadn.net.cn

您需要将以下依赖项包含到您的项目中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.4.10"

对于服务器端组件,您需要添加 org.springframework:spring-websocket 和/或 io.projectreactor.netty:reactor-netty 依赖项。spring-doc.cadn.net.cn

概述

要配置 STOMP,您应从 STOMP 客户端对象开始。 Spring Framework 提供以下实现:spring-doc.cadn.net.cn

  • WebSocketStompClient: 基于 Spring WebSocket API 构建,支持标准的 JSR-356 WebSocket、Jetty 9,以及使用 SockJS Client 进行基于 HTTP 的 WebSocket 模拟(SockJS)。spring-doc.cadn.net.cn

  • ReactorNettyTcpStompClient: 基于 ReactorNettyTcpClient 构建,源自 reactor-netty 项目。spring-doc.cadn.net.cn

您可以提供任何其他 StompClientSupport 实现。 请参阅这些类的 Javadocspring-doc.cadn.net.cn

The StompClientSupport 类被设计为一个工厂,用于为提供的StompSessionHandler生成一个StompSession,所有其余工作均通过对该StompSessionHandlerStompSession抽象的回调完成。 借助 Spring Integration 的适配器抽象,我们需要提供一个受管理的共享对象,以将我们的应用程序表示为具有唯一会话的 STOMP 客户端。 为此,Spring Integration 提供了StompSessionManager抽象,用于管理任何提供的StompSessionHandler之间的单个StompSession。 这使得可以为特定的 STOMP 代理使用入站出站通道适配器(或两者)。 有关更多信息,请参阅StompSessionManager(及其实现)的 Javadocs。spring-doc.cadn.net.cn

STOMP 入站通道适配器

The StompInboundChannelAdapter 是一个一站式 MessageProducer 组件,它将您的 Spring Integration 应用程序订阅到提供的 STOMP 目的地,并从这些目的地接收消息(通过连接到 StompSession 上提供的 MessageConverter 将 STOMP 帧转换为消息)。 您可以使用 StompInboundChannelAdapter 上的适当 @ManagedOperation 注解在运行时更改目的地(从而更改 STOMP 订阅)。spring-doc.cadn.net.cn

有关更多配置选项,请参见 STOMP 命名空间支持 以及 StompInboundChannelAdapter Javadocspring-doc.cadn.net.cn

STOMP 出站通道适配器

The StompMessageHandler<int-stomp:outbound-channel-adapter>MessageHandler,用于通过 StompSession(由共享的 StompSessionManager 提供)将出站的 Message<?> 实例发送到 STOMP destination(预配置或在运行时通过 SpEL 表达式确定)。spring-doc.cadn.net.cn

有关更多配置选项,请参阅 STOMP 命名空间支持 以及 StompMessageHandler Javadocspring-doc.cadn.net.cn

STOMP 头映射

STOMP 协议在其帧中提供头部信息。 STOMP 帧的整个结构格式如下:spring-doc.cadn.net.cn

....
COMMAND
header1:value1
header2:value2

Body^@
....

Spring Framework 提供了 StompHeaders 来表示这些头信息。 有关更多详细信息,请参阅 Javadoc。 STOMP 帧被转换为 Message<?> 实例并从其转换,这些头信息被映射到 MessageHeaders 实例并从中映射。 Spring Integration 为 STOMP 适配器提供了一个默认的 HeaderMapper 实现。 该实现是 StompHeaderMapper。 它分别为入站和出站适配器提供 fromHeaders()toHeaders() 操作。spring-doc.cadn.net.cn

与许多其他 Spring Integration 模块一样,IntegrationStompHeaders 类用于将标准 STOMP 标头映射到 MessageHeaders,其中 stomp_ 作为标头名称前缀。 此外,所有带有该前缀的 MessageHeaders 实例在发送到目标地址时都会被映射到 StompHeadersspring-doc.cadn.net.cn

有关更多信息,请参阅这些类的 Javadoc 以及 STOMP 命名空间支持 中的 mapped-headers 属性描述。spring-doc.cadn.net.cn

STOMP 集成事件

许多 STOMP 操作是异步的,包括错误处理。 例如,当客户端帧通过添加 RECEIPT 标头请求时,STOMP 会返回一个 RECEIPT 服务器帧。 为了访问这些异步事件,Spring Integration 会发出 StompIntegrationEvent 实例,您可以通过实现 ApplicationListener 或使用 <int-event:inbound-channel-adapter> 来获取它们(参见 接收 Spring 应用程序事件)。spring-doc.cadn.net.cn

具体而言,当 StompExceptionEvent 因无法连接到 STOMP 代理而失败时,AbstractStompSessionManager 会发出一个 StompExceptionEvent。 另一个例子是 StompMessageHandler。 它处理来自 ERROR 的 STOMP 帧,这些帧是服务器对由该 StompMessageHandler 发送的不正确(未被接受)消息的响应。spring-doc.cadn.net.cn

The StompMessageHandler 作为异步响应的一部分,向 StompSession 发送消息时,会触发 StompSession.Receiptable 回调中的 StompReceiptEvent。 The StompReceiptEvent 可以是正数或负数,取决于是否在 receiptTimeLimit 周期内从服务器收到了 RECEIPT 帧,该周期可在 StompClientSupport 实例上进行配置。 其默认值为 15 * 1000(单位为毫秒,即 15 秒)。spring-doc.cadn.net.cn

只有当要发送的消息的 RECEIPT STOMP 头不为 null 时,才会添加 StompSession.Receiptable 回调。 您可以在 StompSession 上通过其 autoReceipt 选项以及在 StompSessionManager 上分别启用自动 RECEIPT 头的生成。

有关如何配置 Spring Integration 以接受这些 ApplicationEvent 实例的更多信息,请参阅 STOMP 适配器 Java 配置spring-doc.cadn.net.cn

STOMP 适配器 Java 配置

以下示例展示了 STOMP 适配器的全面 Java 配置:spring-doc.cadn.net.cn

@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}

STOMP 命名空间支持

Spring Integration STOMP 命名空间实现了入站和出站通道适配器组件。 要将其包含在配置中,请在您的应用程序上下文配置文件中提供以下命名空间声明:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>

理解<int-stomp:outbound-channel-adapter>元素

以下列表显示了 STOMP 出站通道适配器的可用属性:spring-doc.cadn.net.cn

<int-stomp:outbound-channel-adapter
                           id=""                      (1)
                           channel=""                 (2)
                           stomp-session-manager=""   (3)
                           header-mapper=""           (4)
                           mapped-headers=""          (5)
                           destination=""             (6)
                           destination-expression=""  (7)
                           auto-startup=""            (8)
                           phase=""/>                 (9)
1 组件 bean 的名称。 The MessageHandler 已使用别名 id 加上 .handler 注册为 bean。 如果您未设置 channel 属性,则会创建一个 DirectChannel,并将其注册到应用程序上下文中,该属性的值将作为 bean 名称。id。 在这种情况下,端点使用 bean 名称 id 加上 .adapter 进行注册。
2 如果存在id,则标识与此适配器关联的通道。 请参见id。 可选。
3 StompSessionManager Bean 的引用,该 Bean 封装了底层连接和 StompSession 处理操作。 必需。
4 引用一个实现了HeaderMapper<StompHeaders>的 Bean,该 Bean 负责将 Spring Integration MessageHeaders与 STOMP 帧头之间进行映射。 它与mapped-headers互斥。 其默认值为StompHeaderMapper
5 以逗号分隔的 STOMP 标头名称列表,这些标头将被映射到 STOMP 帧标头。 仅当未设置 header-mapper 引用时方可提供此内容。 该列表中的值也可以是简单模式,用于与标头名称进行匹配(例如 myheader**myheader)。 特殊标记(STOMP_OUTBOUND_HEADERS)代表所有标准 STOMP 标头(如 content-length、receipt、heart-beat 等)。 默认情况下会包含这些标头。 如果您希望添加自定义标头,同时要求标准标头也被映射,则必须包含此标记,或者通过使用 header-mapper 提供您自己的 HeaderMapper 实现。
6 发送 STOMP 消息的目标名称。 它与 destination-expression 互斥。
7 一个在运行时针对每个 Spring Integration Message(作为根对象)求值的 SpEL 表达式。 它与 destination 互斥。
8 Boolean 值,指示此端点是否应自动启动。 它默认为 true
9 该端点应启动和停止的生命周期阶段。 值越小,该端点启动越早,停止越晚。 默认值为 Integer.MIN_VALUE。 值可以为负数。 参见 SmartLifeCycle

理解<int-stomp:inbound-channel-adapter>元素

以下代码片段显示了 STOMP 入站通道适配器的可用属性:spring-doc.cadn.net.cn

<int-stomp:inbound-channel-adapter
                           id=""                     (1)
                           channel=""                (2)
                           error-channel=""          (3)
                           stomp-session-manager=""  (4)
                           header-mapper=""          (5)
                           mapped-headers=""         (6)
                           destinations=""           (7)
                           send-timeout=""           (8)
                           payload-type=""           (9)
                           auto-startup=""           (10)
                           phase=""/>                (11)
1 组件 bean 的名称。 如果您未设置 channel 属性,系统将创建一个 DirectChannel 并将其注册到应用程序上下文中,该 bean 的名称为此 id 属性的值。 在此情况下,端点将以 bean 名称 id 加上 .adapter 的形式进行注册。
2 标识与此适配器关联的通道。
3 应发送 ErrorMessage 个实例的目标 MessageChannel bean 引用。
4 <int-stomp:outbound-channel-adapter> 上查看相同选项。
5 以逗号分隔的 STOMP 标头名称列表,用于从 STOMP 帧标头进行映射。 仅当未设置 header-mapper 引用时,才能提供此内容。 此列表中的值也可以是简单模式,用于匹配标头名称(例如,myheader**myheader)。 特殊标记 (STOMP_INBOUND_HEADERS) 代表所有标准 STOMP 标头(content-length、receipt、heart-beat 等)。 默认情况下会包含它们。 如果您想添加自己的标头并希望同时映射标准标头,则必须包含此标记,或使用 header-mapper 提供您自己的 HeaderMapper 实现。
6 <int-stomp:outbound-channel-adapter> 上查看相同选项。
7 订阅的 STOMP 目标名称的逗号分隔列表。 目标列表(因此也是订阅列表)可以通过 addDestination()removeDestination() @ManagedOperation 注解在运行时进行修改。
8 向通道发送消息时,如果通道可能阻塞,则等待的最大时间(以毫秒为单位)。 例如,如果已达到最大容量,QueueChannel 可能会一直阻塞直到有可用空间。
9 目标 Java 类型的完全限定名,用于将传入的 STOMP 帧转换为payload。 其默认值为String.class
10 <int-stomp:outbound-channel-adapter> 上查看相同选项。
11 <int-stomp:outbound-channel-adapter> 上查看相同选项。