如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

响应式流支持

Spring Integration 在框架的部分区域以及从不同方面提供了对响应式流交互的支持。 我们将在此讨论其中大部分内容,并在必要时提供指向相关章节的适当链接以获取详细信息。spring-doc.cadn.net.cn

前言

简而言之,Spring Integration 扩展了 Spring 编程模型以支持众所周知的企业集成模式。 Spring Integration 使得基于 Spring 的应用程序能够进行轻量级消息传递,并支持通过声明式适配器与外部系统集成。 Spring Integration 的主要目标是为构建企业集成解决方案提供一个简单的模型,同时保持对于生成可维护、可测试代码至关重要的关注点分离。 这一目标通过在目标应用程序中使用一等公民(first class citizens),如 messagechannelendpoint 来实现,它们使我们能够构建一个集成流(管道),其中(在大多数情况下)一个端点将消息生产到通道中,供另一个端点消费。 通过这种方式,我们将集成交互模型与目标业务逻辑区分开来。 其中的关键部分是中间的通道:流的行为取决于其实现,而端点保持不变。spring-doc.cadn.net.cn

另一方面,Reactive Streams 是一种用于异步流处理的标准,支持非阻塞背压机制。Reactive Streams 的主要目标是管理流数据在异步边界(例如将元素传递到另一个线程或线程池)之间的交换,同时确保接收端不会被强制缓冲任意数量的数据。换句话说,背压是该模型不可或缺的一部分,以便允许在线程之间进行中介的队列是有界的。响应式流(Reactive Streams)实现(如 Project Reactor)的意图是在整个流应用程序的处理图中保持这些优势和特点。响应式流库的终极目标是为目标应用程序提供类型、一组操作符和支持 API,以尽可能透明和流畅的方式利用现有编程语言结构,但最终解决方案不像普通函数链调用那样具有命令式特征。它分为两个阶段:定义和执行。执行发生在稍后订阅最终响应式发布器时,数据需求会从定义的底部向顶部推送,并根据需要应用背压——我们仅请求当前能够处理的事件数量。响应式应用程序看起来像 "stream",或者按照我们在 Spring Integration 中习惯的说法 - "flow"。事实上,自 Java 9 起,响应式流 SPI 已在 java.util.concurrent.Flow 类中展示。spring-doc.cadn.net.cn

从这里看,当我们对端点应用一些响应式框架操作符时,Spring Integration 流程似乎非常适合编写响应式流(Reactive Streams)应用程序,但实际上问题要广泛得多,我们需要记住,并非所有端点(例如 JdbcMessageHandler)都能在响应式流中被透明地处理。 当然,Spring Integration 中支持响应式流的主要目标是使整个过程完全响应式、按需启动并准备好背压(back-pressure)。 在目标协议和通道适配器所连接的系统提供响应式流交互模型之前,这是无法实现的。 在接下来的章节中,我们将介绍 Spring Integration 为开发保留集成流结构的响应式应用程序所提供的组件和方法。spring-doc.cadn.net.cn

Spring Integration 中所有基于 Project Reactor 类型(如 FluxMono)实现的响应式流交互。

消息网关

与响应式流(Reactive Streams)交互的最简单方式是将网关方法的返回类型设置为 @MessagingGateway,此时整个集成流程将在对返回的 Mono 实例进行订阅时执行。查看 Reactor Mono 以获取更多信息。框架内部对完全基于响应式流兼容协议的内向网关使用了类似的 Mono 回复方法(更多信息请参阅下文中的 响应式通道适配器)。发送和接收操作被封装在 Mono.defer() 中,并在可用时通过 replyChannel 头部链接回复评估。这样,针对特定响应式协议(例如。g.Netty)将作为在 Spring Integration 上执行的响应式流的订阅者和发起者。如果请求负载是响应式类型,最好在响应式流定义中处理它,将处理推迟到发起者订阅。为此,处理器方法也必须返回响应式类型。查看下一节以获取更多信息。spring-doc.cadn.net.cn

