|
对于最新稳定版本,请使用 Spring Integration 7.0.0! |
消息存储
《企业集成模式(EIP)》一书指出了若干能够缓冲消息的模式。
例如,聚合器会缓冲消息直到可以发布,以及队列通道在用户明确接收该信道的消息之前,会缓冲消息。
由于消息流中任何节点都可能发生故障,缓冲消息的EIP组件也引入了一个消息丢失的点。
为了降低消息丢失的风险,EIP定义了消息存储模式,允许EIP组件存储消息,通常存储在某种持久存储(如关系数据库管理系统)中。
Spring 集成通过以下方式支持消息存储模式:
-
定义一个
org.springframework.integration.store.MessageStore策略界面 -
提供该接口的多个实现
-
暴露一个
消息存储属性,所有组件都具备缓冲消息的能力,这样你可以注入任何实现消息商店接口。
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator message-store="refToMessageStore"/>
默认情况下,消息通过以下方式存储在内存中o.s.i.store.SimpleMessageStore,是 的实现消息商店.
这对开发或简单低流量环境来说可能没问题,因为这些环境不会担心非持久性消息的丢失。
然而,典型的生产应用需要更强大的选项,不仅为了降低消息丢失的风险,还能避免潜在的内存不足错误。
因此,我们也提供消息商店为多种数据存储实现。
以下是支持的完整实现列表:
-
Hazelcast消息存储:使用Hazelcast分布式缓存存储消息
-
JDBC消息存储:使用关系数据库管理系统(RDBMS)来存储消息
-
Redis 消息存储:使用 Redis 键值数据存储来存储消息
-
MongoDB 消息存储:使用 MongoDB 文档存储来存储消息
|
然而,在使用持久实现时需要注意一些限制 消息数据(有效载荷和头部)根据不同实现方式,通过不同的序列化策略进行序列化和反序列化 特别注意代表特定类型数据的头部。
例如,如果某个头包含某个 Spring Bean 的实例,反序列化时,你可能会得到该 Bean 的不同实例,这直接影响框架创建的一些隐式头部(例如 从 Spring Integration 3.0 版本开始,你可以通过配置一个头部丰富器来解决这个问题,该填充器在注册通道后用名称替换这些头部 此外,考虑当你配置消息流时会发生什么:网关→队列通道(由持久消息存储支持)→服务激活器。
该网关创建一个临时回复通道,但当服务激活器的轮询器读取队列时,该通道会丢失。
同样,你也可以用标题增强器替换标题,用 更多信息请参见“头部增益器”。 |
Spring Integration 4.0引入了两个新接口:
-
频道信息存储: 以实现特定的作队列通道实例 -
优先能力通道消息存储:标记消息商店将用于的实现优先频道实例,并为持久消息提供优先级排序。
真实行为取决于实现方式。
该框架提供了以下实现,可作为持久化使用消息商店为队列通道和优先频道:
|
注意事项
简易消息商店从4.1版本开始, 在组件(如聚合器)之外访问组存储的用户,现在直接引用聚合器正在使用的组,而非副本。 对聚合器外部群体的控可能导致不可预测的结果。 因此,你要么不进行这种作,要么设置 |
用消息组工厂
从4.3版本开始,有些MessageGroupStore实现可以注入自定义消息组工厂用于创建和定制消息组用于MessageGroupStore.
这默认为简易消息组工厂,得简易消息组基于GroupType.HASH_SET (LinkedHashSet)内部收集。
其他可能的选项包括SYNCHRONISED_SET和BLOCKING_QUEUE,其中最后一个可以用来恢复之前的简易消息组行为。
另外,还有持续有选项可选。
更多信息请见下一节。
从5.0.1版本开始,列表当组中消息的顺序和唯一性无关紧要时,也提供选项。
持续MessageGroupStore以及懒散加载
从4.3版本开始,全部为持久化MessageGroupStore实例检索消息组实例及其消息从商店里懒散地买东西。
在大多数情况下,它对相关性消息处理器实例(参见聚合器和重序器),当加载整个消息组从存储中对每个相关作进行。
你可以使用AbstractMessageGroupStore.setLazyLoadMessageGroups(false)可以选择关闭配置中的懒加载行为。
我们在MongoDB上对懒散负载的性能测试消息商店 (MongoDB消息库)和<聚合器> (聚合器)使用自定义释放策略类似于以下内容:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
它对1000条简单消息产生类似的结果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
然而,从5.5版本开始,所有持久化MessageGroupStore实现提供了streamMessagesForGroup(Object groupId)基于目标数据库流式API的合同。
当店里人数众多时,这能提高资源利用率。
在框架内部,这个新API被用于移除层(Delayer)中,当它在启动时重新调度持久消息时。
A回应了Stream<Message<?>>必须在处理结束时闭合,例如通过自动闭合资源尝试.
每当PersistentMessageGroup被使用,streamMessages()代表们MessageGroupStore.streamMessagesForGroup().
消息组条件
从5.5版本开始,消息组抽象提供条件弦选项。
这个选项的价值可以是任何可以以后为团队做出决策而解析的内容。
例如:发布策略从相关消息处理器中,可以参考该组的该属性,而不必遍历组内所有消息。
这MessageGroupStore揭露了setGroupCondition(Object groupId, String condition)应用程序接口。
为此,一个setGroupConditionSupplier(BiFunction<Message<?>, String, String>)选项已被添加到摘要相关消息处理.
该功能会根据每条消息加入组后,以及组的现有状态进行评估。
实现可以决定返回一个新值、现有值,或重置目标条件为零.
对于 的值条件可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并随后解析的。
例如,FileMarker释放策略从文件聚合器组件中填充一个条件到一组中,FileHeaders.LINE_COUNT该项目的首部FileSplitter.FileMarker.Mark.END并从其处咨询信息canRelease()将群规模与该条件下的值进行比较。
这样它就不会遍历组内的所有消息来寻找FileSplitter.FileMarker.Mark.END带有FileHeaders.LINE_COUNT页眉。
它还允许终点标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。
此外,为了配置方便,还有一个群体条件提供者合同已经提出。
这摘要相关消息处理检查是否提供发布策略实现该接口并提取条件提供商对于群组条件评估逻辑。