如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

动态和运行时集成流程

IntegrationFlow 及其所有依赖组件都可以在运行时注册。 在 5.0 版本之前,我们使用 BeanFactory.registerSingleton() 钩子。 从 Spring Framework 5.0 开始,我们使用 instanceSupplier 钩子进行编程式 BeanDefinition 注册。 以下示例展示了如何编程式注册一个 Bean:spring-doc.cadn.net.cn

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

注意,在上述示例中,instanceSupplier钩子是genericBeanDefinition方法的最后一个参数,本例中由 lambda 提供。spring-doc.cadn.net.cn

所有必要的 Bean 初始化和生命周期管理均会自动完成,这与标准上下文配置 Bean 定义的行为一致。spring-doc.cadn.net.cn

为了简化开发体验,Spring Integration 引入了 IntegrationFlowContext 来在运行时注册和管理 IntegrationFlow 实例,如下示例所示:spring-doc.cadn.net.cn

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

当我们拥有多个配置选项且需要创建多个相似流程的实例时,这将非常有用。 为此,我们可以遍历我们的选项,并在循环中创建并注册 IntegrationFlow 个实例。 另一种变体是当我们的数据源不是基于 Spring 的,因此我们必须动态地创建它。 如下示例所示,响应式流(Reactive Streams)事件源就是这样一个样本:spring-doc.cadn.net.cn

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlow.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

IntegrationFlowRegistrationBuilder(作为IntegrationFlowContext.registration()的结果)可用于为要注册的IntegrationFlow指定 Bean 名称,控制其autoStartup,并注册非 Spring Integration 的 Bean。 通常,这些额外的 Bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化和反序列化器,或任何其他所需的支持组件。spring-doc.cadn.net.cn

当您不再需要时,可以使用 IntegrationFlowRegistration.destroy() 回调来移除动态注册的 IntegrationFlow 及其所有依赖的 Bean。 有关更多信息,请参阅 IntegrationFlowContext Javadocspring-doc.cadn.net.cn

从版本 5.0.6 开始,IntegrationFlow 定义中生成的所有 bean 名称都会以前缀形式添加流程 ID。 我们建议始终显式指定流程 ID。 否则,将在 IntegrationFlowContext 中启动同步屏障,以生成 IntegrationFlow 的 bean 名称并注册其 bean。 我们对这两个操作进行同步,以避免当相同的生成 bean 名称可能用于不同的 IntegrationFlow 实例时出现竞态条件。

此外,从 5.0.6 版本开始,注册构建器 API 新增了一个方法:useFlowIdAsPrefix()。 如果您希望声明同一流程的多个实例,并避免流程中具有相同 ID 的组件发生 Bean 名称冲突,此方法非常有用,如下示例所示:spring-doc.cadn.net.cn

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

在这种情况下,第一个流程的消息处理器可以通过 bean 名称 tcp1.client.handler 引用。spring-doc.cadn.net.cn

在使用 useFlowIdAsPrefix() 时,需要指定 id 属性。