如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

MongoDB 支持

版本 2.1 引入对 MongoDB 的支持:一款“高性能、开源、面向文档的数据库”。spring-doc.cadn.net.cn

您需要将以下依赖项包含到您的项目中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:6.4.10"

要下载、安装和运行 MongoDB,请参阅 MongoDB 文档spring-doc.cadn.net.cn

连接到 MongoDB

阻塞式还是响应式?

从 5.3 版本开始,Spring Integration 提供对响应式 MongoDB 驱动的支持,以便在访问 MongoDB 时实现非阻塞 I/O。 要启用响应式支持,请将 MongoDB 响应式流驱动添加到您的依赖项中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"

对于常规的同步客户端,您需要将其对应的驱动程序添加到依赖项中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"

两者在框架中均为 optional,以更好地支持最终用户的选择。spring-doc.cadn.net.cn

要开始与 MongoDB 交互,您首先需要连接到它。 Spring Integration 建立在另一个 Spring 项目 Spring Data MongoDB 提供的支持之上。 它提供了名为 MongoDatabaseFactoryReactiveMongoDatabaseFactory 的工厂类,可简化与 MongoDB Client API 的集成。spring-doc.cadn.net.cn

Spring Data 默认提供阻塞式 MongoDB 驱动程序,但您可以通过包含上述依赖项来选择使用响应式方式。

使用MongoDatabaseFactory

要连接到 MongoDB,您可以使用 MongoDatabaseFactory 接口的实现。spring-doc.cadn.net.cn

下面的例子展示了如何使用SimpleMongoClientDatabaseFactoryspring-doc.cadn.net.cn

MongoDatabaseFactory mongoDbFactory =
        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

SimpleMongoClientDatabaseFactory 接受两个参数:一个 MongoClient 实例和一个指定数据库名称的 String。 如果您需要配置诸如 hostport 等属性,可以通过底层 MongoClients 类提供的其中一个构造函数传递这些属性。 有关如何配置 MongoDB 的更多信息,请参阅 Spring-Data-MongoDB 参考文档。spring-doc.cadn.net.cn

使用ReactiveMongoDatabaseFactory

要使用响应式驱动程序连接 MongoDB,您可以实现 ReactiveMongoDatabaseFactory 接口。spring-doc.cadn.net.cn

下面的例子展示了如何使用SimpleReactiveMongoDatabaseFactoryspring-doc.cadn.net.cn

ReactiveMongoDatabaseFactory mongoDbFactory =
        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

MongoDB 消息存储

正如《企业集成模式》(EIP) 一书所述,消息存储 允许您持久化消息。 在处理具有消息缓冲能力的组件(QueueChannel, aggregator, resequencer 等)时,如果可靠性是一个关注点,这样做可能非常有用。 在 Spring Integration 中,MessageStore 策略还为断言检查 模式提供了基础,该模式也在 EIP 中进行了描述。spring-doc.cadn.net.cn

Spring Integration 的 MongoDB 模块提供了 MongoDbMessageStore,它同时实现了 MessageStore 策略(主要用于断言检查模式)和 MessageGroupStore 策略(主要用于聚合器和重排序器模式)。spring-doc.cadn.net.cn

以下示例配置了一个 MongoDbMessageStore 使用一个 QueueChannel 和一个 aggregatorspring-doc.cadn.net.cn

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

上述示例是一个简单的 bean 配置,它期望一个MongoDbFactory作为构造函数参数。spring-doc.cadn.net.cn

MongoDbMessageStoreMessage扩展为Mongo文档,并利用Spring Data Mongo映射机制包含所有嵌套属性。 当您需要访问payloadheaders以进行审计或分析时(例如针对存储的消息),此功能非常有用。spring-doc.cadn.net.cn

The MongoDbMessageStore uses a custom MappingMongoConverter implementation to store Message instances as MongoDB documents, and there are some limitations for the properties (payload and header values) of the Message.

