此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

MongoDB 支持

版本 2.1 引入对 MongoDB 的支持:一款“高性能、开源、面向文档的数据库”。spring-doc.cadn.net.cn

此依赖项是项目所必需的:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>7.1.0-M3</version>
</dependency>
implementation "org.springframework.integration:spring-integration-mongodb:7.1.0-M3"

要下载、安装和运行 MongoDB,请参阅 MongoDB 文档spring-doc.cadn.net.cn

连接到 MongoDB

阻塞式还是响应式?

从 5.3 版本开始,Spring Integration 提供对响应式 MongoDB 驱动的支持,以便在访问 MongoDB 时实现非阻塞 I/O。 要启用响应式支持,请将 MongoDB 响应式流驱动添加到您的依赖项中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
implementation "org.mongodb:mongodb-driver-reactivestreams"

对于常规的同步客户端,您需要将其相应的驱动程序添加到依赖项中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
</dependency>
implementation "org.mongodb:mongodb-driver-sync"

两者在框架中均为 optional,以更好地支持最终用户的选择。spring-doc.cadn.net.cn

要开始与 MongoDB 交互,您首先需要连接到它。 Spring Integration 建立在另一个 Spring 项目 Spring Data MongoDB 提供的支持之上。 它提供了名为 MongoDatabaseFactoryReactiveMongoDatabaseFactory 的工厂类,可简化与 MongoDB Client API 的集成。spring-doc.cadn.net.cn

Spring Data 默认提供阻塞式 MongoDB 驱动程序,但您可以通过包含上述依赖项来选择使用响应式方式。

使用MongoDatabaseFactory

要连接 MongoDB,您可以使用 MongoDatabaseFactory 接口的实现。spring-doc.cadn.net.cn

下面的例子展示了如何使用SimpleMongoClientDatabaseFactoryspring-doc.cadn.net.cn

MongoDatabaseFactory mongoDbFactory =
        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

SimpleMongoClientDatabaseFactory 接受两个参数:一个 MongoClient 实例和一个指定数据库名称的 String。 如果您需要配置诸如 hostport 等属性,可以通过底层 MongoClients 类提供的其中一个构造函数传递这些属性。 有关如何配置 MongoDB 的更多信息,请参阅 Spring-Data-MongoDB 参考文档。spring-doc.cadn.net.cn

使用ReactiveMongoDatabaseFactory

要使用响应式驱动程序连接 MongoDB,您可以实现 ReactiveMongoDatabaseFactory 接口。spring-doc.cadn.net.cn

下面的例子展示了如何使用SimpleReactiveMongoDatabaseFactoryspring-doc.cadn.net.cn

ReactiveMongoDatabaseFactory mongoDbFactory =
        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

MongoDB 消息存储

正如《企业集成模式》(EIP) 一书所述,消息存储 允许您持久化消息。 在处理具有消息缓冲能力的组件(QueueChannel, aggregator, resequencer 等)时,如果可靠性是一个关注点,这样做可能非常有用。 在 Spring Integration 中,MessageStore 策略还为断言检查 模式提供了基础,该模式也在 EIP 中进行了描述。spring-doc.cadn.net.cn

Spring Integration 的 MongoDB 模块提供了 MongoDbMessageStore,它同时实现了 MessageStore 策略(主要用于断言检查模式)和 MessageGroupStore 策略(主要用于聚合器和重排序器模式)。spring-doc.cadn.net.cn

以下示例配置了一个 MongoDbMessageStore 使用一个 QueueChannel 和一个 aggregatorspring-doc.cadn.net.cn

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

上述示例是一个简单的 bean 配置,它期望一个MongoDbFactory作为构造函数参数。spring-doc.cadn.net.cn

MongoDbMessageStoreMessage扩展为Mongo文档,并利用Spring Data Mongo映射机制包含所有嵌套属性。 当您需要访问payloadheaders以进行审计或分析时(例如针对存储的消息),此功能非常有用。spring-doc.cadn.net.cn

