|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
Apache Cassandra 支持
Spring Integration 提供了通道适配器(从 6.0 版本开始),用于对 Apache Cassandra 集群执行数据库操作。 它完全基于 Spring Data for Apache Cassandra 项目。
您需要将以下依赖项包含到您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-cassandra</artifactId>
<version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-cassandra:6.4.10"
Cassandra 出站组件
CassandraMessageHandler 是一个 AbstractReplyProducingMessageHandler 实现,可以在单向(默认)和请求 - 响应模式(一种 producesReply 选项)下工作。
它默认是异步的(setAsync(false) 用于重置),并对提供的 ReactiveCassandraOperations 执行响应式 INSERT、UPDATE、DELETE 或 STATEMENT 操作。
操作类型可通过 CassandraMessageHandler.Type 选项进行配置。
ingestQuery 将模式设置为 INSERT;query 或 statementExpression,或 statementProcessor 将模式设置为 STATEMENT。
以下代码片段展示了该通道适配器或网关的各种配置:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
return flow -> flow
.handle(Cassandra.outboundGateway(cassandraOperations)
.query("SELECT * FROM book WHERE author = :author limit :size")
.parameter("author", "payload")
.parameter("size", m -> m.getHeaders().get("limit")))
.channel(c -> c.flux("resultChannel"));
}
@Bean
fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) =
integrationFlow {
handle(
Cassandra.outboundChannelAdapter(cassandraOperations)
.statementExpression("T(QueryBuilder).truncate('book').build()")
) { async(false) }
}
@ServiceActivator(inputChannel = "cassandraSelectChannel")
@Bean
public MessageHandler cassandraMessageHandler() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");
Map<String, Expression> params = new HashMap<>();
params.put("author", PARSER.parseExpression("payload"));
params.put("size", PARSER.parseExpression("headers.limit"));
cassandraMessageHandler.setParameterExpressions(params);
cassandraMessageHandler.setOutputChannel(resultChannel());
cassandraMessageHandler.setProducesReply(true);
return cassandraMessageHandler;
}
<int-cassandra:outbound-channel-adapter id="outboundAdapter"
cassandra-template="cassandraTemplate"
write-options="writeOptions"
auto-startup="false"
async="false"/>
<int-cassandra:outbound-gateway id="outgateway"
request-channel="input"
cassandra-template="cassandraTemplate"
mode="STATEMENT"
write-options="writeOptions"
query="SELECT * FROM book limit :size"
reply-channel="resultChannel"
auto-startup="true">
<int-cassandra:parameter-expression name="author" expression="payload"/>
<int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>
如果在默认异步模式中使用 CassandraMessageHandler 作为网关,则会生成 Mono<WriteResult>,并根据提供的 MessageChannel 实现进行处理。
对于真正的响应式处理,建议为输出通道配置使用 FluxMessageChannel。
在同步模式下,将调用 Mono.block() 以获取回复值。
如果执行了 INSERT、UPDATE 或 DELETE 个操作,则请求消息负载中应包含一个实体(标记为 org.springframework.data.cassandra.core.mapping.Table)。
如果负载是实体的列表,则将执行相应的批量操作。
ingestQuery 模式期望有效负载以要插入的值矩阵形式存在 - List<List<?>>。
例如,如果实体如下所示:
@Table("book")
public record Book(@PrimaryKey String isbn,
String title,
@Indexed String author,
int pages,
LocalDate saleDate,
boolean isInStock) {
}
通道适配器具有以下配置:
@Bean
public MessageHandler cassandraMessageHandler3() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
cassandraMessageHandler.setIngestQuery(cqlIngest);
cassandraMessageHandler.setAsync(false);
return cassandraMessageHandler;
}
请求消息负载必须按如下方式转换:
List<List<Object>> ingestBooks =
payload.stream()
.map(book ->
List.<Object>of(
book.isbn(),
book.title(),
book.author(),
book.pages(),
book.saleDate(),
book.isInStock()))
.toList();
对于更复杂的用例,负载可以是 com.datastax.oss.driver.api.core.cql.Statement 的实例。
推荐使用 com.datastax.oss.driver.api.querybuilder.QueryBuilder API 来构建各种针对 Apache Cassandra 执行的语句。
例如,要从 Book 表中删除所有数据,可以向 CassandraMessageHandler 发送一条包含如下负载的消息:QueryBuilder.truncate("book").build()。
或者,基于请求消息的逻辑,可以为 CassandraMessageHandler 提供 statementExpression 或 statementProcessor,以便根据该消息构建 Statement。
为方便起见,com.datastax.oss.driver.api.querybuilder 作为 import 注册到 SpEL 评估上下文中,因此目标表达式可以简单如下:
statement-expression="T(QueryBuilder).selectFrom("book").all()"
setParameterExpressions(Map<String, Expression> parameterExpressions) 代表可绑定的命名查询参数,仅与 setQuery(String query) 选项一起使用。
请参阅上文提到的 Java 和 XML 示例。