从 5.1.6 版本开始,MongoDbMessageStore 可以配置自定义转换器,这些转换器会被传播到内部的 MappingMongoConverter 实现中。 有关更多信息,请参阅 MongoDbMessageStore.setCustomConverters(Object…​ customConverters) JavaDocs。spring-doc.cadn.net.cn

Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore。 它同时实现了 MessageStoreMessageGroupStore 接口。 此类可以接收一个 MongoTemplate 作为构造参数,您可以利用它来配置自定义的 WriteConcern。 另一个构造函数需要 MappingMongoConverterMongoDbFactory,这允许您为 Message 实例及其属性提供自定义转换。 请注意,默认情况下,ConfigurableMongoDbMessageStore 使用标准 Java 序列化将 Message 实例写入和读取到 MongoDB(参见 MongoDbMessageBytesConverter),并依赖 MongoTemplate 中的其他属性的默认值。 它根据提供的 MongoDbFactoryMappingMongoConverter 构建一个 MongoTemplate。 由 ConfigurableMongoDbMessageStore 存储的集合的默认名称是 configurableStoreMessages。 我们建议使用此实现来创建稳健且灵活的解决方案,特别是当消息包含复杂数据类型时。spring-doc.cadn.net.cn

从版本 6.0.8 开始,AbstractConfigurableMongoDbMessageStore 提供了一个 setCreateIndexes(boolean)(默认为 true)选项,可用于禁用自动索引的创建。 以下示例展示了如何声明一个 bean 并禁用自动索引的创建:spring-doc.cadn.net.cn

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
    MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndexes(false);
    return mongoDbChannelMessageStore;
}

MongoDB 通道消息存储

版本 4.0 引入了新的 MongoDbChannelMessageStore。 它是专为 QueueChannel 实例优化的 MessageGroupStore。 使用 priorityEnabled = true,您可以在 <int:priority-queue> 实例中利用它来实现持久化消息的优先级轮询。 优先级的 MongoDB 文档字段由 IntegrationMessageHeaderAccessor.PRIORITY (priority) 消息头填充。spring-doc.cadn.net.cn

此外,所有 MongoDB MessageStore 实例现在都拥有一个用于 MessageGroup 文档的 sequence 字段。 sequence 的值是通过对来自同一集合的简单 sequence 文档执行 $inc 操作所得的结果,该文档按需创建。 sequence 字段用于 poll 操作中,以在消息存储于同一毫秒内时提供先进先出(FIFO)的消息顺序(如果已配置优先级)。spring-doc.cadn.net.cn

我们不建议将同一个MongoDbChannelMessageStore bean 同时用于优先级和非优先级场景,因为priorityEnabled选项适用于整个存储区。 然而,同一个collection可以同时用于这两种MongoDbChannelMessageStore类型,因为从存储区进行的消息轮询是有序的并使用索引。 要配置该场景,您可以让一个消息存储 bean 继承自另一个,如下例所示:
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="store"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

使用 AbstractConfigurableMongoDbMessageStore,自动索引创建已禁用

从版本 6.0.8 开始,AbstractConfigurableMongoDbMessageStore 实现了一个 setCreateIndex(boolean),可用于禁用或启用(默认)自动索引创建。 以下示例展示了如何声明一个 bean 并禁用自动索引创建:spring-doc.cadn.net.cn

@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
    AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndex(false);

    return mongoDbChannelMessageStore;
}

MongoDB 元数据存储

Spring Integration 4.2 引入了一种基于 MongoDB 的新MetadataStore(请参阅元数据存储)实现。 您可以使用MongoDbMetadataStore在应用程序重启之间维护元数据状态。 您可以将此新的MetadataStore实现与以下适配器配合使用:spring-doc.cadn.net.cn

To instruct these adapters to use the new MongoDbMetadataStore, declare a Spring bean with a bean name of metadataStore. The feed inbound channel adapter automatically picks up and use the declared MongoDbMetadataStore. The following example shows how to declare a bean with a name of metadataStore:spring-doc.cadn.net.cn

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

