R2DBC 支持
R2DBC 支持
Spring Integration通过R2DBC驱动通过对数据库的响应式访问,提供通道适配器来接收和发送消息。
你需要把这种依赖性纳入你的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-r2dbc</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:6.0.9"
R2DBC 入站信道适配器
这R2dbc消息源是可轮询的消息源基于R2dbc实体作并产生具有通量或单作为从数据库获取数据的有效载荷,根据expectSingleResult选择。
查询选择可以静态提供,也可以基于 SpEL 表达式,该表达式对每个 都进行评估接收()叫。
这R2dbcMessageSource.SelectCreator作为评估上下文的根对象存在,以便使用StatementMapper.SelectSpec流利API。
默认情况下,该通道适配器将选择记录映射为LinkedCaseInsensitiveMap实例。
它可以进行定制,提供有效载荷类型选项,下面由EntityRowMapper基于this.r2dbcEntityOperations.getConverter().
这更新SQL是可选的,用于标记数据库中的已读记录,以便跳过后续轮询。
这更新作可以由BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>将值绑定为更新根据选择结果。
该通道适配器的典型配置可能如下:
@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
"SELECT * FROM person WHERE name='Name'");
r2dbcMessageSource.setPayloadType(Person.class);
r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
r2dbcMessageSource.setBindFunction(
(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
return r2dbcMessageSource;
}
Java DSL 的通道适配器配置如下:
@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlow
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
(selectCreator) ->
selectCreator.createSelect("person")
.withProjection("*")
.withCriteria(Criteria.where("id").is(1)))
.expectSingleResult(true)
.payloadType(Person.class)
.updateSql("UPDATE Person SET id='2' where id = :id")
.bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
bindSpec.bind("id", o.getId())),
e -> e.poller(p -> p.fixedDelay(100)))
.<Mono<?>>handle((p, h) -> p, e -> e.async(true))
.channel(MessageChannels.flux())
.get();
}
R2DBC 出站信道适配器
这R2dbcMessageHandler是响应式消息处理器实现以执行插入(默认),更新或删除在数据库中查询,使用提供的R2dbc实体作.
这R2dbcMessageHandler.Type可以静态配置,也可以通过针对请求消息的 SpEL 表达式进行配置。
要执行的查询可以基于tableName,值和标准表达式选项或(如果tableName未提供)整个消息有效载荷被视为org.springframework.data.relational.core.mapping.Table用于执行SQL的实体。
包装org.springframework.data.relational.core.query被注册为导入到 SpEL 评估上下文中,以便直接访问标准流流 API,用于更新和删除查询。
这价值表达式用于插入和更新并且必须被计算为地图用于列-值对对请求消息对目标表进行更改。
该通道适配器的典型配置可能如下:
@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
messageHandler.setCriteriaExpression(
EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
return messageHandler;
}
Java DSL 的通道适配器配置如下:
.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
.queryType(R2dbcMessageHandler.Type.UPDATE)
.tableNameExpression("payload.class.simpleName")
.criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
.values("{age:36}"))