|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
响应式流支持
Spring Integration 在框架的部分区域以及从不同方面提供了对响应式流交互的支持。 我们将在此讨论其中大部分内容,并在必要时提供指向相关章节的适当链接以获取详细信息。
前言
简而言之,Spring Integration 扩展了 Spring 编程模型以支持众所周知的企业集成模式。
Spring Integration 使得基于 Spring 的应用程序能够进行轻量级消息传递,并支持通过声明式适配器与外部系统集成。
Spring Integration 的主要目标是为构建企业集成解决方案提供一个简单的模型,同时保持对于生成可维护、可测试代码至关重要的关注点分离。
这一目标通过在目标应用程序中使用一等公民(first class citizens),如 message、channel 和 endpoint 来实现,它们使我们能够构建一个集成流(管道),其中(在大多数情况下)一个端点将消息生产到通道中,供另一个端点消费。
通过这种方式,我们将集成交互模型与目标业务逻辑区分开来。
其中的关键部分是中间的通道:流的行为取决于其实现,而端点保持不变。
另一方面,Reactive Streams 是一种用于异步流处理的标准,支持非阻塞背压机制。Reactive Streams 的主要目标是管理流数据在异步边界(例如将元素传递到另一个线程或线程池)之间的交换,同时确保接收端不会被强制缓冲任意数量的数据。换句话说,背压是该模型不可或缺的一部分,以便允许在线程之间进行中介的队列是有界的。响应式流(Reactive Streams)实现(如 Project Reactor)的意图是在整个流应用程序的处理图中保持这些优势和特点。响应式流库的终极目标是为目标应用程序提供类型、一组操作符和支持 API,以尽可能透明和流畅的方式利用现有编程语言结构,但最终解决方案不像普通函数链调用那样具有命令式特征。它分为两个阶段:定义和执行。执行发生在稍后订阅最终响应式发布器时,数据需求会从定义的底部向顶部推送,并根据需要应用背压——我们仅请求当前能够处理的事件数量。响应式应用程序看起来像 "stream",或者按照我们在 Spring Integration 中习惯的说法 - "flow"。事实上,自 Java 9 起,响应式流 SPI 已在 java.util.concurrent.Flow 类中展示。
从这里看,当我们对端点应用一些响应式框架操作符时,Spring Integration 流程似乎非常适合编写响应式流(Reactive Streams)应用程序,但实际上问题要广泛得多,我们需要记住,并非所有端点(例如 JdbcMessageHandler)都能在响应式流中被透明地处理。
当然,Spring Integration 中支持响应式流的主要目标是使整个过程完全响应式、按需启动并准备好背压(back-pressure)。
在目标协议和通道适配器所连接的系统提供响应式流交互模型之前,这是无法实现的。
在接下来的章节中,我们将介绍 Spring Integration 为开发保留集成流结构的响应式应用程序所提供的组件和方法。
Spring Integration 中所有基于 Project Reactor 类型(如 Flux 和 Mono)实现的响应式流交互。 |
消息网关
与响应式流(Reactive Streams)交互的最简单方式是将网关方法的返回类型设置为 @MessagingGateway,此时整个集成流程将在对返回的 Mono 实例进行订阅时执行。查看 Reactor Mono 以获取更多信息。框架内部对完全基于响应式流兼容协议的内向网关使用了类似的 Mono 回复方法(更多信息请参阅下文中的 响应式通道适配器)。发送和接收操作被封装在 Mono.defer() 中,并在可用时通过 replyChannel 头部链接回复评估。这样,针对特定响应式协议(例如。g.Netty)将作为在 Spring Integration 上执行的响应式流的订阅者和发起者。如果请求负载是响应式类型,最好在响应式流定义中处理它,将处理推迟到发起者订阅。为此,处理器方法也必须返回响应式类型。查看下一节以获取更多信息。
响应式回复负载
当生成回复的 MessageHandler 返回用于回复消息的响应式类型负载时,它将以异步方式处理,并为 outputChannel 提供常规的 MessageChannel 实现(必须将 async 设置为 true),并在输出通道为 ReactiveStreamsSubscribableChannel 实现(例如 FluxMessageChannel)时,通过按需订阅进行扁平化处理。
对于标准的命令式 MessageChannel 用例,如果回复负载是多值发布者(有关更多信息,请参阅 ReactiveAdapter.isMultiValue()),则它将被包装到 Mono.just() 中。
因此,Mono 必须在下游显式订阅或由下游的 FluxMessageChannel 进行扁平化处理。
对于 outputChannel 使用 ReactiveStreamsSubscribableChannel 的情况,无需担心返回类型和订阅;框架内部会平滑地处理所有内容。
查看 异步服务激活器 以获取更多信息。
另请参阅 Kotlin 协程 以获取更多信息。
FluxMessageChannel和ReactiveStreamsConsumer
FluxMessageChannel是MessageChannel和Publisher<Message<?>>的组合实现。
作为热源的Flux在内部创建,用于接收来自send()实现的传入消息。
Publisher.subscribe()的实现被委托给该内部的Flux。
此外,为了满足按需上游消费的需求,FluxMessageChannel为ReactiveStreamsSubscribableChannel契约提供了实现。
为此通道提供的任何上游Publisher(例如见下面的源轮询通道适配器和拆分器),在该通道的订阅就绪时会自动订阅。
来自这些委托发布者的事件会被沉入上述的内部Flux中。
FluxMessageChannel的消费者必须是org.reactivestreams.Subscriber实例,以遵守响应式流(Reactive Streams)契约。
幸运的是,Spring Integration中所有的MessageHandler实现也都实现了来自Reactor项目的CoreSubscriber接口。
并且,由于中间存在一个ReactiveStreamsConsumer实现,整个集成流的配置对目标开发者来说是完全透明的。
在这种情况下,流的行为从命令式推送模型转变为响应式拉取模型。
此外,还可以使用ReactiveStreamsConsumer结合IntegrationReactiveUtils将任何MessageChannel转换为响应式源,从而使集成流部分具备响应式特性。
参见 FluxMessageChannel 以获取更多信息。
从版本 5.5 开始,ConsumerEndpointSpec 引入了一个 reactive() 选项,使流程中的端点独立于输入通道成为 ReactiveStreamsConsumer。
可以提供可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> 来通过 Flux.transform() 操作自定义来自输入通道的源 Flux,例如使用 publishOn()、doOnNext()、retry() 等。
此功能作为所有消息注解(如 @ServiceActivator、@Splitter 等)的 @Reactive 子注解表示,并通过其 reactive() 属性进行配置。
源轮询通道适配器
通常,SourcePollingChannelAdapter 依赖于由 TaskScheduler 启动的任务。轮询触发器是根据提供的选项构建的,用于周期性调度任务以轮询目标数据源或事件。当 outputChannel 是 ReactiveStreamsSubscribableChannel 时,相同的 Trigger 用于确定下一次执行时间,但此时 SourcePollingChannelAdapter 不再调度任务,而是根据 Flux.generate() 为 nextExecutionTime 的值以及上一步的持续时间 Mono.delay() 创建一个 Flux<Message<?>>。一个 Flux.flatMapMany() 随后被用来轮询 maxMessagesPerPoll,并将它们汇入输出 Flux。此生成器 Flux 已订阅提供的 ReactiveStreamsSubscribableChannel,并遵循下游的反压机制。从版本 5 开始。5,当 maxMessagesPerPoll == 0 时,源根本不会被调用,flatMapMany() 会通过 Mono.empty() 结果立即完成,直到稍后 maxMessagesPerPoll 被更改为非零值,例如。g.通过控制总线。通过这种方式,任何 MessageSource 实现都可以转换为响应式热源。
请参阅 轮询消费者 以获取更多信息。
事件驱动通道适配器
MessageProducerSupport 是事件驱动通道适配器的基类,通常其 sendMessage(Message<?>) 用作生产端驱动 API 中的监听器回调。当消息生产者实现构建的是doOnNext()条消息的Flux流,而不是基于监听器的功能时,此回调也可以轻松插入到 Reactor 运算符中。实际上,当消息生产者的outputChannel不是ReactiveStreamsSubscribableChannel时,框架会自动完成此操作。然而,为了改善最终用户体验,并支持更多具备背压就绪功能的能力,MessageProducerSupport 提供了 subscribeToPublisher(Publisher<? extends Message<?>>) API,以便在目标实现中使用,当 Publisher<Message<?>>> 作为来自目标系统的数据源时。通常,当针对Publisher源数据调用目标驱动程序 API 时,会从doStart()实现中使用它。建议将响应式 MessageProducerSupport 实现与 FluxMessageChannel 结合使用,作为下游按需订阅和事件消费的 outputChannel。当取消对 Publisher 的订阅时,通道适配器将进入停止状态。调用此类通道适配器的 stop() 方法,将完成从源 Publisher 的生产过程。通道适配器可以重新启动,并自动订阅新创建的源 Publisher。
消息源到响应式流
从版本 5 开始。3,提供了 ReactiveMessageSourceProducer。它是提供的 MessageSource 与事件驱动的生产结合到配置的 outputChannel 中。在内部,它将 MessageSource 封装到重复重新订阅的 Mono 中,从而生成一个 Flux<Message<?>>,以便在上述提到的 subscribeToPublisher(Publisher<? extends Message<?>>) 中进行订阅。该 Mono 的订阅是使用 Schedulers.boundedElastic() 完成的,以避免在目标 MessageSource 中可能出现的阻塞。当消息源返回 null(无数据可拉取)时,Mono 会基于订阅者上下文中的 IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration 条目,转换为带有 delay 的 repeatWhenEmpty() 状态,以便进行后续的重新订阅。默认情况下,为 1 秒。如果 MessageSource 在头信息中生成带有 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 的消息,则会在原始 Mono 的 doOnSuccess() 中进行确认(如需要),如果下游流抛出带有失败消息的 MessagingException 以拒绝,则在 doOnError() 中拒绝。这个 ReactiveMessageSourceProducer 可用于任何需要将以轮询方式工作的通道适配器功能转化为响应式、按需解决方案的场景,适用于任何现有的 MessageSource<?> 实现。
拆分器和聚合器
当AbstractMessageSplitter的逻辑获得Publisher时,该流程会自然地遍历Publisher中的项,将它们映射为发送给outputChannel的消息。
如果此通道是ReactiveStreamsSubscribableChannel,则针对Publisher的Flux包装器会从该通道按需订阅,此时这种拆分行为看起来更像是一个flatMap Reactor 操作符,当我们把传入的事件映射为多值输出Publisher时。
在拆分器前后都使用FluxMessageChannel构建整个集成流时最为合理,这使得 Spring Integration 配置与响应式流(Reactive Streams)的要求及其事件处理操作符保持一致。
对于普通通道,Publisher会被转换为Iterable以执行标准的迭代并产生(iterate-and-produce)拆分逻辑。
FluxAggregatorMessageHandler 是特定响应式流逻辑实现的另一个示例,就 Project Reactor 而言,它可以被视为 "reactive operator"。
它基于 Flux.groupBy() 和 Flux.window()(或 buffer())操作符。
传入的消息被 sink 到 Flux.create() 中,该 Flux.create() 在创建 FluxAggregatorMessageHandler 时初始化,使其成为热源。
当需要时,ReactiveStreamsSubscribableChannel 会订阅此 Flux,或者直接在 FluxAggregatorMessageHandler.start() 中订阅,前提是 outputChannel 不具备响应式特性。
当整个集成流程在此组件前后都使用 FluxMessageChannel 构建时,此 MessageHandler 的优势得以体现,从而使整个逻辑具备背压能力。
请参阅 流和 Flux 拆分 以及 Flux 聚合器 以获取更多信息。
Java DSL
Java DSL 中的 IntegrationFlow 可以从任何 Publisher 实例开始(参见 IntegrationFlow.from(Publisher<Message<T>>))。
此外,使用 IntegrationFlowBuilder.toReactivePublisher() 操作符,IntegrationFlow 可以被转换为响应式热数据源。
在这两种情况下,内部都会使用 FluxMessageChannel;它可以根据其 ReactiveStreamsSubscribableChannel 合约订阅入站 Publisher,同时对于下游订阅者而言,它本身就是一个 Publisher<Message<?>>。
通过动态 IntegrationFlow 注册,我们可以实现强大的逻辑,将 Reactive Streams 与这种集成流相结合,并桥接至/从 Publisher。
从版本 5 开始。5.6,存在一个 toReactivePublisher(boolean autoStartOnSubscribe) 操作符变体,用于控制返回的 Publisher<Message<?>> 背后整个 IntegrationFlow 的生命周期。通常在运行时后期发生来自响应式发布者的订阅和消费,而不是在响应式流组合期间,甚至在 ApplicationContext 启动期间。为了避免在 IntegrationFlow 的 Publisher<Message<?>> 订阅点处为生命周期管理产生样板代码,并提升最终用户体验,现已引入带有 autoStartOnSubscribe 标志的新操作符。它标记(如果true)IntegrationFlow及其组件用于autoStartup = false,因此ApplicationContext不会自动启动流程中消息的生产和消费。相反,IntegrationFlow 的 start() 是从内部 Flux.doOnSubscribe() 初始化的。无论 autoStartOnSubscribe 的值如何,流程都会在 Flux.doOnCancel() 和 Flux.doOnTerminate() 处停止——如果没有消费者来消费消息,继续生产消息是没有意义的。
对于完全相反的用例,当 IntegrationFlow 应该调用响应式流并在完成后继续时,IntegrationFlowDefinition 中提供了一个 fluxTransform() 操作符。
此时的流程被转换为一个 FluxMessageChannel,该转换被传播到提供的 fluxFunction 中,并在 Flux.transform() 操作符中执行。
函数的结果被封装在一个 Mono<Message<?>> 中,以便平展映射为输出 Flux,该输出由另一个 FluxMessageChannel 订阅以用于下游流程。
请参阅 Java DSL 章节 以获取更多信息。
ReactiveMessageHandler
从版本 5.3 开始,框架原生支持 ReactiveMessageHandler。
这种类型的消息处理器专为响应式客户端设计,它返回一个响应式类型以进行按需订阅,用于执行底层操作,但不提供任何回复数据来继续响应式流组合。
当在命令式集成流程中使用 ReactiveMessageHandler 时,handleMessage() 会在返回后立即被订阅,仅因为此类流程中没有响应式流组合来支持背压。
在这种情况下,框架会将此 ReactiveMessageHandler 包装为 ReactiveMessageHandlerAdapter——这是 MessageHandler 的一个纯实现。
然而,当流程中涉及 ReactiveStreamsConsumer(例如,消费通道是 FluxMessageChannel)时,这样的 ReactiveMessageHandler 会通过 flatMap() Reactor 运算符与整个响应式流组合,以在消费期间支持背压。
其中一个开箱即用的ReactiveMessageHandler实现是用于出站通道适配器的ReactiveMongoDbStoringMessageHandler。
有关更多信息,请参阅MongoDB 响应式通道适配器。
从 6.1 版本开始,IntegrationFlowDefinition 暴露了一个便捷的 handleReactive(ReactiveMessageHandler) 终端操作符。
任何 ReactiveMessageHandler 实现(甚至仅使用 Mono API 的普通 lambda)都可以用于此操作符。
框架会自动订阅返回的 Mono<Void>。
以下是该操作符的一个简单配置示例:
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlow.from("tappedChannel1")
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
.handleReactive((message) -> Mono.just(message).log().then());
}
该运算符的一个重载版本接受一个 Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>,用于围绕提供的 ReactiveMessageHandler 自定义消费者端点。
此外,还提供了基于 ReactiveMessageHandlerSpec 的变体。
在大多数情况下,它们用于特定于协议的通道适配器实现。
请参阅下一节,其中包含指向目标技术及其相应响应式通道适配器的链接。
响应式通道适配器
当集成的目标协议提供响应式流解决方案时,在 Spring Integration 中实现通道适配器变得非常简单。
一个入站、事件驱动的通道适配器实现,旨在将请求(如有必要)封装为延迟的 Mono 或 Flux,并仅在协议组件对监听器方法返回的 Mono 发起订阅时执行发送(并在有回复时产生回复)。
通过这种方式,我们拥有一个完全封装在该组件中的响应式流解决方案。
当然,订阅在输出通道上的下游集成流应遵循响应式流规范,并以按需、支持背压的方式进行。
由于集成流中使用的 MessageHandler 处理器的性质(或当前实现),此功能并不总是可用。
当没有响应式实现时,可以通过在线程池和队列之间,或在集成端点之前和之后使用 FluxMessageChannel(参见上文)来处理此限制。
一个用于响应式事件驱动入站通道适配器的示例:
public class CustomReactiveMessageProducer extends MessageProducerSupport {
private final CustomReactiveSource customReactiveSource;
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}
@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
subscribeToPublisher(messageFlux);
}
}
用法示例如下:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
或者以声明式方式:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
甚至在没有通道适配器的情况下,我们也可以始终按以下方式使用 Java DSL:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
}
}
响应式出站通道适配器实现旨在根据为目标协议提供的响应式 API,启动(或继续)与外部系统的响应式流交互。 入站负载本身可以是响应式类型,也可以是整个集成流程的事件,而该事件是上层响应式流的一部分。 返回的响应式类型可以在单向、发后即忘的场景中立即订阅,或者在请求 - 回复场景中向下游传播,以便进行进一步的集成流程或在目标业务逻辑中进行显式订阅,但始终保留响应式流的语义。
一个响应式出站通道适配器的示例:
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {
private final CustomEntityOperations customEntityOperations;
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}
@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}
private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}
private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}
public enum Type {
INSERT,
UPDATE,
}
}
我们将能够使用这两种通道适配器:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
目前,Spring Integration 为 WebFlux、RSocket、MongoDb、R2DBC、ZeroMQ、GraphQL、Apache Cassandra 提供了通道适配器(或网关)实现。
Redis 流通道适配器也是响应式的,并使用了 Spring Data 中的 ReactiveStreamOperations。
更多的响应式通道适配器即将到来,例如基于 Kafka 的 Apache Kafka 适配器,它依赖于 Spring for Apache Kafka 中的 ReactiveKafkaProducerTemplate 和 ReactiveKafkaConsumerTemplate 等。
对于许多其他非响应式通道适配器,建议使用线程池以避免在响应式流处理期间发生阻塞。
响应式到命令式上下文传播
当 上下文传播 库位于类路径上时,Project Reactor 可以接受 ThreadLocal 值(例如。g.Micrometer 观测 或 SecurityContextHolder) 并将它们存储到 Subscriber 上下文中。相反的操作也是可能的,当我们需要为追踪填充日志 MDC,或让我们从响应式流中调用的服务恢复作用域中的观测时。有关其用于上下文传播的特殊运算符的更多信息,请参阅 Project Reactor 文档。如果我们的整个解决方案是单个响应式流组合,那么上下文存储和恢复将顺利进行,因为 Subscriber 上下文在从下游到组合开始处(Flux 或 Mono)都是可见的。但是,如果应用程序在不同 Flux 实例之间切换,或切换到命令式处理后再返回,则与 Subscriber 关联的上下文可能不可用。对于此类用例,Spring Integration 提供了一个额外的功能(从版本 6.0.5 开始),可以将 Reactor ContextView 存储到由响应式流生成的 IntegrationMessageHeaderAccessor.REACTOR_CONTEXT 消息标头中,例如。g.当我们执行直接的 send() 操作时。此标头随后在 FluxMessageChannel.subscribeTo() 中使用,以恢复该通道将要发射的 Message 所需的 Reactor 上下文。当前,此标题由 WebFluxInboundEndpoint 和 RSocketInboundGateway 组件填充,但也可用于任何需要响应式与命令式集成的解决方案中。填充此标头的逻辑如下:
return requestMono
.flatMap((message) ->
Mono.deferContextual((context) ->
Mono.just(message)
.handle((messageToSend, sink) ->
send(messageWithReactorContextIfAny(messageToSend, context)))));
...
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
if (!context.isEmpty()) {
return getMessageBuilderFactory()
.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
.build();
}
return message;
}
注意,我们仍然需要使用 handle() 操作符,以便 Reactor 从上下文中恢复 ThreadLocal 值。
即使它作为头信息发送,框架也无法假设下游是否会将其恢复到 ThreadLocal 值。
要从另一个 Flux 或 Mono 组合恢复 Message 的上下文,可以执行以下逻辑:
Mono.just(message)
.handle((messageToHandle, sink) -> ...)
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));