|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
reactive() 端点
从版本 5.5 开始,ConsumerEndpointSpec 提供了一个带有可选自定义器 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> 的 reactive() 配置属性。
此选项将目标端点配置为 ReactiveStreamsConsumer 实例,独立于输入通道类型,该类型通过 IntegrationReactiveUtils.messageChannelToFlux() 转换为 Flux。
提供的函数用于从 Flux.transform() 操作符中自定义来自输入通道的响应式流源(例如 publishOn()、log()、doOnNext() 等)。
以下示例演示了如何独立于最终订阅者和生产者,将输入通道的发布线程更改为 DirectChannel:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}
请参阅 响应式流支持 以获取更多信息。