响应式回复负载

当生成回复的 MessageHandler 返回用于回复消息的响应式类型负载时,它将以异步方式处理,并为 outputChannel 提供常规的 MessageChannel 实现(必须将 async 设置为 true),并在输出通道为 ReactiveStreamsSubscribableChannel 实现(例如 FluxMessageChannel)时,通过按需订阅进行扁平化处理。 对于标准的命令式 MessageChannel 用例,如果回复负载是多值发布者(有关更多信息,请参阅 ReactiveAdapter.isMultiValue()),则它将被包装到 Mono.just() 中。 因此,Mono 必须在下游显式订阅或由下游的 FluxMessageChannel 进行扁平化处理。 对于 outputChannel 使用 ReactiveStreamsSubscribableChannel 的情况,无需担心返回类型和订阅;框架内部会平滑地处理所有内容。spring-doc.cadn.net.cn

查看 异步服务激活器 以获取更多信息。spring-doc.cadn.net.cn

另请参阅 Kotlin 协程 以获取更多信息。spring-doc.cadn.net.cn

FluxMessageChannelReactiveStreamsConsumer

FluxMessageChannelMessageChannelPublisher<Message<?>>的组合实现。 作为热源的Flux在内部创建,用于接收来自send()实现的传入消息。 Publisher.subscribe()的实现被委托给该内部的Flux。 此外,为了满足按需上游消费的需求,FluxMessageChannelReactiveStreamsSubscribableChannel契约提供了实现。 为此通道提供的任何上游Publisher(例如见下面的源轮询通道适配器和拆分器),在该通道的订阅就绪时会自动订阅。 来自这些委托发布者的事件会被沉入上述的内部Flux中。spring-doc.cadn.net.cn

FluxMessageChannel的消费者必须是org.reactivestreams.Subscriber实例,以遵守响应式流(Reactive Streams)契约。 幸运的是,Spring Integration中所有的MessageHandler实现也都实现了来自Reactor项目的CoreSubscriber接口。 并且,由于中间存在一个ReactiveStreamsConsumer实现,整个集成流的配置对目标开发者来说是完全透明的。 在这种情况下,流的行为从命令式推送模型转变为响应式拉取模型。 此外,还可以使用ReactiveStreamsConsumer结合IntegrationReactiveUtils将任何MessageChannel转换为响应式源,从而使集成流部分具备响应式特性。spring-doc.cadn.net.cn

参见 FluxMessageChannel 以获取更多信息。spring-doc.cadn.net.cn

从版本 5.5 开始,ConsumerEndpointSpec 引入了一个 reactive() 选项,使流程中的端点独立于输入通道成为 ReactiveStreamsConsumer。 可以提供可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> 来通过 Flux.transform() 操作自定义来自输入通道的源 Flux,例如使用 publishOn()doOnNext()retry() 等。 此功能作为所有消息注解(如 @ServiceActivator@Splitter 等)的 @Reactive 子注解表示,并通过其 reactive() 属性进行配置。spring-doc.cadn.net.cn

源轮询通道适配器

通常,SourcePollingChannelAdapter 依赖于由 TaskScheduler 启动的任务。轮询触发器是根据提供的选项构建的,用于周期性调度任务以轮询目标数据源或事件。当 outputChannelReactiveStreamsSubscribableChannel 时,相同的 Trigger 用于确定下一次执行时间,但此时 SourcePollingChannelAdapter 不再调度任务,而是根据 Flux.generate()nextExecutionTime 的值以及上一步的持续时间 Mono.delay() 创建一个 Flux<Message<?>>。一个 Flux.flatMapMany() 随后被用来轮询 maxMessagesPerPoll,并将它们汇入输出 Flux。此生成器 Flux 已订阅提供的 ReactiveStreamsSubscribableChannel,并遵循下游的反压机制。从版本 5 开始。5,当 maxMessagesPerPoll == 0 时,源根本不会被调用,flatMapMany() 会通过 Mono.empty() 结果立即完成,直到稍后 maxMessagesPerPoll 被更改为非零值,例如。g.通过控制总线。通过这种方式,任何 MessageSource 实现都可以转换为响应式热源。spring-doc.cadn.net.cn