The MongoDbMessageStore uses a custom MappingMongoConverter implementation to store Message instances as MongoDB documents, and there are some limitations for the properties (payload and header values) of the Message.

从 5.1.6 版本开始,MongoDbMessageStore 可以配置自定义转换器,这些转换器会被传播到内部的 MappingMongoConverter 实现中。 有关更多信息,请参阅 MongoDbMessageStore.setCustomConverters(Object…​ customConverters) JavaDocs。spring-doc.cadn.net.cn

Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore。 它同时实现了 MessageStoreMessageGroupStore 接口。 此类可以接收一个 MongoTemplate 作为构造参数,您可以利用它来配置自定义的 WriteConcern。 另一个构造函数需要 MappingMongoConverterMongoDbFactory,这允许您为 Message 实例及其属性提供自定义转换。 请注意,默认情况下,ConfigurableMongoDbMessageStore 使用标准 Java 序列化将 Message 实例写入和读取到 MongoDB(参见 MongoDbMessageBytesConverter),并依赖 MongoTemplate 中的其他属性的默认值。 它根据提供的 MongoDbFactoryMappingMongoConverter 构建一个 MongoTemplate。 由 ConfigurableMongoDbMessageStore 存储的集合的默认名称是 configurableStoreMessages。 我们建议使用此实现来创建稳健且灵活的解决方案,特别是当消息包含复杂数据类型时。spring-doc.cadn.net.cn

从版本 6.0.8 开始,AbstractConfigurableMongoDbMessageStore 提供了一个 setCreateIndexes(boolean)(默认为 true)选项,可用于禁用自动索引的创建。 以下示例展示了如何声明一个 bean 并禁用自动索引的创建:spring-doc.cadn.net.cn

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
    MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndexes(false);
    return mongoDbChannelMessageStore;
}

MongoDB 通道消息存储

版本 4.0 引入了新的 MongoDbChannelMessageStore。 它是专为 QueueChannel 实例优化的 MessageGroupStore。 使用 priorityEnabled = true,您可以在 <int:priority-queue> 实例中利用它来实现持久化消息的优先级轮询。 优先级的 MongoDB 文档字段由 IntegrationMessageHeaderAccessor.PRIORITY (priority) 消息头填充。spring-doc.cadn.net.cn

此外,所有 MongoDB MessageStore 实例现在都拥有一个用于 MessageGroup 文档的 sequence 字段。 sequence 的值是通过对来自同一集合的简单 sequence 文档执行 $inc 操作所得的结果,该文档按需创建。 sequence 字段用于 poll 操作中,以在消息存储于同一毫秒内时提供先进先出(FIFO)的消息顺序(如果已配置优先级)。spring-doc.cadn.net.cn

我们不建议将同一个MongoDbChannelMessageStore bean 同时用于优先级和非优先级场景,因为priorityEnabled选项适用于整个存储区。 然而,同一个collection可以同时用于这两种MongoDbChannelMessageStore类型,因为从存储区进行的消息轮询是有序的并使用索引。 要配置该场景,您可以让一个消息存储 bean 继承自另一个,如下例所示:
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="store"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

使用 AbstractConfigurableMongoDbMessageStore,自动索引创建已禁用

从版本 6.0.8 开始,AbstractConfigurableMongoDbMessageStore 实现了一个 setCreateIndex(boolean),可用于禁用或启用(默认)自动索引创建。 以下示例展示了如何声明一个 bean 并禁用自动索引创建:spring-doc.cadn.net.cn

@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
    AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndex(false);

    return mongoDbChannelMessageStore;
}

MongoDB 元数据存储

Spring Integration 4.2 引入了一种基于 MongoDB 的新MetadataStore(请参阅元数据存储)实现。 您可以使用MongoDbMetadataStore在应用程序重启之间维护元数据状态。 您可以将此新的MetadataStore实现与以下适配器配合使用:spring-doc.cadn.net.cn

To instruct these adapters to use the new MongoDbMetadataStore, declare a Spring bean with a bean name of metadataStore. The feed inbound channel adapter automatically picks up and use the declared MongoDbMetadataStore. The following example shows how to declare a bean with a name of metadataStore:spring-doc.cadn.net.cn

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

