如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

RSocket 支持

RSocket Spring 集成模块(spring-integration-rsocket)允许执行RSocket 应用协议spring-doc.cadn.net.cn

您需要将以下依赖项包含到您的项目中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
    <version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.4.10"

此模块从版本 5.2 开始可用,基于 Spring Messaging 基础及其 RSocket 组件实现,例如 RSocketRequesterRSocketMessageHandlerRSocketStrategies。 有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring Framework RSocket 支持spring-doc.cadn.net.cn

在通过通道适配器开始集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。 为此,Spring Integration RSocket 支持提供了 ServerRSocketConnectorClientRSocketConnector 两个实现类,它们均实现了 AbstractRSocketConnector 接口。spring-doc.cadn.net.cn

The ServerRSocketConnector 根据提供的 io.rsocket.transport.ServerTransport 在主机和端口上暴露一个监听器,用于接受来自客户端的连接。内部 RSocketServer 实例可以使用 setServerConfigurer() 以及其他可配置的选项进行自定义,例如。g.RSocketStrategies and MimeType for payload data and headers metadata.当客户端请求方提供setupRoute时(见下文ClientRSocketConnector),已连接的客户端将作为RSocketRequester存储,其键由clientRSocketKeyStrategyBiFunction<Map<String, Object>, DataBuffer, Object>确定。默认情况下,连接数据将作为键使用,并转换为 UTF-8 字符集的字符串值。这样一个 RSocketRequester 注册表可用于应用程序逻辑中,以确定与特定客户端连接进行交互,或将同一消息发布给所有已连接的客户端。当客户端建立连接时,RSocketConnectedEvent 会从 ServerRSocketConnector 发出。这类似于 Spring Messaging 模块中提供的 @ConnectMapping 注解的功能。映射模式 * 表示接受所有客户端路由。The RSocketConnectedEvent can be used to distinguish different routes via DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER header.spring-doc.cadn.net.cn

典型的服务器配置可能如下所示:spring-doc.cadn.net.cn

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
                                    + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
	...
}

所有选项,包括 RSocketStrategies 的 bean 和 @EventListener 用于 RSocketConnectedEvent,都是可选的。 有关更多信息,请参阅 ServerRSocketConnector JavaDocs。spring-doc.cadn.net.cn

从版本 5.2.1 开始,ServerRSocketMessageHandler 被提取为一个公共的顶层类,以便与现有的 RSocket 服务器进行连接。 当提供一个外部 ServerRSocketMessageHandler 实例并传入 ServerRSocketConnector 时,它不会在内部创建 RSocket 服务器,而是将所有处理逻辑委托给提供的实例。 此外,ServerRSocketMessageHandler 可以配置一个 messageMappingCompatible 标志,以便也为 RSocket 控制器处理 @MessageMapping,从而完全替代标准 RSocketMessageHandler 提供的功能。 这在混合配置中非常有用,例如当应用程序中同时存在经典的 @MessageMapping 方法、RSocket 通道适配器以及已外部配置的 RSocket 服务器时。spring-doc.cadn.net.cn

The ClientRSocketConnector 作为基于 RSocketRSocketRequester 的持有者,通过提供的 ClientTransport 进行连接。 The RSocketConnector 可以使用提供的 RSocketConnectorConfigurer 进行自定义。 The setupRoute(可选模板变量)和带有元数据的 setupData 也可以在此组件上进行配置。spring-doc.cadn.net.cn

典型的客户端配置可能如下所示:spring-doc.cadn.net.cn

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    ClientRSocketConnector clientRSocketConnector =
            new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}

大多数这些选项(包括 RSocketStrategies bean)都是可选的。 请注意我们如何连接到本地启动的 RSocket 服务器所使用的任意端口。 有关示例用例,请参见 ServerRSocketConnector.clientRSocketKeyStrategy 中的 setupData。 此外,请参阅 ClientRSocketConnector 及其 AbstractRSocketConnector 超类的 Javadocs 以获取更多信息。spring-doc.cadn.net.cn

Both ClientRSocketConnector and ServerRSocketConnector 都负责将入站通道适配器映射到其 path 配置,以路由传入的 RSocket 请求。 有关更多信息,请参阅下一节。spring-doc.cadn.net.cn

RSocket 入站网关

