JDBC Channel 消息存储 JSON 序列化
版本 7.0 为 JdbcChannelMessageStore 引入了 JSON 序列化支持。
默认情况下,Spring Integration 使用 Java 序列化将消息存储在数据库中。
新的 JSON 序列化选项提供了一种替代的序列化机制。
|
安全考虑:JSON 序列化将消息内容以文本形式存储在数据库中,这可能会暴露敏感数据。 在使用 JSON 序列化之前,请确保实施适当的数据库访问控制、静态数据加密,并评估您组织的数据保护要求,特别是在生产环境中。 |
配置
两个组件可用于 JSON (反)序列化:
-
JsonChannelMessageStorePreparedStatementSetter- 将消息序列化为 JSON -
JsonMessageRowMapper- 从 JSON 反序列化消息
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
store.setChannelMessageStoreQueryProvider(
new PostgresChannelMessageStoreQueryProvider());
// Enable JSON serialization
store.setPreparedStatementSetter(
new JsonChannelMessageStorePreparedStatementSetter());
store.setMessageRowMapper(
new JsonMessageRowMapper("com.example"));
return store;
}
字符串参数 ("com.example") 指定用于反序列化的额外受信任包。
这些包将追加到默认受信任包中(参见 受信任包 部分)。
出于安全考虑,仅允许从受信任包中的类进行反序列化。
数据库架构修改
|
JSON 序列化需要修改数据库架构。
默认的具有 |
第 MESSAGE_CONTENT 列必须更改为可存储 JSON 的文本类型。
-
PostgreSQL
-
MySQL
-
H2
-
Other Databases
对于 PostgreSQL,可以使用 JSONB 类型。
-- JSONB (enables JSON queries)
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT TYPE JSONB
USING MESSAGE_CONTENT::text::jsonb;
对于 MySQL,可以使用 JSON 类型。
-- JSON type (enables JSON functions)
ALTER TABLE INT_CHANNEL_MESSAGE
MODIFY COLUMN MESSAGE_CONTENT JSON;
对于 H2 数据库,可以使用 CLOB 类型。
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT CLOB;
对于支持大文本列(CLOB、TEXT 等)的任何数据库,可以将 MESSAGE_CONTENT 列修改为适当的文本类型。
JSON 序列化的示例模式
以下示例演示了如何为基于 JSON 的消息存储创建专用表。
-
PostgreSQL
-
MySQL
-
H2
CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('JSON_MESSAGE_SEQ'),
MESSAGE_CONTENT JSONB, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL AUTO_INCREMENT UNIQUE,
MESSAGE_CONTENT JSON, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT NEXT VALUE FOR JSON_MESSAGE_SEQ,
MESSAGE_CONTENT CLOB, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
JSON 结构
使用基于 Jackson 的序列化时,消息会使用 Jackson 的多态类型处理功能以以下 JSON 结构进行存储:
{
"@class": "org.springframework.messaging.support.GenericMessage",
"payload": {
"@class": "com.example.OrderMessage",
"orderId": "ORDER-12345",
"amount": 1299.99
},
"headers": {
"@class": "java.util.HashMap",
"priority": ["java.lang.String", "HIGH"],
"id": ["java.util.UUID", "a1b2c3d4-..."],
"timestamp": ["java.lang.Long", 1234567890]
}
}
The @class 属性提供了多态类型正确反序列化所需的类型信息。
查询 JSON 内容(可选)
如果使用原生 JSON 列类型(PostgreSQL JSONB 或 MySQL JSON),则可以直接查询消息内容。
PostgreSQL JSONB 查询
-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT @> '{"payload": {"orderId": "ORDER-12345"}}';
-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT -> 'headers' @> '{"priority": ["java.lang.String", "HIGH"]}';
MySQL JSON 函数
-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.payload.orderId') = 'ORDER-12345';
-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.headers.priority[1]') = 'HIGH';
|
如果使用了 |
可信赖的软件包
The JacksonMessagingUtils.messagingAwareMapper() 将所有反序列化的类与受信任的包列表进行验证,以防止安全漏洞。
默认信任的包包括:
- java.util
- java.lang
- org.springframework.messaging.support
- org.springframework.integration.support
- org.springframework.integration.message
- org.springframework.integration.store
- org.springframework.integration.history
- org.springframework.integration.handler
额外的包可以在构造函数中指定,并追加到此列表中:
// Trust additional packages for custom payload types
new JsonMessageRowMapper("com.example.orders", "com.example.payments")
自定义 JsonObjectMapper
对于高级场景,可以提供一个自定义的JsonObjectMapper:
import org.springframework.integration.support.json.JacksonJsonObjectMapper;
import org.springframework.integration.support.json.JacksonMessagingUtils;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.databind.SerializationFeature;
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
ObjectMapper customMapper = JacksonMessagingUtils.messagingAwareMapper("com.example");
customMapper.enable(SerializationFeature.INDENT_OUTPUT);
customMapper.registerModule(new CustomModule());
JacksonJsonObjectMapper jsonObjectMapper = new JacksonJsonObjectMapper(customMapper);
JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
store.setPreparedStatementSetter(
new JsonChannelMessageStorePreparedStatementSetter(jsonObjectMapper));
store.setMessageRowMapper(
new JsonMessageRowMapper(jsonObjectMapper));
return store;
}
|
自定义的 |