The MongoDbMetadataStore 也实现了 ConcurrentMetadataStore,使其能够在多个应用程序实例之间可靠地共享,其中仅允许一个实例存储或修改键的值。 所有这些都是原子操作,得益于 MongoDB 的保障措施。spring-doc.cadn.net.cn

MongoDB 入站通道适配器

MongoDB 入站通道适配器是一个轮询消费者,它从 MongoDB 读取数据并将其作为 Message 有效负载发送。 以下示例展示了如何配置 MongoDB 入站通道适配器:spring-doc.cadn.net.cn

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

如前面的配置所示,您通过使用 inbound-channel-adapter 元素并为其提供各种属性的值来配置 MongoDb 入站通道适配器,例如:spring-doc.cadn.net.cn

您不能同时设置 mongo-templatemongodb-factory

前面的示例相对简单且静态,因为它对 query 使用了字面量值,并对 collection 使用了默认名称。 有时,您可能需要根据某些条件在运行时更改这些值。 为此,请使用它们的 -expression 等价物(query-expressioncollection-name-expression),其中提供的表达式可以是任何有效的 SpEL 表达式。spring-doc.cadn.net.cn

此外,您可能希望对从 MongoDB 成功读取并处理的数据进行一些后处理。 例如;您希望在文档处理完成后将其移动或删除。 您可以使用 Spring Integration 2.2 添加的事务同步功能来实现这一点,如下例所示:spring-doc.cadn.net.cn

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

以下示例展示了前一个示例中引用的 DocumentCleanerspring-doc.cadn.net.cn

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?> documents){
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

您可以通过使用transactional元素将您的轮询器声明为事务性的。 该元素可以引用一个真实的事务管理器(例如,如果流程的其他部分调用了JDBC)。 如果您没有“真实”的事务,可以使用o.s.i.transaction.PseudoTransactionManager的实例,它是Spring PlatformTransactionManager的一个实现,并能够在没有实际事务的情况下启用Mongo适配器的事务同步功能。spring-doc.cadn.net.cn

这样做并不会使 MongoDB 本身具有事务性。 它允许在成功(提交)之前或之后,或在失败(回滚)之后执行操作的同步。

一旦您的轮询器变为事务性的,您就可以在transactional元素上设置o.s.i.transaction.TransactionSynchronizationFactory的实例。一个 TransactionSynchronizationFactory 创建 TransactionSynchronization 的实例。为了方便您使用,我们提供了一个默认的基于 SpEL 的 TransactionSynchronizationFactory,它允许您配置 SpEL 表达式,并使其执行与事务协调(同步)。支持在提交前、提交后和回滚后的表达式,并为每个事件提供一个通道,用于发送评估结果(如果有)。对于每个子元素,您可以指定 expressionchannel 属性。如果仅存在channel属性,则接收到的消息将作为特定同步场景的一部分发送到该处。如果仅存在 expression 属性且表达式的结果为非空值,则会生成一条以该结果为有效负载的消息,并将其发送至默认通道(NullChannel),同时出现在日志中(在 DEBUG 级别)。如果您希望评估结果发送到特定通道,请添加 channel 属性。如果表达式的结果为 null 或 void,则不会生成任何消息。spring-doc.cadn.net.cn

有关事务同步的更多信息,请参见 事务同步spring-doc.cadn.net.cn

从 5.5 版本开始,MongoDbMessageSource 可以配置为 updateExpression,该值必须评估为符合 MongoDb update 语法的 String,或为一个 org.springframework.data.mongodb.core.query.Update 实例。 它可以作为上述后处理过程的替代方案,并会修改从集合中获取的实体,使得在下次轮询周期中(假设更新操作更改了查询中使用的某个值)这些实体不会再次从集合中拉取。 当集群中使用多个针对同一集合的 MongoDbMessageSource 实例时,仍建议使用事务来实现执行隔离和数据一致性。spring-doc.cadn.net.cn

MongoDB 变更流入站通道适配器

从 5.3 版本开始,spring-integration-mongodb模块引入了MongoDbChangeStreamMessageProducer——这是针对 Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API 的响应式MessageProducerSupport实现。 该组件默认生成包含ChangeStreamEvent类型负载(payload)的消息Flux,并附带一些变更流相关的头部信息(参见MongoHeaders)。 建议将此MongoDbChangeStreamMessageProducerFluxMessageChannel结合使用,作为下游按需订阅和事件消费的outputChannelspring-doc.cadn.net.cn

此通道适配器的 Java DSL 配置可能如下所示:spring-doc.cadn.net.cn

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
    return IntegrationFlow.from(
            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
                    .domainType(Person.class)
                    .collection("person")
                    .extractBody(false))
            .channel(MessageChannels.flux())
            .get();
}

