如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

JDBC 消息存储

Spring Integration 提供了两种特定的 JDBC 消息存储实现。 JdbcMessageStore 适用于与聚合器和检查点模式一起使用。 JdbcChannelMessageStore 实现提供了一种更具针对性且可扩展的实现,专门用于消息通道。spring-doc.cadn.net.cn

请注意,您可以使用 JdbcMessageStore 来备份消息通道,JdbcChannelMessageStore 是为此目的而优化的。spring-doc.cadn.net.cn

从版本 5.0.11、5.1.2 开始,JdbcChannelMessageStore 的索引已得到优化。 如果您在这样的存储中有大量消息组,可能需要调整这些索引。 此外,PriorityChannel 的索引已被注释掉,因为除非您使用的是由 JDBC 支持的此类通道,否则不需要它。
当使用 OracleChannelMessageStoreQueryProvider 时,必须添加优先级通道索引 必须,因为它包含在查询的提示中。

初始化数据库

在使用 JDBC 消息存储组件之前,您应该先配置目标数据库并创建相应的对象。spring-doc.cadn.net.cn

Spring Integration 附带了一些示例脚本,可用于初始化数据库。 在 spring-integration-jdbc JAR 文件中,您可以在 org.springframework.integration.jdbc 包中找到这些脚本。 它为多种常见的数据库平台提供了示例创建脚本和示例删除脚本。 使用这些脚本的一种常见方式是在 Spring JDBC 数据源初始化器 中引用它们。 请注意,这些脚本仅提供为示例,并作为所需表名和列名的规范。 您可能会发现需要对其进行增强以用于生产环境(例如,通过添加索引声明)。spring-doc.cadn.net.cn

从版本 6.2 开始,JdbcMessageStoreJdbcChannelMessageStoreJdbcMetadataStoreDefaultLockRepository实现了SmartLifecycle,并在其各自的表上执行`SELECT COUNT`查询,在start()方法中确保根据提供的目标数据库中存在所需的表(前缀)。 如果所需的表不存在,则应用程序上下文无法启动。 可以通过setCheckDatabaseOnStart(false)禁用此检查。spring-doc.cadn.net.cn

通用 JDBC 消息存储

JDBC 模块提供了基于数据库的 Spring Integration MessageStore(在检查点模式中很重要)和 MessageGroupStore(在有状态模式如聚合器中很重要)的实现。 这两个接口均由 JdbcMessageStore 实现,并支持在 XML 中配置存储实例,如下例所示:spring-doc.cadn.net.cn

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

您可以指定一个 JdbcTemplate 而不是一个 DataSourcespring-doc.cadn.net.cn

以下示例展示了一些其他可选属性:spring-doc.cadn.net.cn

<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/>

在上面的示例中,我们指定了由 store 生成的查询中表名的前缀。 表名前缀默认为 INT_spring-doc.cadn.net.cn

支持消息通道

如果您打算使用 JDBC 来后端消息通道,我们推荐使用 JdbcChannelMessageStore 实现。 它仅与消息通道配合使用。spring-doc.cadn.net.cn

支持的数据库

The JdbcChannelMessageStore 使用特定于数据库的 SQL 查询从数据库中检索消息。 因此,您必须在 JdbcChannelMessageStore 上设置 ChannelMessageStoreQueryProvider 属性。 此 channelMessageStoreQueryProvider 为您提供指定的特定数据库的 SQL 查询。 Spring Integration 支持以下关系型数据库:spring-doc.cadn.net.cn

如果您的数据库未在列表中列出,您可以实现 ChannelMessageStoreQueryProvider 接口并提供您自己的自定义查询。spring-doc.cadn.net.cn

版本 4.0 在表格中添加了 MESSAGE_SEQUENCE 列,以确保即使消息存储在同一毫秒内也能实现先进先出(FIFO)队列。spring-doc.cadn.net.cn

从 6.2 版本开始,ChannelMessageStoreQueryProvider 暴露了一个 isSingleStatementForPoll 标志,其中 PostgresChannelMessageStoreQueryProvider 返回 true,并且其投票查询现在基于单个 DELETE…​RETURNING 语句。 The JdbcChannelMessageStore 会参考 isSingleStatementForPoll 选项,如果仅支持单个投票语句,则跳过单独的 DELETE 语句。spring-doc.cadn.net.cn

自定义消息插入

自 5.0 版本起,通过重载 ChannelMessageStorePreparedStatementSetter 类,您可以为 JdbcChannelMessageStore 中的消息插入提供自定义实现。 您可以使用它来设置不同的列、更改表结构或序列化策略。 例如,除了默认的 byte[] 序列化外,您还可以将其结构存储为 JSON 字符串。spring-doc.cadn.net.cn

以下示例使用 setValues 的默认实现来存储公共列,并覆盖其行为以将消息负载存储为 varcharspring-doc.cadn.net.cn

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

通常,我们不建议使用关系型数据库进行消息队列。相反,如果可能,请考虑使用基于 JMS 或 AMQP 的通道。有关更多参考,请参阅以下资源:spring-doc.cadn.net.cn

