如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

WebFlux 支持

WebFlux Spring 集成模块(spring-integration-webflux)允许以响应式方式执行 HTTP 请求和处理传入的 HTTP 请求。spring-doc.cadn.net.cn

您需要将以下依赖项包含到您的项目中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-webflux</artifactId>
    <version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.4.10"

在采用非Servlet-based服务器配置的情况下,必须包含io.projectreactor.netty:reactor-netty依赖项。spring-doc.cadn.net.cn

WebFlux 支持包含以下网关实现:WebFluxInboundEndpointWebFluxRequestExecutingMessageHandler。 该支持完全基于 Spring WebFluxProject Reactor 基础。 有关更多信息,请参阅 HTTP 支持,因为许多选项在响应式和常规 HTTP 组件之间是共享的。spring-doc.cadn.net.cn

WebFlux 命名空间支持

Spring Integration 提供了 webflux 命名空间及相应的架构定义。 要将其包含在您的配置中,请在应用程序上下文配置文件中添加以下命名空间声明:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/webflux
    https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
    ...
</beans>

WebFlux 入站组件

从版本 5.0 开始,WebFluxInboundEndpoint 实现了 WebHandler。 该组件类似于基于 MVC 的 HttpRequestHandlingEndpointSupport,并通过新提取的 BaseHttpInboundEndpoint 与其共享一些通用选项。 它用于 Spring WebFlux 响应式环境(替代 MVC)。 以下示例展示了一个简单的 WebFlux 端点实现:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}
@Bean
fun inboundChannelAdapterFlow() =
    integrationFlow(
        WebFlux.inboundChannelAdapter("/reactivePost")
            .apply {
                requestMapping { m -> m.methods(HttpMethod.POST) }
                requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
                statusCodeFunction { m -> HttpStatus.ACCEPTED }
            })
    {
        channel { queue("storeChannel") }
    }
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {

    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }

}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
    <int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>

该配置与之前的示例中提到的HttpRequestHandlingEndpointSupport类似,不同之处在于我们使用@EnableWebFlux将 WebFlux 基础设施添加到我们的集成应用中。 此外,WebFluxInboundEndpoint利用响应式 HTTP 服务器实现所提供的背压和按需能力,对下游流执行sendAndReceive操作。spring-doc.cadn.net.cn

回复部分也是非阻塞的,并且基于内部的 FutureReplyChannel,该值被平坦映射为用于按需解析的回复 Mono

您可以使用自定义的 WebFluxInboundEndpointServerCodecConfigurer 以及甚至 RequestedContentTypeResolver 来配置 ReactiveAdapterRegistry。 后者提供了一种机制,允许您以任意响应式类型返回响应:Reactor Flux、RxJava ObservableFlowable 以及其他类型。 通过这种方式,我们可以像以下示例所示,利用 Spring Integration 组件实现 服务端发送事件(Server Sent Events) 场景:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> Flux.just("foo", "bar", "baz"))
            .get();
}
@Bean
fun sseFlow() =
     integrationFlow(
            WebFlux.inboundGateway("/sse")
                       .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            {
                 handle { (p, h) -> Flux.just("foo", "bar", "baz") }
            }
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/sse");
    requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannelName("requests");
    return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
                               path="test1"
                               auto-startup="false"
                               phase="101"
                               request-payload-type="byte[]"
                               error-channel="errorChannel"
                               payload-expression="payload"
                               supported-methods="PUT"
                               status-code-expression="'202'"
                               header-mapper="headerMapper"
                               codec-configurer="codecConfigurer"
                               reactive-adapter-registry="reactiveAdapterRegistry"
                               requested-content-type-resolver="requestedContentTypeResolver">
            <int-webflux:request-mapping headers="foo"/>
            <int-webflux:cross-origin origin="foo" method="PUT"/>
            <int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>

有关更多可能的配置选项,请参阅请求映射支持跨域资源共享 (CORS) 支持spring-doc.cadn.net.cn

当请求体为空或 payloadExpression 返回 null 时,将使用请求参数(MultiValueMap<String, String>)对目标消息的 payload 进行处理。spring-doc.cadn.net.cn

负载验证

从版本 5.2 开始,WebFluxInboundEndpoint 可以通过 Validator 进行配置。 与 HTTP 支持 中的 MVC 验证不同,它用于在回退和 payloadExpression 功能执行之前,验证由 HttpMessageReader 转换请求后所在的 Publisher 中的元素。 框架无法假设构建最终负载后 Publisher 对象的复杂程度。 如果需要对确切最终负载(或其 Publisher 元素)的验证可见性进行限制,则验证应移至 WebFlux 端点之后进行。 有关更多信息,请参阅 Spring WebFlux 文档。 无效的负载将被拒绝,并返回一个 IntegrationWebExchangeBindException(一种 WebExchangeBindException 扩展),其中包含所有验证 Errors。 有关验证的更多信息,请参见 Spring Framework 参考手册spring-doc.cadn.net.cn

WebFlux 出站组件

The WebFluxRequestExecutingMessageHandler(从版本 5.0 开始)的实现类似于 HttpRequestExecutingMessageHandler。 它使用了 Spring Framework WebFlux 模块中的 WebClient。 要配置它,请定义一个如下所示的 bean:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                        .queryParams(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
    integrationFlow {
        handle(
            WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                    .queryParams(m.getPayload())
                    .build()
                    .toUri()
            })
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String::class.java)
        )
    }
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
    request-channel="requests"
    url="http://localhost/test"
    http-method-expression="headers.httpMethod"
    extract-request-payload="false"
    expected-response-type-expression="payload"
    charset="UTF-8"
    reply-timeout="1234"
    reply-channel="replies"/>