The MongoDbMetadataStore 也实现了 ConcurrentMetadataStore,使其能够在多个应用程序实例之间可靠地共享,其中仅允许一个实例存储或修改键的值。 所有这些都是原子操作,得益于 MongoDB 的保障措施。spring-doc.cadn.net.cn

MongoDB 入站通道适配器

MongoDB 入站通道适配器是一个轮询消费者,它从 MongoDB 读取数据并将其作为 Message 有效负载发送。 以下示例展示了如何配置 MongoDB 入站通道适配器:spring-doc.cadn.net.cn

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

如前面的配置所示,您通过使用 inbound-channel-adapter 元素并为其提供各种属性的值来配置 MongoDb 入站通道适配器,例如:spring-doc.cadn.net.cn

您不能同时设置 mongo-templatemongodb-factory

前面的示例相对简单且静态,因为它对 query 使用了字面量值,并对 collection 使用了默认名称。 有时,您可能需要根据某些条件在运行时更改这些值。 为此,请使用它们的 -expression 等价物(query-expressioncollection-name-expression),其中提供的表达式可以是任何有效的 SpEL 表达式。spring-doc.cadn.net.cn

此外,您可能希望对从 MongoDB 读取并成功处理的数据进行一些后处理。 例如,在处理完文档后,您可能希望移动或删除该文档。 您可以使用 Spring Integration 2.2 添加的事务同步功能来实现这一点,如下例所示:spring-doc.cadn.net.cn

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

以下示例展示了前一个示例中引用的 DocumentCleanerspring-doc.cadn.net.cn

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?> documents){
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

您可以通过使用 transactional 元素将您的轮询器声明为事务性的。 该元素可以引用一个真实的事务管理器,例如,如果流程中的其他部分调用了 JDBC。 如果您没有“真实”的事务,可以使用 o.s.i.transaction.PseudoTransactionManager 的实例,它是 Spring 的 PlatformTransactionManager 的实现,并在没有实际事务时启用 Mongo 适配器的事务同步功能。spring-doc.cadn.net.cn

这样做并不会使 MongoDB 本身具有事务性。 它允许在成功(提交)之前或之后,或在失败(回滚)之后执行操作的同步。

一旦您的轮询器变为事务性的,您就可以在transactional元素上设置o.s.i.transaction.TransactionSynchronizationFactory的实例。一个 TransactionSynchronizationFactory 创建 TransactionSynchronization 的实例。为了方便您使用,我们提供了一个默认的基于 SpEL 的 TransactionSynchronizationFactory,它允许您配置 SpEL 表达式,并使其执行与事务协调(同步)。支持提交前、提交后和回滚后的表达式,并为每个事件提供一个通道,用于发送评估结果(如果有)。对于每个子元素,您可以指定 expressionchannel 属性。如果仅存在channel属性,则接收到的消息将作为特定同步场景的一部分发送到该处。如果仅存在 expression 属性且表达式的结果为非空值,则会生成一条以该结果为有效负载的消息,并将其发送至默认通道(NullChannel),同时出现在日志中(在 DEBUG 级别)。如果您希望评估结果发送到特定通道,请添加 channel 属性。如果表达式的结果为 null 或 void,则不会生成任何消息。spring-doc.cadn.net.cn

有关事务同步的更多信息,请参见 事务同步spring-doc.cadn.net.cn

从版本 5.5 开始,MongoDbMessageSource 可以使用 updateExpression 进行配置,该值必须能够根据 MongoDB update 语法评估为 String,或者为一个 org.springframework.data.mongodb.core.query.Update 实例。 它可以用作上述后处理过程的替代方案,并修改从集合中获取的实体,以便在下一次轮询周期中不会再次从集合中拉取它们(假设更新更改了查询中使用的某些值)。 当在集群中对同一集合使用多个 MongoDbMessageSource 实例时,仍建议使用事务来实现执行隔离和数据一致性。spring-doc.cadn.net.cn

MongoDB 变更流入站通道适配器

从 5.3 版本开始,spring-integration-mongodb 模块引入了 MongoDbChangeStreamMessageProducer —— 这是 Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API 的响应式 MessageProducerSupport 实现。 该组件默认生成一个包含 ChangeStreamEvent 作为负载的消息 Flux,并附带一些与变更流相关的头部信息(参见 MongoHeaders)。 建议将此 MongoDbChangeStreamMessageProducerFluxMessageChannel 结合使用,作为下游按需订阅和事件消费的 outputChannelspring-doc.cadn.net.cn

此通道适配器的 Java DSL 配置可能如下所示:spring-doc.cadn.net.cn

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
    return IntegrationFlow.from(
            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
                    .domainType(Person.class)
                    .collection("person")
                    .extractBody(false))
            .channel(MessageChannels.flux())
            .get();
}

