|
对于最新稳定版本,请使用 Spring Integration 7.0.0! |
MongoDb 支持
2.1 版本引入了对 MongoDB 的支持:一个“高性能、开源、面向文档的数据库”。
你需要把这种依赖性纳入你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
<version>6.3.11</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:6.3.11"
要下载、安装和运行MongoDB,请参见MongoDB文档。
连接MongoDb
阻断还是反应?
从5.3版本开始,Spring Integration支持响应式MongoDB驱动程序,以实现访问MongoDB时的非阻塞I/O。 要启用响应式支持,请将MongoDB响应式流驱动添加到你的依赖中:
-
Maven
-
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"
对于普通的同步客户端,你需要将其相应的驱动添加到依赖中:
-
Maven
-
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"
他们俩都是自选在框架中,以更好地支持终端用户选择。
要开始与MongoDB交互,首先需要连接它。
Spring 集成基于另一个 Spring 项目 Spring Data MongoDB 的支持。
它提供工厂类别,称为Mongo数据库工厂和ReactiveMongo数据库工厂,简化了与 MongoDB 客户端 API 的集成。
| Spring Data 默认提供阻断 MongoDB 驱动,但你可以通过包含上述依赖选择被动使用。 |
用Mongo数据库工厂
要连接MongoDB,可以使用以下实现Mongo数据库工厂接口。
以下示例展示了如何使用SimpleMongoClient数据库工厂:
-
Java
-
XML
MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
SimpleMongoClient数据库工厂有两个论元:aMongo客户端实例和字符串这会指定数据库的名称。
如果你需要配置诸如主机,端口以及其他,你可以通过底层提供的一个构造函数来传递这些内容。MongoClients(蒙哥客户端)类。
有关如何配置MongoDB的更多信息,请参见Spring-Data-MongoDB参考资料。
用ReactiveMongo数据库工厂
要用响应式驱动连接到MongoDB,可以使用以下实现ReactiveMongo数据库工厂接口。
以下示例展示了如何使用SimpleReactiveMongo数据库工厂:
-
Java
-
XML
ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
MongoDB 消息库
正如《企业集成模式(EIP)》一书中所述,消息存储允许你持久化消息。
在处理能够缓冲消息的组件时,这样做非常有用 (队列通道,聚合,重序器,以及其他。如果可靠性是个问题。
在春季集成中,消息商店策略还为理赔检查模式提供了基础,EIP中也有描述。
Spring Integration 的 MongoDB 模块提供了MongoDb消息存储,该是两个消息商店策略(主要用于主张检查模式)以及MessageGroupStore策略(主要用于聚合器和重编序模式)。
以下示例配置为MongoDb消息存储使用队列通道以及聚合:
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="mongoDbMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>
上述示例是一个简单的豆形态,它期望MongoDb工厂作为构造子论元。
这MongoDb消息存储扩展消息通过使用 Spring Data Mongo 映射机制,作为一个具有所有嵌套属性的 Mongo 文档。
当你需要访问有效载荷或头用于审计或分析——例如针对存储的消息。
这MongoDb消息存储使用自定义映射Mongo转换器实现到存储消息实例作为MongoDB文档,并且对这些属性(有效载荷和页眉值)的消息. |
从5.1.6版本开始,MongoDb消息存储可以通过自定义转换器配置,这些转换器会传播到内部映射Mongo转换器实现。
看MongoDbMessageStore.setCustomConverters(Object...自定义转换器)更多信息请参见JavaDocs。
Spring Integration 3.0 引入了ConfigurableMongoDbMessageStore.
它实现了消息商店和MessageGroupStore接口。
该类可以作为构造函数参数接收Mongo模板例如,你可以配置自定义写作关怀.
另一个构造函数要求映射Mongo转换器以及一个MongoDb工厂,这允许你提供一些自定义转换消息实例及其属性。
注意,默认情况下,ConfigurableMongoDbMessageStore使用标准的 Java 序列化来写入和读取消息与MongoDB之间的实例(参见MongoDbMessageBytes转换器)并依赖于其他属性的默认值,来自Mongo模板.
它构建了一个Mongo模板来自提供的MongoDb工厂和映射Mongo转换器.
由ConfigurableMongoDbMessageStore是可配置存储信息.
我们建议在消息包含复杂数据类型时,使用该实现来创建稳健且灵活的解决方案。
从6.0.8版本开始,AbstractConfigurableMongoDbMessageStore提供setCreateIndexes(boolean)(默认为true)选项,可用于禁用自动索引创建。
以下示例展示了如何声明豆子并禁用自动索引创建:
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndexes(false);
return mongoDbChannelMessageStore;
}
MongoDB频道消息存储
4.0 版本引入了新的MongoDb频道消息存储.
它是优化的MessageGroupStore用于队列通道实例。
跟优先启用 = 真你可以用它<int:优先队列>实例以实现持久消息的优先级轮询。
优先级MongoDB文档字段由以下内容填充IntegrationMessageHeaderAccessor.PRIORITY (优先权) 消息头部。
此外,所有MongoDB也在消息商店实例现在有序列字段消息组文件。
这序列值是$inc简单序列同一收藏中的文档,按需创建。
这序列字段用于民意调查当消息存储在同一毫秒内时,提供先进先出(FIFO)消息顺序(如果配置为优先级内)。
我们不建议使用相同的产品MongoDb频道消息存储BEAN用于优先级和非优先级,因为优先启用这个选项适用于整个商店。
不过,还是一样收集可以同时用于两者MongoDb频道消息存储类型,因为从存储器中进行消息轮询是排序的,并且使用索引。
要配置该场景,你可以从一个消息存储豆扩展到另一个,如下示例所示: |
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="store"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
使用 AbstractConfigurableMongoDbMessageStore 并禁用自动索引创建
从6.0.8版本开始,AbstractConfigurableMongoDbMessageStore实现 asetCreateIndex(boolean)它可以用于可关闭或启用(默认)自动索引创建。
以下示例展示了如何声明豆子并禁用自动索引创建:
@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndex(false);
return mongoDbChannelMessageStore;
}
MongoDB 元数据存储
Spring Integration 4.2 引入了基于 MongoDB 的新版本元数据存储(参见元数据存储)实现。
你可以使用MongoDb元数据存储以在应用重启期间保持元数据状态。
你可以用这个新东西元数据存储通过以下适配器实现:
指导这些适配器使用新的MongoDb元数据存储,宣告一个豆名为元数据存储.
输入通道适配器会自动接收并使用声明的MongoDb元数据存储.
以下示例展示了如何声明一个名为元数据存储:
@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
这MongoDb元数据存储其他实现并发元数据存储使得密钥能够可靠地在多个应用实例间共享,且只有一个实例允许存储或修改键值。
所有这些作都是原子作,得益于 MongoDB 的保证。
MongoDB 入站通道适配器
MongoDB的入站通道适配器是一个轮询式消费者,读取MongoDB的数据并作为消息有效载荷。
以下示例展示了如何配置MongoDB入站通道适配器:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
如前面配置所示,你通过使用入站信道适配器并为各种属性提供值,例如:
-
查询: JSON 查询(参见 MongoDB 查询) -
查询表达式: 一个被评估为JSON查询字符串的SpEL表达式(作为查询属性上)或对应的实例o.s.data.mongodb.core.query.Query. 与查询属性。 -
实体类:有效载荷对象的类型。 如果没有提供,则com.mongodb.DBObject被归还。 -
收藏名称或集合-名称-表达式: 识别 MongoDB 集合的名称。 -
Mongodb工厂:引用 的实例o.s.data.mongodb.MongoDbFactory -
Mongo模板:引用 的实例o.s.data.mongodb.core.MongoTemplate -
其他所有入站适配器共有的属性(如“通道”)。
你不能同时设置Mongo模板和Mongodb工厂. |
前面的例子相对简单且静态,因为它有一个字面值查询并使用默认名称收集.
有时,你可能需要根据某些条件在运行时更改这些数值。
要做到这一点,请使用-表达等价物(查询表达式和集合-名称-表达式),其中所提供的表达式可以是任意有效的SpEL表达式。
另外,你可能还想对从 MongoDB 成功读取的数据做一些后处理。 例如;处理完文件后,你可能想移动或移除文件。 你可以利用 Spring Integration 2.2 添加的事务同步功能实现,如下示例所示:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>
<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
以下示例展示了文档清理器上述例子中引用:
public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?> documents){
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}
您可以通过以下方式声明你的轮询器为事务型事务元素。
该元素可以引用真实的事务管理器(例如,如果流程的其他部分调用了JDBC)。
如果你没有“真实”交易,可以使用o.s.i.transaction.PseudoTransactionManager,这是Spring的实现PlatformTransactionManager并且在没有实际事务时,能够使用Mongo适配器的事务同步功能。
| 这样做并不意味着 MongoDB 本身是事务性的。它允许在成功(提交)或失败(回滚)之后同步作。 |
一旦轮询器变成事务性,你可以设置一个实例o.s.i.transaction.事务同步工厂在事务元素。 一个交易同步工厂创建了事务同步. 为了方便,我们已公开了基于SpEL的默认版本交易同步工厂它允许你配置 SpEL 表达式,其执行与事务协调(同步)。支持提交前、提交后和回滚后事件的表达式,并为每个事件提供一个通道,发送评估结果(如有)。对于每个子元素,你可以指定表达和渠道属性。 如果渠道属性存在时,接收到的消息作为特定同步场景的一部分发送到该处。如果表达属性存在,表达式的结果是非空值,生成一个消息,结果为有效载荷,发送到默认通道(零通道)并出现在日志中(在调试层级)。如果你希望评估结果进入特定通道,请添加渠道属性。 如果表达式的结果为零或空,则不生成消息。
有关事务同步的更多信息,请参见事务同步。
从5.5版本开始,MongoDb消息源可以配置为更新表达式,必须计算为字符串与MongoDb合作更新语法或org.springframework.data.mongodb.core.query.Update实例。 它可以作为上述后处理过程的替代方案,并且会修改那些从集合中获取的实体,使其在下一个轮询周期中不会再次从集合中被拉取(假设更新改变了查询中使用的某些值)。当多个实例MongoDb消息源同一集合中使用。
MongoDB 变更流入站通道适配器
从5.3版本开始,Spring-integration-mongodb模块引入了MongoDbChangeStreamMessageProducer- 反应型MessageProducerSupportSpring Data 的实现ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class)应用程序接口。 该分量产生通量包含 a 的消息身体之变流事件作为默认的有效载荷,以及一些与流相关的头部变更(参见蒙古首领). 建议这样做MongoDbChangeStreamMessageProducer与流信息频道作为输出通道用于按需订阅和下游活动观看。
该通道适配器的Java DSL配置可能如下:
@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlow.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}
当MongoDbChangeStreamMessageProducer被停止,或者订阅在下游被取消,或者MongoDb变更流产生作类型。无效这发行人已完成。通道适配器可以重新启动并重新开始发行人源数据被创建后自动订阅MessageProducerSupport.subscribeToPublisher(Publisher<? 扩展消息<?>>). 如果需要从其他地方消费变更流事件,该通道适配器可以在启动之间重新配置以适应新选项。
关于变更流支持的更多信息,请参见 Spring Data MongoDb 文档。
MongoDB 出站通道适配器
MongoDB出站通道适配器允许你将消息有效载荷写入MongoDB文档存储,如下示例所示:
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />
如前面配置所示,您可以通过以下方式配置MongoDB出站通道适配器出站通道适配器元素,提供各种属性的值,例如:
-
收藏名称或集合-名称-表达式: 识别用于使用的MongoDb集合名称。 -
Mongo转换器:引用 的实例o.s.data.mongodb.core.convert.MongoConverter它帮助将原始 Java 对象转换为 JSON 文档表示。 -
Mongodb工厂:引用 的实例o.s.data.mongodb.MongoDbFactory. -
Mongo模板:引用 的实例o.s.data.mongodb.core.MongoTemplate. 注意:你不能同时设置mongo-template和mongodb-factory。 -
所有入站适配器共有的其他属性(如“通道”)。
前面的例子相对简单且静态,因为它有一个字面值收藏名称.
有时,你可能需要在运行时根据某些条件更改这个值。
要做到这一点,请使用集合-名称-表达式,其中所提供的表达式是任意有效的SpEL表达式。
MongoDB 出站网关
5.0版本引入了MongoDB出站网关。 它允许你通过向数据库的请求通道发送消息来查询。 网关随后将响应发送到回复信道。 你可以使用消息有效载荷和报头来指定查询和集合名称,如下示例所示:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory;
@Autowired
private MongoConverter;
@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}
private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}
}
class MongoDbKotlinApplication {
fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)
@Autowired
lateinit var mongoDbFactory: MongoDatabaseFactory
@Autowired
lateinit var mongoConverter: MongoConverter
@Bean
fun gatewaySingleQueryFlow() =
integrationFlow {
handle(queryOutboundGateway())
channel { queue("retrieveResults") }
}
private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction<Any> { m -> m.headers["collection"] as String }
.expectSingleResult(true)
.entityClass(Person::class.java)
}
}
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory mongoDbFactory;
@Bean
@ServiceActivator(inputChannel = "requestChannel")
public MessageHandler mongoDbOutboundGateway() {
MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
gateway.setCollectionNameExpressionString("'myCollection'");
gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
gateway.setExpectSingleResult(true);
gateway.setEntityClass(Person.class);
gateway.setOutputChannelName("replyChannel");
return gateway;
}
@Bean
@ServiceActivator(inputChannel = "replyChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}
}
<int-mongodb:outbound-gateway id="gatewayQuery"
mongodb-factory="mongoDbFactory"
mongo-converter="mongoConverter"
query="{firstName: 'Bob'}"
collection-name="myCollection"
request-channel="in"
reply-channel="out"
entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
你可以在 MongoDB 出站网关中使用以下属性:
-
收藏名称或集合-名称-表达式: 识别 MongoDB 集合的名称。 -
Mongo转换器:引用 的实例o.s.data.mongodb.core.convert.MongoConverter它帮助将原始 Java 对象转换为 JSON 文档表示。 -
Mongodb工厂:引用 的实例o.s.data.mongodb.MongoDbFactory. -
Mongo模板:引用 的实例o.s.data.mongodb.core.MongoTemplate. 注意:你不能同时设置Mongo模板和Mongodb工厂. -
实体类: 将传递给找到(..)和findOne(..)MongoTemplate 中的方法。 如果未提供该属性,默认值为org.bson.文档. -
查询或查询表达式: 指定了MongoDB查询。 更多查询示例请参见MongoDB文档。 -
集合回调:引用 的实例org.springframework.data.mongodb.core.CollectionCallback. 最好是o.s.i.mongodb.outbound.消息收集回拨自5.0.11起,包含请求消息上下文。 更多信息请参见其Java文档。 注意:你不能两者兼得集合回调以及任何查询属性。
作为查询和查询表达式属性,你可以通过使用收藏回溯作为消息收集回调功能接口实现。
以下示例指定了一个计数作:
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.count())
.collectionName("myCollection");
}
MongoDB 响应式通道适配器
从5.3版本开始,ReactiveMongoDbStoringMessageHandler和ReactiveMongoDbMessageSource提供实现。
它们基于ReactiveMongoOperations来自Spring Data,并且需要org.mongodb:mongodb-driver-reactivestreamsDependency。
这ReactiveMongoDbStoringMessageHandler是响应式消息处理器当响应式流组合涉及集成流定义时,框架内原生支持该系统。
更多信息请参见ReactiveMessageHandler。
从配置角度看,这与许多其他标准频道适配器没有区别。 例如,在Java DSL中,这样的通道适配器可以像以下那样使用:
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}
在这个示例中,我们将通过以下方式连接到 MongoDb。ReactiveMongo数据库工厂并将请求消息中的数据存储到默认集合中,并具有数据名字。
实际作将根据内部生成的响应式流组合按需执行ReactiveStreamsConsumer.
这ReactiveMongoDbMessageSource是摘要消息源基于 的实现ReactiveMongo数据库工厂或ReactiveMongoOperations以及 MongoDb 查询(或表达式),调用find()或findOne()根据expectSingleResult带有期望值的选项entityClass输入以转换查询结果。
查询执行和结果评估按需执行,当发行人 (通量或单根据expectSingleResult选项)在生成消息的有效载荷中被订阅。
框架可以自动订阅这样的有效载荷(基本上平面地图) 当分流器和流信息频道这些数据被用于下游。
否则,目标应用负责在下游端点订阅被轮询的发布者。
使用 Java DSL,这样的通道适配器可以配置如下:
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlow
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}
从5.5版本开始,ReactiveMongoDbMessageSource可以配置为更新表达式.
它的功能和阻挡一样MongoDb消息源.
参见 MongoDB 入站通道适配器和摘要MongoDb消息源规格更多信息请参见JavaDocs。