|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
消息存储
《企业集成模式》(EIP)一书指出了几种能够缓冲消息的模式。
例如,聚合器会缓冲消息直到它们可以被释放,而 QueueChannel 会缓冲消息直到消费者显式地从该通道接收这些消息。
由于消息流中的任何位置都可能发生故障,因此缓冲消息的 EIP 组件也会引入消息可能丢失的风险点。
为了降低消息丢失的风险,EIP 定义了 消息存储 模式,该模式允许 EIP 组件将消息存储在某种持久化存储中(例如 RDBMS)。
Spring Integration 通过以下方式支持消息存储模式:
-
定义
org.springframework.integration.store.MessageStore策略接口 -
提供该接口的多种实现
-
在所有具备消息缓冲能力的组件上公开一个
message-store属性,以便您可以注入任何实现MessageStore接口的实例。
有关如何配置特定的消息存储实现以及如何将MessageStore实现注入到特定缓冲组件的详细信息,请参见手册中的相关部分(例如QueueChannel、Aggregator、Delayer等具体组件)。
以下示例对展示了如何为QueueChannel和聚合器添加对消息存储的引用:
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator message-store="refToMessageStore"/>
默认情况下,消息通过使用 o.s.i.store.SimpleMessageStore(即 MessageStore 的一个实现)存储在内存中。
对于开发环境或简单的低流量环境,如果非持久化消息的潜在丢失不是问题,这可能已经足够。
然而,典型的生产应用程序需要更稳健的选项,不仅要降低消息丢失的风险,还要避免潜在的内存溢出错误。
因此,我们还为各种数据存储提供了 MessageStore 的实现。
以下是支持的实现的完整列表:
-
Hazelcast 消息存储:使用 Hazelcast 分布式缓存来存储消息
-
JDBC 消息存储:使用关系型数据库管理系统(RDBMS)存储消息
-
Redis 消息存储:使用 Redis 键/值数据存储来存储消息
-
MongoDB 消息存储:使用 MongoDB 文档存储来保存消息
|
然而,在使用 消息数据(负载和标头)根据 请特别注意代表特定类型数据的标头。
例如,如果其中一个标头包含某个 Spring Bean 的实例,在反序列化时,您可能会得到该 Bean 的不同实例,这将直接影响框架创建的一些隐式标头(如 从 Spring Integration 3.0 版本开始,您可以通过配置一个头部增强器来解决此问题,该增强器在将通道注册到 此外,请考虑当您将消息流配置为以下形式时会发生什么:gateway → queue-channel(由持久化消息存储支持)→ service-activator。
该网关会创建一个临时回复通道,但在 service-activator 的轮询器从队列读取时,该通道已丢失。
同样,您可以使用头信息增强器将头信息替换为 有关更多信息,请参阅 Header Enricher。 |
Spring Integration 4.0 引入了两个新接口:
-
ChannelMessageStore: 为QueueChannel实例实现特定操作 -
PriorityCapableChannelMessageStore: 用于标记MessageStore实现,以便为PriorityChannel实例使用,并为持久化消息提供优先级顺序。
实际行为取决于具体实现。
该框架提供以下实现,可用作 MessageStore、QueueChannel 和 PriorityChannel 的持久化存储:
|
关于
SimpleMessageStore 的注意从版本 4.1 开始, 现在,在聚合器等组件之外访问组存储的用户将直接获得聚合器所使用的组的引用,而不是副本。 在聚合器之外操作该组可能会导致不可预测的结果。 出于此原因,您要么不应执行此类操作,要么将 |
使用MessageGroupFactory
从版本 4.3 开始,某些 MessageGroupStore 实现可以注入自定义的 MessageGroupFactory 策略,以创建和定制 MessageGroupStore 所使用的 MessageGroup 实例。
默认情况下使用 SimpleMessageGroupFactory,它基于 GroupType.HASH_SET(LinkedHashSet)内部集合生成 SimpleMessageGroup 实例。
其他可选方案包括 SYNCHRONISED_SET 和 BLOCKING_QUEUE,其中后者可用于恢复之前的 SimpleMessageGroup 行为。
此外,还提供了 PERSISTENT 选项。
更多信息请参阅下一节。
从版本 5.0.1 开始,当组内消息的顺序和唯一性不重要时,也可使用 LIST 选项。
持久的MessageGroupStore和延迟加载
从 4.3 版本开始,所有持久化的 MessageGroupStore 实例将以惰性加载方式从存储中检索 MessageGroup 实例及其 messages。
在大多数情况下,对于关联的 MessageHandler 实例(请参阅 聚合器 和 重排序器),这种方式非常有用,因为每次关联操作都从存储中加载整个 MessageGroup 会带来额外开销。
您可以使用 AbstractMessageGroupStore.setLazyLoadMessageGroups(false) 选项从配置中关闭延迟加载行为。
我们对 MongoDB 的延迟加载进行的性能测试 MessageStore(MongoDB 消息存储)和 <aggregator>(聚合器)使用了类似于以下的自定义 release-strategy:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
对于 1000 条简单消息,它会产生类似以下结果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
然而,从版本 5.5 开始,所有持久的 MessageGroupStore 实现都提供了基于目标数据库流式 API 的 streamMessagesForGroup(Object groupId) 契约。
当存储中的组非常大时,这提高了资源利用率。
在框架内部,该新 API 被用于 Delayer(例如)在启动时重新调度持久化消息时。
返回的 Stream<Message<?>> 必须在处理结束时关闭,例如通过 try-with-resources 的自动关闭。
每当使用 PersistentMessageGroup 时,其 streamMessages() 会委托给 MessageGroupStore.streamMessagesForGroup()。
消息组条件
从版本 5 开始。5,MessageGroup 抽象提供了一个 condition 字符串选项。该选项的值可以是任何内容,以便后续根据需要进行解析,从而为分组做出决策。例如,来自关联消息处理器的ReleaseStrategy可以从组中查询此属性,而不是遍历组中的所有消息。MessageGroupStore 公开了一个 setGroupCondition(Object groupId, String condition) API。为此,已在AbstractCorrelatingMessageHandler中添加了setGroupConditionSupplier(BiFunction<Message<?>, String, String>)选项。此函数在消息添加到组后以及组的现有条件下进行评估。实现可以决定返回一个新值、现有值,或将目标条件重置为 null。condition 的值可以是一个 JSON、SpEL 表达式、数字,或任何可被序列化为字符串并在之后进行解析的内容。例如,来自 文件聚合器 组件的 FileMarkerReleaseStrategy,会从 FileSplitter.FileMarker.Mark.END 消息的 FileHeaders.LINE_COUNT 标头中填充一个条件到某个组,并通过将其与组大小进行比较来咨询该条件中的值。这样它就不会遍历组中的所有消息来查找带有 FileHeaders.LINE_COUNT 头的 FileSplitter.FileMarker.Mark.END 消息。它允许结束标记在聚合器中比其他记录更早到达;例如,当在多线程环境中处理文件时。
此外,为了配置方便,引入了 GroupConditionProvider 契约。
AbstractCorrelatingMessageHandler 检查提供的 ReleaseStrategy 是否实现了该接口,并提取一个 conditionSupplier 用于组条件评估逻辑。