<int-webflux:outbound-channel-adapter id="reactiveExample2"
    url="http://localhost/example"
    http-method="GET"
    channel="requests"
    charset="UTF-8"
    extract-payload="false"
    expected-response-type="java.lang.String"
    order="3"
    auto-startup="false"/>

The WebClient exchange() 操作返回一个 Mono<ClientResponse>,该结果通过多个 Mono.map() 步骤映射为 AbstractIntegrationMessageBuilder,作为 WebFluxRequestExecutingMessageHandler 的输出。与 ReactiveChannel 作为 outputChannel 一起,Mono<ClientResponse> 的求值会延迟到下游进行订阅时才会执行。否则,它被视为 async 模式,并且将 Mono 响应适配为 SettableListenableFuture,以便从 WebFluxRequestExecutingMessageHandler 进行异步回复。输出消息的目标负载取决于WebFluxRequestExecutingMessageHandler配置。The setExpectedResponseType(Class<?>) or setExpectedResponseTypeExpression(Expression) identifies the target type of the response body element conversion.如果将 replyPayloadToFlux 设置为 true,响应体将被转换为一个 Flux,其中每个元素都使用提供的 expectedResponseType,并且这个 Flux 将作为有效负载向下传递。之后,您可以使用 分割器 以响应式方式迭代此 Fluxspring-doc.cadn.net.cn

此外,可以将 BodyExtractor<?, ClientHttpResponse> 注入到 WebFluxRequestExecutingMessageHandler 中,而不是 expectedResponseTypereplyPayloadToFlux 属性。 它可用于低级访问 ClientHttpResponse,并对正文和 HTTP 头部的转换进行更精细的控制。 Spring Integration 提供了 ClientHttpResponseBodyExtractor 作为身份函数,以生成(下游)整个 ClientHttpResponse 以及任何其他可能的自定义逻辑。spring-doc.cadn.net.cn

从 5.2 版本开始,WebFluxRequestExecutingMessageHandler 支持将响应式 PublisherResourceMultiValueMap 类型作为请求消息负载。 相应的 BodyInserter 在内部使用,并填充到 WebClient.RequestBodySpec 中。 当负载为响应式 Publisher 时,可以配置 publisherElementTypepublisherElementTypeExpression 来确定发布者的元素类型。 该表达式必须解析为 Class<?>String,后者再解析为目标 Class<?>ParameterizedTypeReferencespring-doc.cadn.net.cn

从版本 5.5 开始,WebFluxRequestExecutingMessageHandler 暴露了一个 extractResponseBody 标志(默认值为 true),用于仅返回响应体,或返回整个 ResponseEntity 作为回复消息负载,而不受提供的 expectedResponseTypereplyPayloadToFlux 的影响。 如果 ResponseEntity 中不存在请求体,则忽略该标志并返回整个 ResponseEntityspring-doc.cadn.net.cn

查看 HTTP 出站组件 以获取更多可能的配置选项。spring-doc.cadn.net.cn

WebFlux 头部映射

由于 WebFlux 组件完全基于 HTTP 协议,因此在 HTTP 头映射方面没有区别。 有关更多可能的选项和用于映射头的组件,请参阅 HTTP 头映射spring-doc.cadn.net.cn

WebFlux 请求属性

从版本 6.0 开始,WebFluxRequestExecutingMessageHandler 可以配置为通过 setAttributeVariablesExpression() 评估请求属性。 此 SpEL 表达式必须在 Map 中求值。 随后,这样的映射会被传播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer) HTTP 请求配置回调中。 如果需要将键值对象形式的信息从 Message 传递给请求,且下游过滤器需要访问这些属性以进行进一步处理,这将非常有用。spring-doc.cadn.net.cn