对于最新稳定版本,请使用 Spring Integration 7.0.0spring-doc.cadn.net.cn

子流支持

一些如果。。。还发布订阅组件通过使用子流来指定其逻辑或映射的能力。 最简单的样本为.publishSubscribeChannel(),如下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

你也可以用分开达到同样的结果集成流程 @Bean定义,但我们希望你觉得子流式逻辑组合对你有用。 我们发现,这会让代码更短(因此更易读)。spring-doc.cadn.net.cn

从5.3版本开始,a广播能力频道-基于publishSubscribeChannel()实现方式用于配置经纪人支持的消息通道上的子流用户。 例如,我们现在可以将多个订阅者配置为子流Jms.publishSubscribeChannel():spring-doc.cadn.net.cn

@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub");
}

@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel,
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

类似的发布订阅子流组成提供.routeToRecipients()方法。spring-doc.cadn.net.cn

另一个例子是.discardFlow()而不是.discardChannel().filter()方法。spring-doc.cadn.net.cn

.route()值得特别关注。 请考虑以下例子:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

.channelMapping()正常运行路由器映射,但.subFlowMapping()把那个子流连接到主流。 换句话说,任一路由器的子流在之后返回主流.route().spring-doc.cadn.net.cn

有时,你需要参考现有的集成流程 @Bean来自.subFlowMapping(). 以下示例展示了如何实现:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}


在这种情况下,当你需要接收来自此类子流的回复并继续主流时,这个集成流程BEAN 参考(或其输入通道)必须用.gateway()如前例所示。 这oddFlow()前述示例中的引用并未被包裹到.gateway(). 因此,我们不期待该路由分支回复。 否则,你会遇到类似以下情况的例外情况:spring-doc.cadn.net.cn

Caused by: org.springframework.beans.factory.BeanCreationException:
    The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
    is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
    This is the end of the integration flow.

当你将子流配置为 lambda 时,框架会处理与子流的请求-回复交互,无需网关。spring-doc.cadn.net.cn

子流量可以嵌套到任意深度,但我们不建议这样做。 事实上,即使在路由器的情况下,在流程中加入复杂的子流也会很快看起来像一盘意大利面,人类很难理解。spring-doc.cadn.net.cn

在DSL支持子流配置的情况下,通常需要一个通道用于配置的组件,而该子流以通道()该框架隐含地放置一个元素桥()分量输出通道与流量输入通道之间。 例如,在这Filter定义:spring-doc.cadn.net.cn

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

框架内部创建了直达频道注射到中MessageFilter.discardChannel. 然后它将子流包裹成集成流程从这个隐式通道开始,订阅并放置一个通道()在流程中指定。 当集成流程BEAN 被用作子流参考(而非内联子流,例如 lambda),不需要这样的桥接,因为框架可以解析 Flow Bean 的第一个通道。 对于内联子流,输入通道尚未可用。spring-doc.cadn.net.cn