RSocketInboundGateway负责接收RSocket请求并生成响应(如果有)。它需要一个包含path个映射的数组,该映射可以是类似于MVC请求映射的模式或@MessageMapping种语义。此外,(自版本 5.起)2.2), 一组交互模型(见RSocketInteractionModel)可在RSocketInboundGateway上配置,以通过特定帧类型限制RSocket请求到该端点。默认情况下,所有交互模型均受支持。这样一个 Bean,根据其IntegrationRSocketEndpoint实现(对ReactiveMessageHandler的扩展),会通过ServerRSocketConnectorClientRSocketConnector自动检测,以便在内部IntegrationRSocketMessageHandler中为传入请求执行路由逻辑。一个 AbstractRSocketConnector 可以被提供给 RSocketInboundGateway 用于显式端点注册。通过这种方式,该AbstractRSocketConnector上的自动检测选项将被禁用。RSocketStrategies 也可以被注入到 RSocketInboundGateway 中,或者从提供的 AbstractRSocketConnector 中获取,从而覆盖任何显式注入。解码器用于从这些 RSocketStrategies 中根据提供的 requestElementType 来解码请求负载。如果传入的 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 头未提供,Message 将把请求视为 fireAndForget RSocket 交互模型。在此情况下,RSocketInboundGatewayoutputChannel 执行一个简单的 send 操作。否则,将使用来自 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 头部的 MonoProcessor 值向 RSocket 发送回复。为此目的,一个RSocketInboundGatewayoutputChannel执行sendAndReceiveMessageReactive操作。根据 MessagingRSocket 逻辑,要向下游发送的消息的 payload 始终为 Flux。当处于 fireAndForget RSocket 交互模型时,消息具有一个普通的转换 payload。回复 payload 可以是一个普通对象或一个 Publisher - RSocketInboundGateway 会根据 RSocketStrategies 中提供的编码器,将两者正确地转换为 RSocket 响应。spring-doc.cadn.net.cn

从 5.3 版本开始,decodeFluxAsUnit 选项(默认值为 false)已添加到 RSocketInboundGateway 中。 默认情况下,传入的 Flux 会以每个事件单独解码的方式进行转换。 这与当前 @MessageMapping 语义下的确切行为一致。 若要恢复之前的行为或根据应用程序需求将整个 Flux 作为单个单元进行解码,则需将 decodeFluxAsUnit 设置为 true。 然而,目标解码逻辑取决于所选的 Decoder,例如 StringDecoder 要求流中存在换行符(默认情况)以指示字节缓冲区的结束。spring-doc.cadn.net.cn

查看 使用 Java 配置 RSocket 端点 以获取如何配置 RSocketInboundGateway 端点并在下游处理负载的示例。spring-doc.cadn.net.cn

RSocket 出站网关

The RSocketOutboundGateway 是一个用于向 RSocket 发起请求并根据 RSocket 回复(如果有)生成回复的 AbstractReplyProducingMessageHandler。 底层的 RSocket 协议交互被委托给一个从提供的 ClientRSocketConnector 或服务器端请求消息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 头信息中解析出的 RSocketRequester。 服务器端的目标 RSocketRequester 可以从 RSocketConnectedEvent 解析,或者根据为连接请求映射所选的某个业务键,通过 ServerRSocketConnector.setClientRSocketKeyStrategy() 使用 ServerRSocketConnector.getClientRSocketRequester() API 进行解析。 有关更多信息,请参阅 ServerRSocketConnector JavaDocs。spring-doc.cadn.net.cn

发送请求的 route 必须显式配置(与路径变量一起)或通过针对请求消息求值的 SpEL 表达式进行配置。spring-doc.cadn.net.cn

RSocket 交互模型可通过 RSocketInteractionModel 选项或相应的表达式设置提供。 默认情况下,对于通用网关用例会使用 requestResponsespring-doc.cadn.net.cn

当请求消息负载为Publisher时,可提供一个publisherElementType选项,根据目标RSocketRequester中提供的RSocketStrategies对其元素进行编码。 此选项的表达式可求值为ParameterizedTypeReference。 有关数据和其类型的更多信息,请参阅RSocketRequester.RequestSpec.data() JavaDocs。spring-doc.cadn.net.cn

RSocket 请求也可以通过 metadata 进行增强。 为此,可以在 RSocketOutboundGateway 上针对请求消息配置一个 metadataExpression。 此类表达式必须求值为 Map<Object, MimeType>spring-doc.cadn.net.cn

