如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

R2DBC 支持

Spring Integration 提供了通道适配器,用于通过 R2DBC 驱动程序对数据库进行响应式访问来接收和发送消息。spring-doc.cadn.net.cn

您需要将以下依赖项包含到您的项目中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:6.4.10"

R2DBC 入站通道适配器

The R2dbcMessageSource 是一个基于 R2dbcEntityOperations 的可轮询 MessageSource 实现,它根据 expectSingleResult 选项从数据库获取数据,并生成以 FluxMono 作为有效负载的消息。 To SELECT 的查询可以静态提供,也可以基于在每个 receive() 调用时求值的 SpEL 表达式。 The R2dbcMessageSource.SelectCreator 作为评估上下文的根对象存在,以便可以使用 StatementMapper.SelectSpec 流畅 API。 默认情况下,此通道适配器将来自 select 的记录映射到 LinkedCaseInsensitiveMap 实例中。 可以通过提供 payloadType 选项来自定义,该选项由 EntityRowMapper 基于 this.r2dbcEntityOperations.getConverter() 在底层使用。 The updateSql 是可选的,用于标记数据库中已读取的记录,以便在后续轮询中跳过这些记录。 The UPDATE 操作可以提供 BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>,以便根据 SELECT 结果中的记录将值绑定到 UPDATE 中。spring-doc.cadn.net.cn

此通道适配器的典型配置可能如下所示:spring-doc.cadn.net.cn

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
				(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
    return r2dbcMessageSource;
}

使用 Java DSL,此通道适配器的配置如下:spring-doc.cadn.net.cn

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlow
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .handle((p, h) -> p)
        .channel(MessageChannels.flux())
        .get();
}

R2DBC 出站通道适配器

The R2dbcMessageHandler 是用于使用提供的 R2dbcEntityOperations 在数据库中执行 INSERT(默认)、UPDATEDELETE 查询的 ReactiveMessageHandler 实现。 The R2dbcMessageHandler.Type 可以静态配置,也可以针对请求消息通过 SpEL 表达式进行配置。 要执行的查询可以基于 tableNamevaluescriteria 表达式选项,或者(如果未提供 tableName)将整个消息负载视为一个 org.springframework.data.relational.core.mapping.Table 实体以执行 SQL 操作。 包 org.springframework.data.relational.core.query 已注册为 SpEL 评估上下文的导入项,以便直接访问 Criteria 流式 API,该 API 用于 UPDATEDELETE 查询。 The valuesExpressionINSERTUPDATE 中使用,必须求值为 Map,以便针对请求消息对目标表中的列值对执行更改。spring-doc.cadn.net.cn

此通道适配器的典型配置可能如下所示:spring-doc.cadn.net.cn

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
    return messageHandler;
}

使用 Java DSL,此通道适配器的配置如下:spring-doc.cadn.net.cn

.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))