请参阅 轮询消费者 以获取更多信息。spring-doc.cadn.net.cn

事件驱动通道适配器

MessageProducerSupport 是事件驱动通道适配器的基类,通常其 sendMessage(Message<?>) 用作生产端驱动 API 中的监听器回调。当消息生产者实现构建的是doOnNext()条消息的Flux流,而不是基于监听器的功能时,此回调也可以轻松插入到 Reactor 运算符中。实际上,当消息生产者的outputChannel不是ReactiveStreamsSubscribableChannel时,框架会自动完成此操作。然而,为了改善最终用户体验,并支持更多具备背压就绪功能的能力,MessageProducerSupport 提供了 subscribeToPublisher(Publisher<? extends Message<?>>) API,以便在目标实现中使用,当 Publisher<Message<?>>> 作为来自目标系统的数据源时。通常,当针对Publisher源数据调用目标驱动程序 API 时,会从doStart()实现中使用它。建议将响应式 MessageProducerSupport 实现与 FluxMessageChannel 结合使用,作为下游按需订阅和事件消费的 outputChannel。当取消对 Publisher 的订阅时,通道适配器将进入停止状态。调用此类通道适配器的 stop() 方法,将完成从源 Publisher 的生产过程。通道适配器可以重新启动,并自动订阅新创建的源 Publisherspring-doc.cadn.net.cn

消息源到响应式流

从版本 5 开始。3,提供了 ReactiveMessageSourceProducer。它是提供的 MessageSource 与事件驱动的生产结合到配置的 outputChannel 中。在内部,它将 MessageSource 封装到重复重新订阅的 Mono 中,从而生成一个 Flux<Message<?>>,以便在上述提到的 subscribeToPublisher(Publisher<? extends Message<?>>) 中进行订阅。该 Mono 的订阅是使用 Schedulers.boundedElastic() 完成的,以避免在目标 MessageSource 中可能出现的阻塞。当消息源返回 null(无数据可拉取)时,Mono 会基于订阅者上下文中的 IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration 条目,转换为带有 delayrepeatWhenEmpty() 状态,以便进行后续的重新订阅。默认情况下,为 1 秒。如果 MessageSource 在头信息中生成带有 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 的消息,则会在原始 MonodoOnSuccess() 中进行确认(如需要),如果下游流抛出带有失败消息的 MessagingException 以拒绝,则在 doOnError() 中拒绝。这个 ReactiveMessageSourceProducer 可用于任何需要将以轮询方式工作的通道适配器功能转化为响应式、按需解决方案的场景,适用于任何现有的 MessageSource<?> 实现。spring-doc.cadn.net.cn

拆分器和聚合器

AbstractMessageSplitter的逻辑获得Publisher时,该流程会自然地遍历Publisher中的项,将它们映射为发送给outputChannel的消息。 如果此通道是ReactiveStreamsSubscribableChannel,则针对PublisherFlux包装器会从该通道按需订阅,此时这种拆分行为看起来更像是一个flatMap Reactor 操作符,当我们把传入的事件映射为多值输出Publisher时。 在拆分器前后都使用FluxMessageChannel构建整个集成流时最为合理,这使得 Spring Integration 配置与响应式流(Reactive Streams)的要求及其事件处理操作符保持一致。 对于普通通道,Publisher会被转换为Iterable以执行标准的迭代并产生(iterate-and-produce)拆分逻辑。spring-doc.cadn.net.cn