如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,该机制将在 后续章节 中介绍。spring-doc.cadn.net.cn

并发轮询

轮询消息通道时,您可以选择将关联的 Poller 配置为具有 TaskExecutor 引用。spring-doc.cadn.net.cn

不过请记住,如果您使用基于 JDBC 的消息通道,并且计划以多个线程轮询该通道(从而事务性地轮询消息存储),您应确保使用支持多版本并发控制(MVCC)的关系型数据库。 否则,锁可能成为问题,在使用多个线程时,性能可能无法达到预期效果。 例如,Apache Derby 在这方面就存在问题。spring-doc.cadn.net.cn

为了获得更好的 JDBC 队列吞吐量,并避免在不同线程可能从队列中轮询相同的 Message 时出现问题,在使用不支持 MVCC 的数据库时,重要的是将 JdbcChannelMessageStoreusingIdCache 属性设置为 true。 以下示例展示了如何操作:spring-doc.cadn.net.cn

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />

优先通道

从版本 4.0 开始,JdbcChannelMessageStore 实现了 PriorityCapableChannelMessageStore 并提供了 priorityEnabled 选项,使其可用作 message-store 引用以指向 priority-queue 实例。 为此,INT_CHANNEL_MESSAGE 表包含一个 MESSAGE_PRIORITY 列,用于存储 PRIORITY 消息头的值。 此外,新增的 MESSAGE_SEQUENCE 列使我们能够实现健壮的先进先出(FIFO)轮询机制,即使在同一毫秒内将具有相同优先级的多条消息进行存储。 消息通过 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE 从数据库中被轮询(选取)出来。spring-doc.cadn.net.cn

我们不推荐使用同一个 JdbcChannelMessageStore bean 同时用于优先级队列通道和非优先级队列通道,因为 priorityEnabled 选项适用于整个存储,且无法为队列通道保留正确的 FIFO 队列语义。 然而,同一个 INT_CHANNEL_MESSAGE 表(甚至 region)可以用于这两种 JdbcChannelMessageStore 类型。 要配置该场景,您可以让一个消息存储 bean 继承自另一个,如下例所示:
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

分区消息存储

在同一个应用或同一应用的多个节点中,通常使用JdbcMessageStore作为全局存储来存放一组应用程序或节点的数据。 为了在一定程度上防止名称冲突并控制数据库元数据配置,消息存储支持以两种方式进行表分区。 一种方式是通过更改前缀(如前面所述)来使用独立的表名。 另一种方式是为单个表内的数据分区指定一个region名称。 第二种方法的一个重要用例是当MessageStore管理为Spring Integration消息通道提供支持的持久化队列时。 持久化通道的消息数据以通道名称为键存储在存储中。 因此,如果通道名称不是全局唯一的,这些通道可能会获取到本不属于它们的数据。 为了避免这种风险,您可以使用消息存储的region功能,将具有相同逻辑名称但不同物理通道的数据进行隔离。spring-doc.cadn.net.cn

PostgreSQL: 接收推送通知

PostgreSQL 提供了一个监听和通知框架,用于在数据库表发生操作时接收推送通知。 Spring Integration(从 6.0 版本开始)利用此机制,允许在将新消息添加到 JdbcChannelMessageStore 时接收推送通知。 使用此功能时,必须定义一个数据库触发器,该触发器可包含在 Spring Integration JDBC 模块中包含的 schema-postgresql.sql 文件的注释中。spring-doc.cadn.net.cn

推送通知通过 PostgresChannelMessageTableSubscriber 类接收,该类允许其订阅者在针对任意给定的 regiongroupId 有新消息到达时收到回调。 即使消息是在不同的 JVM 中追加到同一个数据库的,也会接收到这些通知。 PostgresSubscribableChannel 实现使用 PostgresChannelMessageTableSubscriber.Subscription 契约,作为对上述 PostgresChannelMessageTableSubscriber 通知的反应,从存储中拉取消息。spring-doc.cadn.net.cn

例如,可以按以下方式接收some group的推送通知:spring-doc.cadn.net.cn

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

从版本 6.0.5 开始,在 PlatformTransactionManager 上指定 PostgresSubscribableChannel 将在事务中通知订阅者。 订阅者中的异常将导致事务回滚,并将消息放回消息存储区。 默认情况下不会激活事务支持。spring-doc.cadn.net.cn

从版本 6.0.5 开始,可以通过向 PostgresSubscribableChannel 提供 RetryTemplate 来指定重试策略。 默认情况下,不会执行任何重试操作。spring-doc.cadn.net.cn

任何活动的 PostgresChannelMessageTableSubscriber 在其整个活跃生命周期内都会独占一个 JDBC Connection。 因此,该连接不应源自连接池 DataSource。 此类连接池通常期望已发放的连接在预定义的超时窗口内被关闭。spring-doc.cadn.net.cn

为了满足独占连接的需求,还建议一个 JVM 只运行单个 PostgresChannelMessageTableSubscriber,该实例可用于注册任意数量的订阅。spring-doc.cadn.net.cn