MongoDbChangeStreamMessageProducer 停止,或下游取消订阅,或 MongoDB 变更流产生 OperationType.INVALIDATE 时,Publisher 将完成。 通道适配器可以重新启动,并创建新的源数据 Publisher,它会自动在 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>) 中订阅。 如果需要在启动之间从其他位置消费变更流事件,则可以在启动之间重新配置此通道适配器以使用新选项。spring-doc.cadn.net.cn

有关 Spring Data MongoDb 中变更流支持的更多信息,请参阅 文档spring-doc.cadn.net.cn

MongoDB 出站通道适配器

MongoDB 出站通道适配器允许您将消息负载写入 MongoDB 文档存储,如下例所示:spring-doc.cadn.net.cn

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />

如前所述配置所示,您可以通过使用 outbound-channel-adapter 元素来配置 MongoDB 出站通道适配器,并提供各种属性的值,例如:spring-doc.cadn.net.cn

  • collection-name or collection-name-expression: 标识要使用的 MongoDB 集合的名称。spring-doc.cadn.net.cn

  • mongo-converter: 对 o.s.data.mongodb.core.convert.MongoConverter 实例的引用,该实例协助将原始 Java 对象转换为 JSON 文档表示。spring-doc.cadn.net.cn

  • mongodb-factory: 对 o.s.data.mongodb.MongoDbFactory 实例的引用。spring-doc.cadn.net.cn

  • mongo-template: 对 o.s.data.mongodb.core.MongoTemplate 实例的引用。 注意:您不能同时设置 mongo-template 和 mongodb-factory。spring-doc.cadn.net.cn

  • 其他所有入站适配器通用的属性(如“channel”)。spring-doc.cadn.net.cn

前面的示例相对简单且静态,因为它为 collection-name 提供了字面量值。 有时,您可能需要根据某些条件在运行时更改此值。 为此,请使用 collection-name-expression,其中提供的表达式是任何有效的 SpEL 表达式。spring-doc.cadn.net.cn

MongoDB 出站网关

版本 5.0 引入了 MongoDB 出站网关。 它允许您通过向请求通道发送消息来查询数据库。 然后,网关将响应发送到回复通道。 您可以使用消息负载和头信息指定查询和集合名称,如下例所示:spring-doc.cadn.net.cn

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory;

    @Autowired
    private MongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}
class MongoDbKotlinApplication {

    fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)

    @Autowired
    lateinit var mongoDbFactory: MongoDatabaseFactory

    @Autowired
    lateinit var mongoConverter: MongoConverter

    @Bean
    fun gatewaySingleQueryFlow() =
    integrationFlow {
        handle(queryOutboundGateway())
        channel { queue("retrieveResults") }
    }

    private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .query("{name : 'Bob'}")
            .collectionNameFunction<Any> { m -> m.headers["collection"] as String }
            .expectSingleResult(true)
            .entityClass(Person::class.java)
    }

}
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

