|
对于最新稳定版本,请使用 Spring Integration 7.0.0! |
Apache Cassandra 支持
Spring Integration(从6.0版本起)提供通道适配器,用于对Apache Cassandra集群进行数据库作。 它完全基于 Apache Cassandra 的 Spring Data 项目。
你需要把这种依赖性纳入你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-cassandra</artifactId>
<version>6.3.11</version>
</dependency>
compile "org.springframework.integration:spring-integration-cassandra:6.3.11"
卡桑德拉出站组件
这CassandraMessageHandler是摘要回复制作消息处理器实现 和 可以同时进行单向(默认)和请求-回复模式(a生成回复选项)。
它默认是异步的(setAsync(false)以重置)并执行响应式插入,更新,删除或陈述针对所提供响应式Cassandra运营.
作类型可以通过以下方式配置CassandraMessageHandler.Type选择。
这ingestQuery将模态设置为插入;这查询或声明表达式或语句处理器将 模式设置为陈述.
以下代码片段展示了该通道适配器或网关的各种配置:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
return flow -> flow
.handle(Cassandra.outboundGateway(cassandraOperations)
.query("SELECT * FROM book WHERE author = :author limit :size")
.parameter("author", "payload")
.parameter("size", m -> m.getHeaders().get("limit")))
.channel(c -> c.flux("resultChannel"));
}
@Bean
fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) =
integrationFlow {
handle(
Cassandra.outboundChannelAdapter(cassandraOperations)
.statementExpression("T(QueryBuilder).truncate('book').build()")
) { async(false) }
}
@ServiceActivator(inputChannel = "cassandraSelectChannel")
@Bean
public MessageHandler cassandraMessageHandler() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");
Map<String, Expression> params = new HashMap<>();
params.put("author", PARSER.parseExpression("payload"));
params.put("size", PARSER.parseExpression("headers.limit"));
cassandraMessageHandler.setParameterExpressions(params);
cassandraMessageHandler.setOutputChannel(resultChannel());
cassandraMessageHandler.setProducesReply(true);
return cassandraMessageHandler;
}
<int-cassandra:outbound-channel-adapter id="outboundAdapter"
cassandra-template="cassandraTemplate"
write-options="writeOptions"
auto-startup="false"
async="false"/>
<int-cassandra:outbound-gateway id="outgateway"
request-channel="input"
cassandra-template="cassandraTemplate"
mode="STATEMENT"
write-options="writeOptions"
query="SELECT * FROM book limit :size"
reply-channel="resultChannel"
auto-startup="true">
<int-cassandra:parameter-expression name="author" expression="payload"/>
<int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>
如果CassandraMessageHandler作为默认异步模式下的网关使用,a单<写结果>产生了,处理方式如下消息频道实现。
对于真正的响应式处理流信息频道推荐用于输出通道配置。
同步模式下单块块()调用 以获得回复值。
如果插入,更新或删除作被执行,一个实体(标记为org.springframework.data.cassandra.core.mapping.Table)期望出现在请求消息有效载荷中。
如果有效载荷是实体列表,则执行相应的批处理作。
这ingestQuery模式期望有效载荷作为一个值矩阵插入——名单<名单<?>>.
例如,如果该实体是这样的:
@Table("book")
public record Book(@PrimaryKey String isbn,
String title,
@Indexed String author,
int pages,
LocalDate saleDate,
boolean isInStock) {
}
通道适配器的配置如下:
@Bean
public MessageHandler cassandraMessageHandler3() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
cassandraMessageHandler.setIngestQuery(cqlIngest);
cassandraMessageHandler.setAsync(false);
return cassandraMessageHandler;
}
请求消息有效载荷必须如下转换:
List<List<Object>> ingestBooks =
payload.stream()
.map(book ->
List.<Object>of(
book.isbn(),
book.title(),
book.author(),
book.pages(),
book.saleDate(),
book.isInStock()))
.toList();
对于更复杂的用例,载荷可以作为com.datastax.oss.driver.api.core.cql.Statement.
这com.datastax.oss.driver.api.querybuilder.QueryBuilder建议用API构建针对Apache Cassandra执行的各种语句。
例如,要从书表中,带有此类有效载荷的消息可以发送给CassandraMessageHandler:QueryBuilder.truncate(“book”).build().
或者,对于基于请求消息的逻辑,一个声明表达式或语句处理器可以为CassandraMessageHandler建造一个陈述基于那条信息。
为方便起见,acom.datastax.oss.driver.api.querybuilder注册为进口进入 SpEL 评估上下文,因此目标表达式可以简单地描述如下:
statement-expression="T(QueryBuilder).selectFrom("book").all()"
这setParameterExpressions(Map<String, Expression> ParameterExpressions)表示可绑定的命名查询参数,仅用于setQuery(字符串查询)选择。
参见上述提到的 Java 和 XML 示例。