FluxAggregatorMessageHandler 是特定响应式流逻辑实现的另一个示例,就 Project Reactor 而言,它可以被视为 "reactive operator"。 它基于 Flux.groupBy()Flux.window()(或 buffer())操作符。 传入的消息被 sink 到 Flux.create() 中,该 Flux.create() 在创建 FluxAggregatorMessageHandler 时初始化,使其成为热源。 当需要时,ReactiveStreamsSubscribableChannel 会订阅此 Flux,或者直接在 FluxAggregatorMessageHandler.start() 中订阅,前提是 outputChannel 不具备响应式特性。 当整个集成流程在此组件前后都使用 FluxMessageChannel 构建时,此 MessageHandler 的优势得以体现,从而使整个逻辑具备背压能力。spring-doc.cadn.net.cn

请参阅 流和 Flux 拆分 以及 Flux 聚合器 以获取更多信息。spring-doc.cadn.net.cn

Java DSL

Java DSL 中的 IntegrationFlow 可以从任何 Publisher 实例开始(参见 IntegrationFlow.from(Publisher<Message<T>>))。 此外,使用 IntegrationFlowBuilder.toReactivePublisher() 操作符,IntegrationFlow 可以被转换为响应式热数据源。 在这两种情况下,内部都会使用 FluxMessageChannel;它可以根据其 ReactiveStreamsSubscribableChannel 合约订阅入站 Publisher,同时对于下游订阅者而言,它本身就是一个 Publisher<Message<?>>。 通过动态 IntegrationFlow 注册,我们可以实现强大的逻辑,将 Reactive Streams 与这种集成流相结合,并桥接至/从 Publisherspring-doc.cadn.net.cn