interactionModel不等于fireAndForget时,必须提供expectedResponseType。 默认情况下,它是一个String.class。 此选项的表达式可求值为ParameterizedTypeReference。 有关回复数据及其类型的更多信息,请参阅RSocketRequester.RetrieveSpec.retrieveMono()RSocketRequester.RetrieveSpec.retrieveFlux()的 JavaDocs。spring-doc.cadn.net.cn

来自 RSocketOutboundGateway 的回复 payload 是一个 Mono(即使对于 fireAndForget 交互模型,它也是 Mono<Void>),始终使该组件成为 async。 这样的 Mono 在产生到用于常规通道的 outputChannel 之前会被订阅,或者由 FluxMessageChannel 按需处理。 针对 requestStreamrequestChannel 交互模型的 Flux 响应也会被包装成一个回复 Mono。 它可以通过带有直通服务激活器的 FluxMessageChannel 在下游进行扁平化处理:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}

或在目标应用程序逻辑中显式订阅。spring-doc.cadn.net.cn

预期的响应类型也可以配置(或通过表达式评估)为 void,将此网关视为出站通道适配器。 然而,outputChannel 仍必须被配置(即使只是一个 NullChannel),以启动对返回的 Mono 的订阅。spring-doc.cadn.net.cn

查看 使用 Java 配置 RSocket 端点 以获取如何配置 RSocketOutboundGateway 端点并处理下游负载的示例。spring-doc.cadn.net.cn

RSocket 命名空间支持

Spring Integration 提供了一个rsocket命名空间及相应的架构定义。 要将其包含在您的配置中,请在应用程序上下文配置文件中添加以下命名空间声明:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/rsocket
    https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
    ...
</beans>

传入

要使用 XML 配置 Spring Integration RSocket 入站通道适配器,您需要从 int-rsocket 命名空间中使用适当的 inbound-gateway 组件。 以下示例展示了如何配置它:spring-doc.cadn.net.cn

<int-rsocket:inbound-gateway id="inboundGateway"
                             path="testPath"
                             interaction-models="requestStream,requestChannel"
                             rsocket-connector="clientRSocketConnector"
                             request-channel="requestChannel"
                             rsocket-strategies="rsocketStrategies"
                             request-element-type="byte[]"/>

ClientRSocketConnectorServerRSocketConnector 应配置为通用的 <bean> 定义。spring-doc.cadn.net.cn

出站

<int-rsocket:outbound-gateway id="outboundGateway"
                              client-rsocket-connector="clientRSocketConnector"
                              auto-startup="false"
                              interaction-model="fireAndForget"
                              route-expression="'testRoute'"
                              request-channel="requestChannel"
                              publisher-element-type="byte[]"
                              expected-response-type="java.util.Date"
                              metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

查看 spring-integration-rsocket.xsd 以获取所有 XML 属性的描述。spring-doc.cadn.net.cn

使用 Java 配置 RSocket 端点

以下示例展示了如何使用 Java 配置 RSocket 入站端点:spring-doc.cadn.net.cn

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}

在此配置中,ClientRSocketConnectorServerRSocketConnector 被假定具有用于在“echo”路径上自动检测此类端点的含义。 请注意 @Transformer 签名,它完全采用响应式方式处理 RSocket 请求并生成响应式回复。spring-doc.cadn.net.cn

以下示例展示了如何使用 Java DSL 配置 RSocket 入站网关:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlow
        .from(RSockets.inboundGateway("/uppercase")
                   .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}

在此配置中,ClientRSocketConnectorServerRSocketConnector被假定为具有特定含义,用于自动检测“/uppercase”路径上的此类端点,并期望的交互模型为“请求通道”。spring-doc.cadn.net.cn

以下示例展示了如何使用 Java 配置 RSocket 出站网关:spring-doc.cadn.net.cn

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway =
            new RSocketOutboundGateway(
                    new FunctionExpression<Message<?>>((m) ->
                        m.getHeaders().get("route_header")));
    rsocketOutboundGateway.setInteractionModelExpression(
            new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}

The setClientRSocketConnector() is required only for the client side. On the server side, the RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER header with an RSocketRequester value must be supplied in the request message.spring-doc.cadn.net.cn

以下示例展示了如何使用 Java DSL 配置 RSocket 出站网关:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlow
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}

有关如何在上述流程开始时使用提到的Function接口,请参阅IntegrationFlow作为网关以获取更多信息。spring-doc.cadn.net.cn