MongoDbChangeStreamMessageProducer 停止、下游取消订阅或 MongoDB 变更流产生 OperationType.INVALIDATE 时,Publisher 将完成。 通道适配器可以重新启动,并创建源数据的新 Publisher,它将在 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>) 中自动订阅。 如果在启动之间需要配置新选项以从其他位置消费变更流事件,则可以重新配置此通道适配器。spring-doc.cadn.net.cn

有关 Spring Data MongoDB 中变更流支持的更多信息,请参阅 文档spring-doc.cadn.net.cn

MongoDB 出站通道适配器

MongoDB 出站通道适配器允许您将消息负载写入 MongoDB 文档存储,如下例所示:spring-doc.cadn.net.cn

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />

如前所述配置所示,您可以通过使用 outbound-channel-adapter 元素来配置 MongoDB 出站通道适配器,并提供各种属性的值,例如:spring-doc.cadn.net.cn

  • collection-name or collection-name-expression: 标识要使用的 MongoDB 集合的名称。spring-doc.cadn.net.cn

  • mongo-converter: 对 o.s.data.mongodb.core.convert.MongoConverter 实例的引用,该实例协助将原始 Java 对象转换为 JSON 文档表示。spring-doc.cadn.net.cn

  • mongodb-factory: 对 o.s.data.mongodb.MongoDbFactory 实例的引用。spring-doc.cadn.net.cn

  • mongo-template: 对 o.s.data.mongodb.core.MongoTemplate 实例的引用。 注意:您不能同时设置 mongo-templatemongodb-factoryspring-doc.cadn.net.cn

  • 其他所有入站适配器通用的属性(如“channel”)。spring-doc.cadn.net.cn

前面的示例相对简单且静态,因为它为 collection-name 提供了字面量值。 有时,您可能需要根据某些条件在运行时更改此值。 为此,请使用 collection-name-expression,其中提供的表达式是任何有效的 SpEL 表达式。spring-doc.cadn.net.cn

MongoDB 出站网关

版本 5.0 引入了 MongoDB 出站网关。 它允许您通过向请求通道发送消息来查询数据库。 随后,网关将响应发送到回复通道。 您可以使用消息负载和标头来指定查询和集合名称,如下例所示:spring-doc.cadn.net.cn

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory;

    @Autowired
    private MongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}
class MongoDbKotlinApplication {

    fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)

    @Autowired
    lateinit var mongoDbFactory: MongoDatabaseFactory

    @Autowired
    lateinit var mongoConverter: MongoConverter

    @Bean
    fun gatewaySingleQueryFlow() =
    integrationFlow {
        handle(queryOutboundGateway())
        channel { queue("retrieveResults") }
    }

    private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .query("{name : 'Bob'}")
            .collectionNameFunction<Any> { m -> m.headers["collection"] as String }
            .expectSingleResult(true)
            .entityClass(Person::class.java)
    }

}
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