从版本 5 开始。5.6,存在一个 toReactivePublisher(boolean autoStartOnSubscribe) 操作符变体,用于控制返回的 Publisher<Message<?>> 背后整个 IntegrationFlow 的生命周期。通常在运行时后期发生来自响应式发布者的订阅和消费,而不是在响应式流组合期间,甚至在 ApplicationContext 启动期间。为了避免在 IntegrationFlowPublisher<Message<?>> 订阅点处为生命周期管理产生样板代码,并提升最终用户体验,现已引入带有 autoStartOnSubscribe 标志的新操作符。它标记(如果trueIntegrationFlow及其组件用于autoStartup = false,因此ApplicationContext不会自动启动流程中消息的生产和消费。相反,IntegrationFlowstart() 是从内部 Flux.doOnSubscribe() 初始化的。无论 autoStartOnSubscribe 的值如何,流程都会在 Flux.doOnCancel()Flux.doOnTerminate() 处停止——如果没有消费者来消费消息,继续生产消息是没有意义的。spring-doc.cadn.net.cn

对于完全相反的用例,当 IntegrationFlow 应该调用响应式流并在完成后继续时,IntegrationFlowDefinition 中提供了一个 fluxTransform() 操作符。 此时的流程被转换为一个 FluxMessageChannel,该转换被传播到提供的 fluxFunction 中,并在 Flux.transform() 操作符中执行。 函数的结果被封装在一个 Mono<Message<?>> 中,以便平展映射为输出 Flux,该输出由另一个 FluxMessageChannel 订阅以用于下游流程。spring-doc.cadn.net.cn

请参阅 Java DSL 章节 以获取更多信息。spring-doc.cadn.net.cn

ReactiveMessageHandler

从版本 5.3 开始,框架原生支持 ReactiveMessageHandler。 这种类型的消息处理器专为响应式客户端设计,它返回一个响应式类型以进行按需订阅,用于执行底层操作,但不提供任何回复数据来继续响应式流组合。 当在命令式集成流程中使用 ReactiveMessageHandler 时,handleMessage() 会在返回后立即被订阅,仅因为此类流程中没有响应式流组合来支持背压。 在这种情况下,框架会将此 ReactiveMessageHandler 包装为 ReactiveMessageHandlerAdapter——这是 MessageHandler 的一个纯实现。 然而,当流程中涉及 ReactiveStreamsConsumer(例如,消费通道是 FluxMessageChannel)时,这样的 ReactiveMessageHandler 会通过 flatMap() Reactor 运算符与整个响应式流组合,以在消费期间支持背压。spring-doc.cadn.net.cn

其中一个开箱即用的ReactiveMessageHandler实现是用于出站通道适配器的ReactiveMongoDbStoringMessageHandler。 有关更多信息,请参阅MongoDB 响应式通道适配器spring-doc.cadn.net.cn

从 6.1 版本开始,IntegrationFlowDefinition 暴露了一个便捷的 handleReactive(ReactiveMessageHandler) 终端操作符。 任何 ReactiveMessageHandler 实现(甚至仅使用 Mono API 的普通 lambda)都可以用于此操作符。 框架会自动订阅返回的 Mono<Void>。 以下是该操作符的一个简单配置示例:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow wireTapFlow1() {
    return IntegrationFlow.from("tappedChannel1")
            .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
            .handleReactive((message) -> Mono.just(message).log().then());
}

该运算符的一个重载版本接受一个 Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>,用于围绕提供的 ReactiveMessageHandler 自定义消费者端点。spring-doc.cadn.net.cn

此外,还提供了基于 ReactiveMessageHandlerSpec 的变体。 在大多数情况下,它们用于特定于协议的通道适配器实现。 请参阅下一节,其中包含指向目标技术及其相应响应式通道适配器的链接。spring-doc.cadn.net.cn

响应式通道适配器

当集成的目标协议提供响应式流解决方案时,在 Spring Integration 中实现通道适配器变得非常简单。spring-doc.cadn.net.cn

一个入站、事件驱动的通道适配器实现,旨在将请求(如有必要)封装为延迟的 MonoFlux,并仅在协议组件对监听器方法返回的 Mono 发起订阅时执行发送(并在有回复时产生回复)。 通过这种方式,我们拥有一个完全封装在该组件中的响应式流解决方案。 当然,订阅在输出通道上的下游集成流应遵循响应式流规范,并以按需、支持背压的方式进行。spring-doc.cadn.net.cn

由于集成流中使用的 MessageHandler 处理器的性质(或当前实现),此功能并不总是可用。 当没有响应式实现时,可以通过在线程池和队列之间,或在集成端点之前和之后使用 FluxMessageChannel(参见上文)来处理此限制。spring-doc.cadn.net.cn

一个用于响应式事件驱动入站通道适配器的示例:spring-doc.cadn.net.cn

public class CustomReactiveMessageProducer extends MessageProducerSupport {

    private final CustomReactiveSource customReactiveSource;

    public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
        Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
        this.customReactiveSource = customReactiveSource;
    }

    @Override
    protected void doStart() {
        Flux<Message<?>> messageFlux =
            this.customReactiveSource
                .map(event - >
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());

        subscribeToPublisher(messageFlux);
    }
}

用法示例如下:spring-doc.cadn.net.cn

public class MainFlow {
  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .channel(outputChannel)
        .get();
  }
}

或者以声明式方式:spring-doc.cadn.net.cn

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
        .handle(outputChannel)
        .get();
  }
}

甚至在没有通道适配器的情况下,我们也可以始终按以下方式使用 Java DSL:spring-doc.cadn.net.cn

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
    Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlow.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

响应式出站通道适配器实现旨在根据为目标协议提供的响应式 API,启动(或继续)与外部系统的响应式流交互。 入站负载本身可以是响应式类型,也可以是整个集成流程的事件,而该事件是上层响应式流的一部分。 返回的响应式类型可以在单向、发后即忘的场景中立即订阅,或者在请求 - 回复场景中向下游传播,以便进行进一步的集成流程或在目标业务逻辑中进行显式订阅,但始终保留响应式流的语义。spring-doc.cadn.net.cn

