|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
RSocket 支持
RSocket Spring 集成模块(spring-integration-rsocket)允许执行RSocket 应用协议。
您需要将以下依赖项包含到您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.4.10"
此模块从版本 5.2 开始可用,基于 Spring Messaging 基础及其 RSocket 组件实现,例如 RSocketRequester、RSocketMessageHandler 和 RSocketStrategies。
有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring Framework RSocket 支持。
在通过通道适配器开始集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。
为此,Spring Integration RSocket 支持提供了 ServerRSocketConnector 和 ClientRSocketConnector 两个实现类,它们均实现了 AbstractRSocketConnector 接口。
The ServerRSocketConnector 根据提供的 io.rsocket.transport.ServerTransport 在主机和端口上暴露一个监听器,用于接受来自客户端的连接。内部 RSocketServer 实例可以使用 setServerConfigurer() 以及其他可配置的选项进行自定义,例如。g.RSocketStrategies and MimeType for payload data and headers metadata.当客户端请求方提供setupRoute时(见下文ClientRSocketConnector),已连接的客户端将作为RSocketRequester存储,其键由clientRSocketKeyStrategyBiFunction<Map<String, Object>, DataBuffer, Object>确定。默认情况下,连接数据将作为键使用,并转换为 UTF-8 字符集的字符串值。这样一个 RSocketRequester 注册表可用于应用程序逻辑中,以确定与特定客户端连接进行交互,或将同一消息发布给所有已连接的客户端。当客户端建立连接时,RSocketConnectedEvent 会从 ServerRSocketConnector 发出。这类似于 Spring Messaging 模块中提供的 @ConnectMapping 注解的功能。映射模式 * 表示接受所有客户端路由。The RSocketConnectedEvent can be used to distinguish different routes via DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER header.
典型的服务器配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有选项,包括 RSocketStrategies 的 bean 和 @EventListener 用于 RSocketConnectedEvent,都是可选的。
有关更多信息,请参阅 ServerRSocketConnector JavaDocs。
从版本 5.2.1 开始,ServerRSocketMessageHandler 被提取为一个公共的顶层类,以便与现有的 RSocket 服务器进行连接。
当提供一个外部 ServerRSocketMessageHandler 实例并传入 ServerRSocketConnector 时,它不会在内部创建 RSocket 服务器,而是将所有处理逻辑委托给提供的实例。
此外,ServerRSocketMessageHandler 可以配置一个 messageMappingCompatible 标志,以便也为 RSocket 控制器处理 @MessageMapping,从而完全替代标准 RSocketMessageHandler 提供的功能。
这在混合配置中非常有用,例如当应用程序中同时存在经典的 @MessageMapping 方法、RSocket 通道适配器以及已外部配置的 RSocket 服务器时。
The ClientRSocketConnector 作为基于 RSocket 的 RSocketRequester 的持有者,通过提供的 ClientTransport 进行连接。
The RSocketConnector 可以使用提供的 RSocketConnectorConfigurer 进行自定义。
The setupRoute(可选模板变量)和带有元数据的 setupData 也可以在此组件上进行配置。
典型的客户端配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
大多数这些选项(包括 RSocketStrategies bean)都是可选的。
请注意我们如何连接到本地启动的 RSocket 服务器所使用的任意端口。
有关示例用例,请参见 ServerRSocketConnector.clientRSocketKeyStrategy 中的 setupData。
此外,请参阅 ClientRSocketConnector 及其 AbstractRSocketConnector 超类的 Javadocs 以获取更多信息。
Both ClientRSocketConnector and ServerRSocketConnector 都负责将入站通道适配器映射到其 path 配置,以路由传入的 RSocket 请求。
有关更多信息,请参阅下一节。
RSocket 入站网关
RSocketInboundGateway负责接收RSocket请求并生成响应(如果有)。它需要一个包含path个映射的数组,该映射可以是类似于MVC请求映射的模式或@MessageMapping种语义。此外,(自版本 5.起)2.2), 一组交互模型(见RSocketInteractionModel)可在RSocketInboundGateway上配置,以通过特定帧类型限制RSocket请求到该端点。默认情况下,所有交互模型均受支持。这样一个 Bean,根据其IntegrationRSocketEndpoint实现(对ReactiveMessageHandler的扩展),会通过ServerRSocketConnector或ClientRSocketConnector自动检测,以便在内部IntegrationRSocketMessageHandler中为传入请求执行路由逻辑。一个 AbstractRSocketConnector 可以被提供给 RSocketInboundGateway 用于显式端点注册。通过这种方式,该AbstractRSocketConnector上的自动检测选项将被禁用。RSocketStrategies 也可以被注入到 RSocketInboundGateway 中,或者从提供的 AbstractRSocketConnector 中获取,从而覆盖任何显式注入。解码器用于从这些 RSocketStrategies 中根据提供的 requestElementType 来解码请求负载。如果传入的 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 头未提供,Message 将把请求视为 fireAndForget RSocket 交互模型。在此情况下,RSocketInboundGateway 对 outputChannel 执行一个简单的 send 操作。否则,将使用来自 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 头部的 MonoProcessor 值向 RSocket 发送回复。为此目的,一个RSocketInboundGateway对outputChannel执行sendAndReceiveMessageReactive操作。根据 MessagingRSocket 逻辑,要向下游发送的消息的 payload 始终为 Flux。当处于 fireAndForget RSocket 交互模型时,消息具有一个普通的转换 payload。回复 payload 可以是一个普通对象或一个 Publisher - RSocketInboundGateway 会根据 RSocketStrategies 中提供的编码器,将两者正确地转换为 RSocket 响应。
从 5.3 版本开始,decodeFluxAsUnit 选项(默认值为 false)已添加到 RSocketInboundGateway 中。
默认情况下,传入的 Flux 会以每个事件单独解码的方式进行转换。
这与当前 @MessageMapping 语义下的确切行为一致。
若要恢复之前的行为或根据应用程序需求将整个 Flux 作为单个单元进行解码,则需将 decodeFluxAsUnit 设置为 true。
然而,目标解码逻辑取决于所选的 Decoder,例如 StringDecoder 要求流中存在换行符(默认情况)以指示字节缓冲区的结束。
查看 使用 Java 配置 RSocket 端点 以获取如何配置 RSocketInboundGateway 端点并在下游处理负载的示例。
RSocket 出站网关
The RSocketOutboundGateway 是一个用于向 RSocket 发起请求并根据 RSocket 回复(如果有)生成回复的 AbstractReplyProducingMessageHandler。
底层的 RSocket 协议交互被委托给一个从提供的 ClientRSocketConnector 或服务器端请求消息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 头信息中解析出的 RSocketRequester。
服务器端的目标 RSocketRequester 可以从 RSocketConnectedEvent 解析,或者根据为连接请求映射所选的某个业务键,通过 ServerRSocketConnector.setClientRSocketKeyStrategy() 使用 ServerRSocketConnector.getClientRSocketRequester() API 进行解析。
有关更多信息,请参阅 ServerRSocketConnector JavaDocs。
发送请求的 route 必须显式配置(与路径变量一起)或通过针对请求消息求值的 SpEL 表达式进行配置。
RSocket 交互模型可通过 RSocketInteractionModel 选项或相应的表达式设置提供。
默认情况下,对于通用网关用例会使用 requestResponse。
当请求消息负载为Publisher时,可提供一个publisherElementType选项,根据目标RSocketRequester中提供的RSocketStrategies对其元素进行编码。
此选项的表达式可求值为ParameterizedTypeReference。
有关数据和其类型的更多信息,请参阅RSocketRequester.RequestSpec.data() JavaDocs。
RSocket 请求也可以通过 metadata 进行增强。
为此,可以在 RSocketOutboundGateway 上针对请求消息配置一个 metadataExpression。
此类表达式必须求值为 Map<Object, MimeType>。
当interactionModel不等于fireAndForget时,必须提供expectedResponseType。
默认情况下,它是一个String.class。
此选项的表达式可求值为ParameterizedTypeReference。
有关回复数据及其类型的更多信息,请参阅RSocketRequester.RetrieveSpec.retrieveMono()和RSocketRequester.RetrieveSpec.retrieveFlux()的 JavaDocs。
来自 RSocketOutboundGateway 的回复 payload 是一个 Mono(即使对于 fireAndForget 交互模型,它也是 Mono<Void>),始终使该组件成为 async。
这样的 Mono 在产生到用于常规通道的 outputChannel 之前会被订阅,或者由 FluxMessageChannel 按需处理。
针对 requestStream 或 requestChannel 交互模型的 Flux 响应也会被包装成一个回复 Mono。
它可以通过带有直通服务激活器的 FluxMessageChannel 在下游进行扁平化处理:
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或在目标应用程序逻辑中显式订阅。
预期的响应类型也可以配置(或通过表达式评估)为 void,将此网关视为出站通道适配器。
然而,outputChannel 仍必须被配置(即使只是一个 NullChannel),以启动对返回的 Mono 的订阅。
查看 使用 Java 配置 RSocket 端点 以获取如何配置 RSocketOutboundGateway 端点并处理下游负载的示例。
RSocket 命名空间支持
Spring Integration 提供了一个rsocket命名空间及相应的架构定义。
要将其包含在您的配置中,请在应用程序上下文配置文件中添加以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
传入
要使用 XML 配置 Spring Integration RSocket 入站通道适配器,您需要从 int-rsocket 命名空间中使用适当的 inbound-gateway 组件。
以下示例展示了如何配置它:
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
ClientRSocketConnector 和 ServerRSocketConnector 应配置为通用的 <bean> 定义。
出站
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
查看 spring-integration-rsocket.xsd 以获取所有 XML 属性的描述。
使用 Java 配置 RSocket 端点
以下示例展示了如何使用 Java 配置 RSocket 入站端点:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
在此配置中,ClientRSocketConnector 或 ServerRSocketConnector 被假定具有用于在“echo”路径上自动检测此类端点的含义。
请注意 @Transformer 签名,它完全采用响应式方式处理 RSocket 请求并生成响应式回复。
以下示例展示了如何使用 Java DSL 配置 RSocket 入站网关:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
在此配置中,ClientRSocketConnector或ServerRSocketConnector被假定为具有特定含义,用于自动检测“/uppercase”路径上的此类端点,并期望的交互模型为“请求通道”。
以下示例展示了如何使用 Java 配置 RSocket 出站网关:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
The setClientRSocketConnector() is required only for the client side.
On the server side, the RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER header with an RSocketRequester value must be supplied in the request message.
以下示例展示了如何使用 Java DSL 配置 RSocket 出站网关:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
有关如何在上述流程开始时使用提到的Function接口,请参阅IntegrationFlow作为网关以获取更多信息。