如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

子流程支持

一些 if…​elsepublish-subscribe 组件提供了使用子流程来指定其逻辑或映射的功能。 最简单的示例是 .publishSubscribeChannel(),如下例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

您可以使用单独的 IntegrationFlow @Bean 定义来实现相同的结果,但我们希望您会发现子流程式的逻辑组合方式非常有用。 我们发现这能生成更短(因此更易读)的代码。spring-doc.cadn.net.cn

从版本 5.3 开始,提供了一个基于 BroadcastCapableChannelpublishSubscribeChannel() 实现,用于在由代理支持的消息通道上配置子流订阅者。 例如,我们现在可以在 Jms.publishSubscribeChannel() 上配置多个订阅者作为子流:spring-doc.cadn.net.cn

@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub");
}

@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel,
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

类似的 publish-subscribe 子流程组合提供了 .routeToRecipients() 方法。spring-doc.cadn.net.cn

另一个例子是在 .filter() 方法中使用 .discardFlow() 而不是 .discardChannel()spring-doc.cadn.net.cn

.route() 值得特别关注。 考虑以下示例:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

.channelMapping() 继续像常规 Router 映射中那样工作,但 .subFlowMapping() 将该子流程与主流程绑定。 换句话说,任何路由器的子流程在 .route() 之后都会返回到主流程。spring-doc.cadn.net.cn

有时,您需要从 .subFlowMapping() 引用现有的 IntegrationFlow @Bean。 以下示例展示了如何实现这一点:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}


在这种情况下,当您需要从这样的子流程接收回复并继续主流程时,此IntegrationFlowbean 引用(或其输入通道)必须用.gateway()如前面的示例所示。 TheoddFlow()前述示例中的引用未被包装到.gateway(). 因此,我们预期不会从此路由分支收到回复。 否则,您最终会遇到类似于以下内容的异常:spring-doc.cadn.net.cn

Caused by: org.springframework.beans.factory.BeanCreationException:
    The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
    is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
    This is the end of the integration flow.

当您将子流程配置为 lambda 时,框架将处理与子流程的请求 - 回复交互,因此不需要网关。spring-doc.cadn.net.cn

子流程可以嵌套到任意深度,但我们不建议这样做。 实际上,即使在路由器场景中,在流程中添加复杂的子流程也会迅速变得像一盘意大利面,难以被人解析。spring-doc.cadn.net.cn

在 DSL 支持子流配置的情况下,当为正在配置的组件通常需要通道,且该子流以 channel() 元素开头时,框架会在组件输出通道与流的输入通道之间隐式放置一个 bridge()。 例如,在此 filter 定义中:spring-doc.cadn.net.cn

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

框架内部会创建一个 DirectChannel Bean,用于注入到 MessageFilter.discardChannel 中。 随后,它将子流程封装为一个 IntegrationFlow,以该隐式通道作为订阅的起点,并在流程中指定的 channel() 之前放置一个 bridge。 当使用现有的 IntegrationFlow Bean 作为子流程引用(而非内联子流程,例如 lambda 表达式)时,不需要此类桥接,因为框架可以从流程 Bean 中解析出第一个通道。 对于内联子流程,输入通道尚未可用。spring-doc.cadn.net.cn