您可以将以下属性与 MongoDB 出站网关配合使用:spring-doc.cadn.net.cn

  • collection-name or collection-name-expression: 标识要使用的 MongoDB 集合的名称。spring-doc.cadn.net.cn

  • mongo-converter: 对 o.s.data.mongodb.core.convert.MongoConverter 实例的引用,该实例协助将原始 Java 对象转换为 JSON 文档表示。spring-doc.cadn.net.cn

  • mongodb-factory: 对 o.s.data.mongodb.MongoDbFactory 实例的引用。spring-doc.cadn.net.cn

  • mongo-template: 引用 o.s.data.mongodb.core.MongoTemplate 的实例。 注意:您不能同时设置 mongo-templatemongodb-factoryspring-doc.cadn.net.cn

  • entity-class: 要传递给 MongoTemplate 中 find(..)findOne(..) 方法的实体类的完全限定名。 如果未提供此属性,则默认值为 org.bson.Documentspring-doc.cadn.net.cn

  • queryquery-expression:指定 MongoDB 查询。 有关更多查询示例,请参阅 MongoDB 文档spring-doc.cadn.net.cn

  • collection-callback: 对 org.springframework.data.mongodb.core.CollectionCallback 实例的引用。 自 5.0.11 版本起,建议使用 o.s.i.mongodb.outbound.MessageCollectionCallback 的实例,因为它支持请求消息上下文。 请参见其 Javadocs 以获取更多信息。 注意:您不能同时拥有 collection-callback 和任何查询属性。spring-doc.cadn.net.cn

作为 queryquery-expression 属性的替代方案,您可以使用 collectionCallback 属性引用 MessageCollectionCallback 函数式接口的实现来指定其他数据库操作。 以下示例指定了一个计数操作:spring-doc.cadn.net.cn

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.count())
            .collectionName("myCollection");
}

MongoDB 响应式通道适配器

从 5.3 版本开始,提供了 ReactiveMongoDbStoringMessageHandlerReactiveMongoDbMessageSource 实现。 它们基于 Spring Data 中的 ReactiveMongoOperations,并需要 org.mongodb:mongodb-driver-reactivestreams 依赖。spring-doc.cadn.net.cn

The ReactiveMongoDbStoringMessageHandlerReactiveMessageHandler 的一个实现,该实现在响应式流组合涉及集成流定义时由框架原生支持。 有关更多信息,请参阅 ReactiveMessageHandlerspring-doc.cadn.net.cn

从配置角度来看,它与许多其他标准通道适配器没有区别。 例如,使用 Java DSL 时,这样的通道适配器可以这样使用:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return f -> f
            .channel(MessageChannels.flux())
            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

在此示例中,我们将通过提供的 ReactiveMongoDatabaseFactory 连接到 MongoDB,并使用名为 data 的默认集合存储来自请求消息的数据。 实际操作将在内部创建的 ReactiveStreamsConsumer 中,通过响应式流组合按需执行。spring-doc.cadn.net.cn

The ReactiveMongoDbMessageSource is an AbstractMessageSource implementation based on the provided ReactiveMongoDatabaseFactory or ReactiveMongoOperations and MongoDb query (or expression), calls find() or findOne() operation according an expectSingleResult option with an expected entityClass type to convert a query result. A query execution and result evaluation is performed on demand when Publisher (Flux or Mono according expectSingleResult option) in the payload of produced message is subscribed. The framework can subscribe to such a payload automatically (essentially flatMap) when splitter and FluxMessageChannel are used downstream. Otherwise, it is target application responsibility to subscribe into a polled publishers in downstream endpoints.spring-doc.cadn.net.cn

使用 Java DSL,这样的通道适配器可以配置如下:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return IntegrationFlow
            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
                            .entityClass(Person.class),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            .split()
            .channel(c -> c.flux("output"))
            .get();
}

从版本 5.5 开始,ReactiveMongoDbMessageSource 可以使用 updateExpression 进行配置。 它具有与阻塞式 MongoDbMessageSource 相同的功能。 有关更多信息,请参阅 MongoDB 入站通道适配器AbstractMongoDbMessageSourceSpec JavaDocs。spring-doc.cadn.net.cn