此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

R2DBC 支持

Spring Integration 提供了通道适配器,用于通过 R2DBC 驱动程序对数据库进行响应式访问来接收和发送消息。spring-doc.cadn.net.cn

此依赖项是项目所必需的:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>7.1.0-SNAPSHOT</version>
</dependency>
implementation "org.springframework.integration:spring-integration-r2dbc:7.1.0-SNAPSHOT"

R2DBC 入站通道适配器

The R2dbcMessageSource 是一个基于 R2dbcEntityOperations 的可轮询 MessageSource 实现,它根据 expectSingleResult 选项从数据库获取数据,并生成以 FluxMono 为负载的消息。 查询 SELECT 可以静态提供,也可以基于在每次 receive() 调用时求值的 SpEL 表达式。 R2dbcMessageSource.SelectCreator 作为评估上下文的根对象存在,以便可以使用 StatementMapper.SelectSpec 的流畅 API。 默认情况下,此通道适配器将 select 中的记录映射到 LinkedCaseInsensitiveMap 实例中。 可以通过提供 payloadType 选项来自定义它,该选项被 EntityRowMapper 基于 this.r2dbcEntityOperations.getConverter() 在底层使用。 updateSql 是可选的,用于标记已读取的记录,以便在后续轮询中跳过它们。 UPDATE 操作可以提供 BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>,以便根据 SELECT 结果中的记录将值绑定到 UPDATE 中。spring-doc.cadn.net.cn

此通道适配器的典型配置可能如下所示:spring-doc.cadn.net.cn

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
				(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
    return r2dbcMessageSource;
}

使用 Java DSL,此通道适配器的配置如下:spring-doc.cadn.net.cn

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlow
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .handle((p, h) -> p)
        .channel(MessageChannels.flux())
        .get();
}

R2DBC 出站通道适配器

The R2dbcMessageHandler 是一个 ReactiveMessageHandler 实现,用于使用提供的 R2dbcEntityOperations 在数据库中对 INSERT(默认)、UPDATEDELETE 执行查询。 The R2dbcMessageHandler.Type 可以静态配置,也可以针对请求消息通过 SpEL 表达式进行配置。 要执行的查询可以基于 tableNamevaluescriteria 表达式选项,或者(如果未提供 tableName)将整个消息负载视为一个 org.springframework.data.relational.core.mapping.Table 实体以对 SQL 进行操作。 包 org.springframework.data.relational.core.query 被注册为导入到 SpEL 求值上下文中,以便直接访问 Criteria 的流畅 API,该 API 用于 UPDATEDELETE 查询。 The valuesExpressionINSERTUPDATE 中使用,并且必须评估为 Map,以便对列 - 值对进行操作,从而根据请求消息对目标表执行更改。spring-doc.cadn.net.cn

此通道适配器的典型配置可能如下所示:spring-doc.cadn.net.cn

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
    return messageHandler;
}

使用 Java DSL,此通道适配器的配置如下:spring-doc.cadn.net.cn

.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))