您可以将以下属性与 MongoDB 出站网关配合使用:spring-doc.cadn.net.cn

  • collection-name or collection-name-expression: 标识要使用的 MongoDB 集合的名称。spring-doc.cadn.net.cn

  • mongo-converter: 对 o.s.data.mongodb.core.convert.MongoConverter 实例的引用,该实例协助将原始 Java 对象转换为 JSON 文档表示。spring-doc.cadn.net.cn

  • mongodb-factory: 对 o.s.data.mongodb.MongoDbFactory 实例的引用。spring-doc.cadn.net.cn

  • mongo-template: 引用 o.s.data.mongodb.core.MongoTemplate 的实例。 注意:您不能同时设置 mongo-templatemongodb-factoryspring-doc.cadn.net.cn

  • entity-class: 要传递给 MongoTemplate 中 find(..)findOne(..) 方法的实体类的完全限定名。 如果未提供此属性,则默认值为 org.bson.Documentspring-doc.cadn.net.cn

  • queryquery-expression:指定 MongoDB 查询。 有关更多查询示例,请参阅 MongoDB 文档spring-doc.cadn.net.cn

  • collection-callback: 对 org.springframework.data.mongodb.core.CollectionCallback 实例的引用。 自 5.0.11 版本起,建议使用 o.s.i.mongodb.outbound.MessageCollectionCallback 的实例,因为它支持请求消息上下文。 请参见其 Javadocs 以获取更多信息。 注意:您不能同时拥有 collection-callback 和任何查询属性。spring-doc.cadn.net.cn

作为 queryquery-expression 属性的替代方案,您可以使用 collectionCallback 属性引用 MessageCollectionCallback 函数式接口的实现来指定其他数据库操作。 以下示例指定了一个计数操作:spring-doc.cadn.net.cn

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.count())
            .collectionName("myCollection");
}

MongoDB 响应式通道适配器

从 5.3 版本开始,提供了 ReactiveMongoDbStoringMessageHandlerReactiveMongoDbMessageSource 实现。 它们基于 Spring Data 中的 ReactiveMongoOperations,并需要 org.mongodb:mongodb-driver-reactivestreams 依赖。spring-doc.cadn.net.cn

The ReactiveMongoDbStoringMessageHandlerReactiveMessageHandler 的一个实现,该实现在响应式流组合涉及集成流定义时由框架原生支持。 有关更多信息,请参阅 ReactiveMessageHandlerspring-doc.cadn.net.cn

从配置角度来看,它与许多其他标准通道适配器没有区别。 例如,使用 Java DSL 时,可以像这样使用此类通道适配器:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return f -> f
            .channel(MessageChannels.flux())
            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

在此示例中,我们将通过提供的 ReactiveMongoDatabaseFactory 连接到 MongoDB,并使用名为 data 的默认集合存储来自请求消息的数据。 实际操作将在内部创建的 ReactiveStreamsConsumer 中,通过响应式流组合按需执行。spring-doc.cadn.net.cn

The ReactiveMongoDbMessageSource 是一个基于提供的 ReactiveMongoDatabaseFactoryReactiveMongoOperations 以及 MongoDB 查询(或表达式)的 AbstractMessageSource 实现,它根据 expectSingleResult 选项调用 find()findOne() 操作,并期望将查询结果转换为 entityClass 类型。 查询执行和结果评估是在订阅消息负载中的 Publisher(根据 expectSingleResult 选项为 FluxMono)时按需进行的。 当下游使用拆分器(splitter)和 FluxMessageChannel 时,该框架可以自动订阅此类负载(本质上为 flatMap)。 否则,目标应用程序有责任在下游端点中订阅轮询发布器。spring-doc.cadn.net.cn

使用 Java DSL,这样的通道适配器可以配置如下:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return IntegrationFlow
            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
                            .entityClass(Person.class),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            .split()
            .channel(c -> c.flux("output"))
            .get();
}

从版本 5.5 开始,ReactiveMongoDbMessageSource 可以使用 updateExpression 进行配置。 它具有与阻塞式 MongoDbMessageSource 相同的功能。 有关更多信息,请参阅 MongoDB 入站通道适配器AbstractMongoDbMessageSourceSpec JavaDocs。spring-doc.cadn.net.cn