|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
WebSocket 支持
从 4.1 版本开始,Spring Integration 支持 WebSocket。
它基于 Spring Framework 的 web-socket 模块的架构、基础设施和 API。
因此,许多 Spring WebSocket 的组件(如 SubProtocolHandler 或 WebSocketClient)以及配置选项(如 @EnableWebSocketMessageBroker)都可以在 Spring Integration 中复用。
更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework WebSocket Support 章节。
您需要将以下依赖项包含到您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-websocket</artifactId>
<version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:6.4.10"
对于服务器端,必须显式包含 org.springframework:spring-webmvc 依赖项。
Spring Framework WebSocket 基础设施基于 Spring 消息传递基础架构,并提供了基于相同 MessageChannel 实现和 MessageHandler 实现的基本消息传递框架,这些实现也是 Spring Integration 所使用的(以及某些 POJO 方法注解映射)。
因此,即使没有 WebSocket 适配器,Spring Integration 也可以直接参与 WebSocket 流程。
为此,您可以按照以下示例所示,配置带有适当注解的 Spring Integration @MessagingGateway:
@MessagingGateway
@Controller
public interface WebSocketGateway {
@MessageMapping("/greeting")
@SendToUser("/queue/answer")
@Gateway(requestChannel = "greetingChannel")
String greeting(String payload);
}
概述
由于 WebSocket 协议本质上就是流式的,并且我们可以同时向 WebSocket 发送和接收消息,因此无论处于客户端还是服务端,我们都可以处理适当的 WebSocketSession。
为了封装连接管理和 WebSocketSession 注册表,IntegrationWebSocketContainer 提供了 ClientWebSocketContainer 和 ServerWebSocketContainer 实现。
得益于 WebSocket API 及其在 Spring Framework(包含许多扩展)中的实现,从 Java 角度来看,服务器端和客户端使用的是相同的类。
因此,大多数连接和 WebSocketSession 注册表选项在两端都是相同的。
这使得我们能够重用许多配置项和基础设施钩子,以便在服务器端和客户端构建 WebSocket 应用程序。
以下示例展示了组件如何同时服务于这两个目的:
//Client side
@Bean
public WebSocketClient webSocketClient() {
return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}
@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}
//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/endpoint").withSockJs();
}
The IntegrationWebSocketContainer 旨在实现双向消息传递,可在入站和出站通道适配器之间共享(见下文);当使用单向(发送或接收)WebSocket 消息传递时,它只能被其中一个引用。
它可以不配合任何通道适配器使用,但在这种情况下,IntegrationWebSocketContainer 仅充当 WebSocketSession 注册表的角色。
The ServerWebSocketContainer implements WebSocketConfigurer to register an internal IntegrationWebSocketContainer.IntegrationWebSocketHandler as an Endpoint.
It does so under the provided paths and other server WebSocket options (such as HandshakeHandler or SockJS fallback) within the ServletWebSocketHandlerRegistry for the target vendor WebSocket Container.
This registration is achieved with an infrastructural WebSocketIntegrationConfigurationInitializer component, which does the same as the @EnableWebSocket annotation.
This means that, by using @EnableIntegration (or any Spring Integration namespace in the application context), you can omit the @EnableWebSocket declaration, because the Spring Integration infrastructure detects all WebSocket endpoints. |
从版本 6.1 开始,ClientWebSocketContainer 可以使用提供的 URI 进行配置,而不再需要 uriTemplate 和 uriVariables 的组合。
这在某些 URI 部分需要自定义编码的情况下非常有用。
请参见 UriComponentsBuilder API 以获取便利功能。
WebSocket 入站通道适配器
WebSocketInboundChannelAdapter实现了WebSocketSession交互的接收部分。
您必须向其提供一个IntegrationWebSocketContainer,该适配器会将其自身注册为WebSocketListener以处理传入消息和WebSocketSession事件。
在 IntegrationWebSocketContainer 中只能注册一个 WebSocketListener。 |
对于 WebSocket 子协议,WebSocketInboundChannelAdapter 可以配置为第二个构造函数参数 SubProtocolHandlerRegistry。
适配器委托给 SubProtocolHandlerRegistry 来确定被接受的 WebSocketSession 所对应的适当 SubProtocolHandler,并根据子协议实现将 WebSocketMessage 转换为 Message。
默认情况下,WebSocketInboundChannelAdapter仅依赖于原始的PassThruSubProtocolHandler实现,该实现将WebSocketMessage转换为Message。 |
The WebSocketInboundChannelAdapter 接受并仅向底层集成流发送具有 SimpMessageType.MESSAGE 或空 simpMessageType 头的 Message 实例。
所有其他 Message 类型均通过从 SubProtocolHandler 实现(例如 StompSubProtocolHandler)发出的 ApplicationEvent 实例进行处理。
在服务器端,如果存在@EnableWebSocketMessageBroker配置,您可以使用useBroker = true选项配置WebSocketInboundChannelAdapter。
在这种情况下,所有non-MESSAGEMessage类型都会被委托给提供的AbstractBrokerMessageHandler。
此外,如果代理中继配置了目标前缀,那么与代理目标匹配的消息将被路由到AbstractBrokerMessageHandler,而不是路由到WebSocketInboundChannelAdapter的outputChannel。
如果 useBroker = false 且接收到的消息类型为 SimpMessageType.CONNECT,则 WebSocketInboundChannelAdapter 会立即向 WebSocketSession 发送一条 SimpMessageType.CONNECT_ACK 消息,而不会将其发送到通道。
Spring 的 WebSocket 支持仅允许配置一个代理中继。
因此,我们不需要 AbstractBrokerMessageHandler 引用。
它会在应用上下文中被检测到。 |
有关更多配置选项,请参见 WebSockets 命名空间支持。
WebSocket 出站通道适配器
The WebSocketOutboundChannelAdapter:
-
接受来自其
MessageChannel的 Spring Integration 消息 -
确定来自
MessageHeaders的WebSocketSessionid -
从提供的
IntegrationWebSocketContainer中检索WebSocketSession -
将
WebSocketMessage个工作的转换和发送委托给从提供的SubProtocolHandlerRegistry中获取的适当的SubProtocolHandler。
在客户端,WebSocketSession id 消息头不是必需的,因为 ClientWebSocketContainer 仅处理单个连接及其对应的 WebSocketSession。
要使用 STOMP 子协议,您应使用 StompSubProtocolHandler 配置此适配器。
然后,您可以使用 StompHeaderAccessor.create(StompCommand…) 和 MessageBuilder,或者仅使用 HeaderEnricher(参见 头部增强器),向此适配器发送任何 STOMP 消息类型。
本章其余部分主要介绍其他配置选项。
WebSocket 命名空间支持
Spring Integration WebSocket 命名空间包含本章其余部分描述的若干组件。 要将其包含在您的配置中,请在应用程序上下文配置文件中添加以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/websocket
https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
...
</beans>
<int-websocket:client-container>属性
以下列表显示了 <int-websocket:client-container> 元素可用的属性:
<int-websocket:client-container
id="" (1)
client="" (2)
uri="" (3)
uri-variables="" (4)
origin="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
send-buffer-overflow-strategy="" (8)
auto-startup="" (9)
phase=""> (10)
<int-websocket:http-headers>
<entry key="" value=""/>
</int-websocket:http-headers> (11)
</int-websocket:client-container>
| 1 | 组件 bean 的名称。 |
| 2 | The WebSocketClient bean reference. |
| 3 | 将 uri 或 uriTemplate 指向目标 WebSocket 服务。
如果您将其用作带有 URI 变量占位符的 uriTemplate,则必须指定 uri-variables 属性。 |
| 4 | 用于 URI 变量占位符的逗号分隔值,这些占位符位于 uri 属性值内。
这些值将根据它们在 uri 中的顺序替换到对应的占位符中。
请参阅 UriComponents.expand(Object…uriVariableValues)。 |
| 5 | The Origin 握手 HTTP 头的值。 |
| 6 | WebSocket 会话的'发送'超时限制。
默认值为 10000。 |
| 7 | WebSocket 会话的'send'消息大小限制。
默认值为 524288。 |
| 8 | WebSocket 会话发送缓冲区溢出策略
确定了当会话的出站消息缓冲区达到 send-buffer-size-limit 时的行为。
有关可能的值和更多详细信息,请参阅 ConcurrentWebSocketSessionDecorator.OverflowStrategy。 |
| 9 | 布尔值,指示此端点是否应自动启动。
默认为 false,假设该容器是从 WebSocket 入站适配器 启动的。 |
| 10 | 该端点应启动和停止的生命周期阶段。
值越小,该端点启动越早,停止越晚。
默认值为 Integer.MAX_VALUE。
值可以为负数。
参见 SmartLifeCycle。 |
| 11 | 一个 Map 到 HttpHeaders 之间的值,用于握手请求。 |
<int-websocket:server-container>属性
以下列表显示了 <int-websocket:server-container> 元素可用的属性:
<int-websocket:server-container
id="" (1)
path="" (2)
handshake-handler="" (3)
handshake-interceptors="" (4)
decorator-factories="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
send-buffer-overflow-strategy="" (8)
allowed-origins=""> (9)
<int-websocket:sockjs
client-library-url="" (10)
stream-bytes-limit="" (11)
session-cookie-needed="" (12)
heartbeat-time="" (13)
disconnect-delay="" (14)
message-cache-size="" (15)
websocket-enabled="" (16)
scheduler="" (17)
message-codec="" (18)
transport-handlers="" (19)
suppress-cors="true" /> (20)
</int-websocket:server-container>
| 1 | 组件 bean 的名称。 |
| 2 | 一个路径(或逗号分隔的路径列表),用于将特定请求映射到 WebSocketHandler。
支持精确路径映射 URI(例如 /myPath)和 ant 风格路径模式(例如 /myPath/**)。 |
| 3 | The HandshakeHandler bean 引用。
默认为 DefaultHandshakeHandler。 |
| 4 | 0 个 Bean 引用列表。 |
| 5 | 装饰用于处理 WebSocket 消息的处理器的工厂列表(WebSocketHandlerDecoratorFactory)。
这对于某些高级用例可能很有用(例如,允许 Spring Security 在相应的 HTTP 会话过期时强制关闭 WebSocket 会话)。
有关更多信息,请参阅 Spring Session 项目。 |
| 6 | 在 <int-websocket:client-container> 上查看相同选项。 |
| 7 | 在 <int-websocket:client-container> 上查看相同选项。 |
| 8 | WebSocket 会话发送缓冲区溢出策略
确定了当会话的出站消息缓冲区达到 send-buffer-size-limit 时的行为。
有关可能的值和更多详细信息,请参阅 ConcurrentWebSocketSessionDecorator.OverflowStrategy。 |
| 9 | 允许的源(origin)头值。
您可以指定多个源,以逗号分隔的列表形式。
此检查主要针对浏览器客户端设计。
其他类型的客户端修改源头值没有任何限制。
当启用 SockJS 且限制允许的源时,不使用源头进行跨域请求的传输类型(jsonp-polling、iframe-xhr-polling、iframe-eventsource和iframe-htmlfile)将被禁用。
因此,不支持 IE6 和 IE7;IE8 和 IE9 仅在无 Cookie 的情况下受支持。
默认情况下,允许所有源。 |
| 10 | 不支持原生跨域通信的传输方式(例如 eventsource 和 htmlfile)必须从“外部”域获取一个简单的页面,并将其加载到不可见的 iframe 中,以便 iframe 中的代码可以在与 SockJS 服务器相同的域下运行。
由于 iframe 需要加载 SockJS JavaScript 客户端库,此属性允许您指定加载该库的位置。
默认情况下,它指向 d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js。
不过,您也可以将其设置为指向由应用程序提供的 URL。
请注意,可以指定相对 URL;在这种情况下,该 URL 必须是相对于 iframe URL 的。
例如,假设 SockJS 端点映射为 /sockjs,且生成的 iframe URL 为 /sockjs/iframe.html,则相对 URL 必须以 "../../" 开头,以向上遍历至 SockJS 映射位置之上的目录。
对于基于前缀的 servlet 映射,您可能还需要多进行一次遍历。 |
| 11 | 在单个 HTTP 流式请求关闭之前,可发送的最小字节数。
默认值为 128K(即 128*1024 或 131072 字节)。 |
| 12 | 响应中来自 SockJs cookie_needed 端点的值。/info。
此属性指示应用程序正常运行是否需要 JSESSIONID cookie(例如,用于负载均衡或在 Java Servlet 容器中为 HTTP 会话使用)。 |
| 13 | 服务器在发送任何消息后未收到客户端响应(以毫秒为单位)的时间长度,超过该时间后,服务器应向客户端发送心跳帧以保持连接不中断。
默认值为 25,000(25 秒)。 |
| 14 | 在客户端被认为断开连接之前,服务器等待接收连接(即服务器可以向客户端发送数据的活跃连接)的时间长度(以毫秒为单位)。
默认值为 5000。 |
| 15 | 会话可以缓存的从服务器到客户端的消息数量,等待来自客户端的下一个 HTTP 轮询请求。
默认大小为 100。 |
| 16 | 某些负载均衡器不支持 WebSocket。
将此选项设置为 false 以在服务器端禁用 WebSocket 传输。
默认值为 true。 |
| 17 | The TaskScheduler bean reference.
A new ThreadPoolTaskScheduler instance is created if no value is provided.
This scheduler instance is used for scheduling heart-beat messages. |
| 18 | 用于对 SockJS 消息进行编码和解码的 SockJsMessageCodec bean 引用。
默认情况下使用 Jackson2SockJsMessageCodec,这要求类路径中存在 Jackson 库。 |
| 19 | 0 个 Bean 引用列表。 |
| 20 | 是否禁用为 SockJS 请求自动添加 CORS 头。
默认值为 false。 |
<int-websocket:outbound-channel-adapter>属性
以下列表显示了 <int-websocket:outbound-channel-adapter> 元素可用的属性:
<int-websocket:outbound-channel-adapter
id="" (1)
channel="" (2)
container="" (3)
default-protocol-handler="" (4)
protocol-handlers="" (5)
message-converters="" (6)
merge-with-default-converters="" (7)
auto-startup="" (8)
phase=""/> (9)
| 1 | 组件的 Bean 名称。
如果您未提供 channel 属性,则会创建并注册一个 DirectChannel 到应用上下文中,该 id 属性将作为 Bean 的名称。
在这种情况下,端点将以 Bean 名称 id 加上 .adapter 进行注册。
同时,MessageHandler 将以别名 id 加上 .handler 进行注册。 |
| 2 | 标识与此适配器关联的通道。 |
| 3 | 对封装了低级连接和WebSocketSession处理操作的IntegrationWebSocketContainer bean的引用。
必需项。 |
| 4 | 对 SubProtocolHandler 实例的可选引用。
当客户端未请求子协议或仅为单一协议处理器时使用它。
如果未提供此引用或 protocol-handlers 列表,则默认使用 PassThruSubProtocolHandler。 |
| 5 | 此通道适配器有 SubProtocolHandler 个 bean 引用列表。
如果您仅提供单个 bean 引用且未提供 default-protocol-handler,则该单个 SubProtocolHandler 将用作 default-protocol-handler。
如果您未设置此属性或 default-protocol-handler,则默认使用 PassThruSubProtocolHandler。 |
| 6 | 此通道适配器的 MessageConverter 个 bean 引用列表。 |
| 7 | 布尔值,指示是否应在任何自定义转换器之后注册默认转换器。
仅当提供message-converters时才会使用此标志。
否则,将注册所有默认转换器。
默认为false。
默认转换器(按顺序)为:StringMessageConverter、ByteArrayMessageConverter以及MappingJackson2MessageConverter(如果类路径中存在 Jackson 库)。 |
| 8 | 布尔值,指示此端点是否应自动启动。
默认值为 true。 |
| 9 | 该端点应启动和停止的生命周期阶段。
值越小,该端点启动越早,停止越晚。
默认值为 Integer.MIN_VALUE。
值可以为负数。
参见 SmartLifeCycle。 |
<int-websocket:inbound-channel-adapter>属性
以下列表显示了 <int-websocket:outbound-channel-adapter> 元素可用的属性:
<int-websocket:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
container="" (4)
default-protocol-handler="" (5)
protocol-handlers="" (6)
message-converters="" (7)
merge-with-default-converters="" (8)
send-timeout="" (9)
payload-type="" (10)
use-broker="" (11)
auto-startup="" (12)
phase=""/> (13)
| 1 | 组件 Bean 的名称。
如果您未设置 channel 属性,则会自动创建并注册一个 ID 为 DirectChannel 的 Bean,其名称由该 id 属性指定。
在此情况下,端点将以 Bean 名称 id 加上 .adapter 的形式进行注册。 |
| 2 | 标识与此适配器关联的通道。 |
| 3 | 要发送到的 MessageChannel bean 的引用,该引用将接收 ErrorMessage 个实例。 |
| 4 | 在 <int-websocket:outbound-channel-adapter> 上查看相同选项。 |
| 5 | 在 <int-websocket:outbound-channel-adapter> 上查看相同选项。 |
| 6 | 在 <int-websocket:outbound-channel-adapter> 上查看相同选项。 |
| 7 | 在 <int-websocket:outbound-channel-adapter> 上查看相同选项。 |
| 8 | 在 <int-websocket:outbound-channel-adapter> 上查看相同选项。 |
| 9 | 向通道发送消息时,如果通道可能阻塞,则等待的最大时间(以毫秒为单位)。
例如,如果已达到最大容量,QueueChannel 可能会一直阻塞直到有可用空间。 |
| 10 | 目标 Java 类型的完全限定名,用于将传入的payload转换为WebSocketMessage。
默认为java.lang.String。 |
| 11 | 指示此适配器是否将0、1实例和带有代理目标的消息从应用上下文发送到2。 当此属性为3时,需要配置4。 此属性仅用于服务器端。 在客户端侧,它将被忽略。 默认值为5。 |
| 12 | 在 <int-websocket:outbound-channel-adapter> 上查看相同选项。 |
| 13 | 在 <int-websocket:outbound-channel-adapter> 上查看相同选项。 |
使用ClientStompEncoder
从版本 4.3.13 开始,Spring Integration 提供了 ClientStompEncoder(作为标准 StompEncoder 的扩展),用于 WebSocket 通道适配器的客户端侧。
为了正确进行客户端侧的消息准备,您必须将 ClientStompEncoder 的实例注入到 StompSubProtocolHandler 中。
默认 StompSubProtocolHandler 的一个问题是,它是为服务器端设计的,因此会将 SEND stompCommand 头更新为 MESSAGE(这是 STOMP 协议对服务器端的要求)。
如果客户端未以正确的 SEND WebSocket 帧发送消息,某些 STOMP 代理将不接受这些消息。
在这种情况下,ClientStompEncoder 的目的是覆盖 stompCommand 头,并在将消息编码为 byte[] 之前将其设置为 SEND 值。
动态 WebSocket 端点注册
从版本 5.5 开始,WebSocket 服务器端点(基于 ServerWebSocketContainer 的通道适配器)现在可以在运行时注册(和移除)- 将 paths 映射到 ServerWebSocketContainer 的内容通过 HandlerMapping 暴露为 DispatcherServlet,并可被 WebSocket 客户端访问。
动态和运行时集成流 支持帮助以透明方式注册这些端点:
@Autowired
IntegrationFlowContext integrationFlowContext;
@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
new ServerWebSocketContainer("/dynamic")
.setHandshakeHandler(this.handshakeHandler);
WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
new WebSocketInboundChannelAdapter(serverWebSocketContainer);
QueueChannel dynamicRequestsChannel = new QueueChannel();
IntegrationFlow serverFlow =
IntegrationFlow.from(webSocketInboundChannelAdapter)
.channel(dynamicRequestsChannel)
.get();
IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
this.integrationFlowContext.registration(serverFlow)
.addBean(serverWebSocketContainer)
.register();
...
dynamicServerFlow.destroy();
调用 .addBean(serverWebSocketContainer) 对于动态流注册非常重要,以便将 ServerWebSocketContainer 的实例添加到 ApplicationContext 中用于端点注册。
当动态流注册被销毁时,关联的 ServerWebSocketContainer 实例也会被销毁,同时其对应的端点注册(包括 URL 路径映射)也会被销毁。 |
动态 Websocket 端点只能通过 Spring Integration 机制进行注册:当使用常规 Spring @EnableWebsocket 时,Spring Integration 配置会回退,且不会注册任何用于动态端点的底层设施。 |