一个响应式出站通道适配器的示例:spring-doc.cadn.net.cn

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

    private final CustomEntityOperations customEntityOperations;

    public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
        Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
        this.customEntityOperations = customEntityOperations;
    }

    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
                .flatMap(mode -> {
                    switch (mode) {
                        case INSERT:
                            return handleInsert(message);
                        case UPDATE:
                            return handleUpdate(message);
                        default:
                            return Mono.error(new IllegalArgumentException());
                    }
                }).then();
    }

    private Mono<Void> handleInsert(Message<?> message) {
        return this.customEntityOperations.insert(message.getPayload())
                .then();
    }

    private Mono<Void> handleUpdate(Message<?> message) {
        return this.r2dbcEntityOperations.update(message.getPayload())
                .then();
    }

    public enum Type {
        INSERT,
        UPDATE,
    }
}

我们将能够使用这两种通道适配器:spring-doc.cadn.net.cn

public class MainFlow {

  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Autowired
  private CustomReactiveMessageHandler customReactiveMessageHandler;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .transform(someOperation)
        .handle(customReactiveMessageHandler)
        .get();
  }
}

目前,Spring Integration 为 WebFluxRSocketMongoDbR2DBCZeroMQGraphQLApache Cassandra 提供了通道适配器(或网关)实现。 Redis 流通道适配器也是响应式的,并使用了 Spring Data 中的 ReactiveStreamOperations。 更多的响应式通道适配器即将到来,例如基于 Kafka 的 Apache Kafka 适配器,它依赖于 Spring for Apache Kafka 中的 ReactiveKafkaProducerTemplateReactiveKafkaConsumerTemplate 等。 对于许多其他非响应式通道适配器,建议使用线程池以避免在响应式流处理期间发生阻塞。spring-doc.cadn.net.cn

响应式到命令式上下文传播

上下文传播 库位于类路径上时,Project Reactor 可以接受 ThreadLocal 值(例如。g.Micrometer 观测SecurityContextHolder) 并将它们存储到 Subscriber 上下文中。相反的操作也是可能的,当我们需要为追踪填充日志 MDC,或让我们从响应式流中调用的服务恢复作用域中的观测时。有关其用于上下文传播的特殊运算符的更多信息,请参阅 Project Reactor 文档。如果我们的整个解决方案是单个响应式流组合,那么上下文存储和恢复将顺利进行,因为 Subscriber 上下文在从下游到组合开始处(FluxMono)都是可见的。但是,如果应用程序在不同 Flux 实例之间切换,或切换到命令式处理后再返回,则与 Subscriber 关联的上下文可能不可用。对于此类用例,Spring Integration 提供了一个额外的功能(从版本 6.0.5 开始),可以将 Reactor ContextView 存储到由响应式流生成的 IntegrationMessageHeaderAccessor.REACTOR_CONTEXT 消息标头中,例如。g.当我们执行直接的 send() 操作时。此标头随后在 FluxMessageChannel.subscribeTo() 中使用,以恢复该通道将要发射的 Message 所需的 Reactor 上下文。当前,此标题由 WebFluxInboundEndpointRSocketInboundGateway 组件填充,但也可用于任何需要响应式与命令式集成的解决方案中。填充此标头的逻辑如下:spring-doc.cadn.net.cn

return requestMono
        .flatMap((message) ->
                Mono.deferContextual((context) ->
                        Mono.just(message)
                                .handle((messageToSend, sink) ->
                                        send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
    if (!context.isEmpty()) {
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
                .build();
    }
    return message;
}

注意,我们仍然需要使用 handle() 操作符,以便 Reactor 从上下文中恢复 ThreadLocal 值。 即使它作为头信息发送,框架也无法假设下游是否会将其恢复到 ThreadLocal 值。spring-doc.cadn.net.cn

要从另一个 FluxMono 组合恢复 Message 的上下文,可以执行以下逻辑:spring-doc.cadn.net.cn

Mono.just(message)
        .handle((messageToHandle, sink) -> ...)
        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));