JDBC Channel 消息存储 JSON 序列化

版本 7.0 为 JdbcChannelMessageStore 引入了 JSON 序列化支持。 默认情况下,Spring Integration 使用 Java 序列化将消息存储在数据库中。 新的 JSON 序列化选项提供了一种替代的序列化机制。spring-doc.cadn.net.cn

安全考虑:JSON 序列化将消息内容以文本形式存储在数据库中,这可能会暴露敏感数据。 在使用 JSON 序列化之前,请确保实施适当的数据库访问控制、静态数据加密,并评估您组织的数据保护要求,特别是在生产环境中。spring-doc.cadn.net.cn

配置

两个组件可用于 JSON (反)序列化:spring-doc.cadn.net.cn

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
    store.setChannelMessageStoreQueryProvider(
        new PostgresChannelMessageStoreQueryProvider());

    // Enable JSON serialization
    store.setPreparedStatementSetter(
        new JsonChannelMessageStorePreparedStatementSetter());
    store.setMessageRowMapper(
        new JsonMessageRowMapper("com.example"));

    return store;
}

字符串参数 ("com.example") 指定用于反序列化的额外受信任包。 这些包将追加到默认受信任包中(参见 受信任包 部分)。 出于安全考虑,仅允许从受信任包中的类进行反序列化。spring-doc.cadn.net.cn

数据库架构修改

JSON 序列化需要修改数据库架构。 默认的具有BLOB/BYTEA列类型的架构无法用于 JSON 序列化。spring-doc.cadn.net.cn

MESSAGE_CONTENT 列必须更改为可存储 JSON 的文本类型。spring-doc.cadn.net.cn

对于 PostgreSQL,可以使用 JSONB 类型。spring-doc.cadn.net.cn

-- JSONB (enables JSON queries)
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT TYPE JSONB
USING MESSAGE_CONTENT::text::jsonb;

对于 MySQL,可以使用 JSON 类型。spring-doc.cadn.net.cn

-- JSON type (enables JSON functions)
ALTER TABLE INT_CHANNEL_MESSAGE
MODIFY COLUMN MESSAGE_CONTENT JSON;

对于 H2 数据库,可以使用 CLOB 类型。spring-doc.cadn.net.cn

ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT CLOB;

对于支持大文本列(CLOB、TEXT 等)的任何数据库,可以将 MESSAGE_CONTENT 列修改为适当的文本类型。spring-doc.cadn.net.cn

JSON 序列化的示例模式

以下示例演示了如何为基于 JSON 的消息存储创建专用表。spring-doc.cadn.net.cn

CREATE TABLE JSON_CHANNEL_MESSAGE (
   MESSAGE_ID CHAR(36) NOT NULL,
   GROUP_KEY CHAR(36) NOT NULL,
   CREATED_DATE BIGINT NOT NULL,
   MESSAGE_PRIORITY BIGINT,
   MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('JSON_MESSAGE_SEQ'),
   MESSAGE_CONTENT JSONB, -- JSON message content
   REGION VARCHAR(100) NOT NULL,
   CONSTRAINT JSON_CHANNEL_MESSAGE_PK
      PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
   MESSAGE_ID CHAR(36) NOT NULL,
   GROUP_KEY CHAR(36) NOT NULL,
   CREATED_DATE BIGINT NOT NULL,
   MESSAGE_PRIORITY BIGINT,
   MESSAGE_SEQUENCE BIGINT NOT NULL AUTO_INCREMENT UNIQUE,
   MESSAGE_CONTENT JSON, -- JSON message content
   REGION VARCHAR(100) NOT NULL,
   CONSTRAINT JSON_CHANNEL_MESSAGE_PK
      PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
   MESSAGE_ID CHAR(36) NOT NULL,
   GROUP_KEY CHAR(36) NOT NULL,
   CREATED_DATE BIGINT NOT NULL,
   MESSAGE_PRIORITY BIGINT,
   MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT NEXT VALUE FOR JSON_MESSAGE_SEQ,
   MESSAGE_CONTENT CLOB, -- JSON message content
   REGION VARCHAR(100) NOT NULL,
   CONSTRAINT JSON_CHANNEL_MESSAGE_PK
      PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);

JSON 结构

使用基于 Jackson 的序列化时,消息会使用 Jackson 的多态类型处理功能以以下 JSON 结构进行存储:spring-doc.cadn.net.cn

{
  "@class": "org.springframework.messaging.support.GenericMessage",
  "payload": {
    "@class": "com.example.OrderMessage",
    "orderId": "ORDER-12345",
    "amount": 1299.99
  },
  "headers": {
    "@class": "java.util.HashMap",
    "priority": ["java.lang.String", "HIGH"],
    "id": ["java.util.UUID", "a1b2c3d4-..."],
    "timestamp": ["java.lang.Long", 1234567890]
  }
}

The @class 属性提供了多态类型正确反序列化所需的类型信息。spring-doc.cadn.net.cn

查询 JSON 内容(可选)

如果使用原生 JSON 列类型(PostgreSQL JSONB 或 MySQL JSON),则可以直接查询消息内容。spring-doc.cadn.net.cn

PostgreSQL JSONB 查询

-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT @> '{"payload": {"orderId": "ORDER-12345"}}';

-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT -> 'headers' @> '{"priority": ["java.lang.String", "HIGH"]}';

MySQL JSON 函数

-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.payload.orderId') = 'ORDER-12345';

-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.headers.priority[1]') = 'HIGH';

如果使用了 TEXTCLOB 列类型,则这些 JSON 特定的查询不可用。 不过,通过 Spring Integration 进行存储和检索时,JSON 序列化仍然有效。spring-doc.cadn.net.cn

可信赖的软件包

The JacksonMessagingUtils.messagingAwareMapper() 将所有反序列化的类与受信任的包列表进行验证,以防止安全漏洞。spring-doc.cadn.net.cn

默认信任的包包括: - java.util - java.lang - org.springframework.messaging.support - org.springframework.integration.support - org.springframework.integration.message - org.springframework.integration.store - org.springframework.integration.history - org.springframework.integration.handlerspring-doc.cadn.net.cn

额外的包可以在构造函数中指定,并追加到此列表中:spring-doc.cadn.net.cn

// Trust additional packages for custom payload types
new JsonMessageRowMapper("com.example.orders", "com.example.payments")

自定义 JsonObjectMapper

对于高级场景,可以提供一个自定义的JsonObjectMapperspring-doc.cadn.net.cn

import org.springframework.integration.support.json.JacksonJsonObjectMapper;
import org.springframework.integration.support.json.JacksonMessagingUtils;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.databind.SerializationFeature;

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    ObjectMapper customMapper = JacksonMessagingUtils.messagingAwareMapper("com.example");
    customMapper.enable(SerializationFeature.INDENT_OUTPUT);
    customMapper.registerModule(new CustomModule());

    JacksonJsonObjectMapper jsonObjectMapper = new JacksonJsonObjectMapper(customMapper);

    JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
    store.setPreparedStatementSetter(
        new JsonChannelMessageStorePreparedStatementSetter(jsonObjectMapper));
    store.setMessageRowMapper(
        new JsonMessageRowMapper(jsonObjectMapper));

    return store;
}

自定义的 JsonObjectMapper 应根据 Spring Integration 消息序列化要求进行适当配置。 建议从 JacksonMessagingUtils.messagingAwareMapper() 开始,并在此基础上进行自定义。 为确保序列化和反序列化的一致性,JsonChannelMessageStorePreparedStatementSetterJsonMessageRowMapper 中必须使用相同的配置。spring-doc.cadn.net.cn