|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
聚合器
聚合器本质上是拆分器的镜像,它是一种消息处理器,接收多条消息并将它们合并为一条消息。 实际上,聚合器通常是包含拆分器的管道中的下游消费者。
从技术角度看,聚合器比拆分器更复杂,因为它是有状态的。
它必须保存待聚合的消息,并确定何时完整的消息组已准备好进行聚合。
为此,它需要一个 MessageStore。
功能
聚合器通过关联和存储一组相关的消息,直到该组被视为完整,从而将这些消息合并。 此时,聚合器处理整个组并创建单条消息,然后将聚合后的消息作为输出发送。
实现聚合器需要提供执行聚合的逻辑(即从多条消息创建单条消息)。 两个相关的概念是关联和释放。
相关性决定了消息如何被分组以进行聚合。
在 Spring Integration 中,默认基于 IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头执行相关性处理。
具有相同 IntegrationMessageHeaderAccessor.CORRELATION_ID 的消息会被归为一组。
然而,您可以自定义相关性策略,以支持其他指定消息分组方式的方法。
为此,您可以实现一个 CorrelationStrategy(本章后续将对此进行介绍)。
要确定一组消息何时准备好进行处理,需查阅 ReleaseStrategy。
聚合器的默认释放策略是:当基于 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头信息的所有序列中包含的消息均已到达时,释放该组消息。
您可以通过提供自定义 ReleaseStrategy 实现的引用来覆盖此默认策略。
编程模型
聚合 API 由多个类组成:
-
接口
MessageGroupProcessor,及其子类:MethodInvokingAggregatingMessageGroupProcessor和ExpressionEvaluatingMessageGroupProcessor -
ReleaseStrategy接口及其默认实现:SimpleSequenceSizeReleaseStrategy -
CorrelationStrategy接口及其默认实现:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
The AggregatingMessageHandler(AbstractCorrelatingMessageHandler 的一个子类)是一个 MessageHandler 实现,封装了聚合器(以及其他相关用例)的通用功能,具体如下:
-
将消息关联到组中进行聚合
-
在组可以被释放之前,将这些消息保留在
MessageStore中 -
决定何时可以发布该组
-
将已发布的组聚合为单条消息
-
识别并响应已过期的组
决定消息应如何分组的职责被委托给 CorrelationStrategy 实例。
决定消息组是否可以释放的职责被委托给 ReleaseStrategy 实例。
以下列表展示了基础 AbstractAggregatingMessageGroupProcessor 的简要亮点(实现 aggregatePayloads 方法的职责留给开发者):
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
请参阅 DefaultAggregatingMessageGroupProcessor、ExpressionEvaluatingMessageGroupProcessor 和 MethodInvokingMessageGroupProcessor,它们是 AbstractAggregatingMessageGroupProcessor 的开箱即用实现。
从版本 5 开始。2, 一个 Function<MessageGroup, Map<String, Object>> 策略可用于 AbstractAggregatingMessageGroupProcessor,以合并和计算(聚合)输出消息的标头。DefaultAggregateHeadersFunction 的实现可用,其逻辑是返回组内无冲突的所有标头;组内一条或多条消息中缺失的标头不被视为冲突。冲突的标头已被省略。随着新引入的DelegatingMessageGroupProcessor,此函数用于任何任意(非AbstractAggregatingMessageGroupProcessor)的MessageGroupProcessor实现。本质上,该框架将提供的函数注入到 AbstractAggregatingMessageGroupProcessor 实例中,并将所有其他实现包装到 DelegatingMessageGroupProcessor 中。AbstractAggregatingMessageGroupProcessor 和 DelegatingMessageGroupProcessor 在逻辑上的区别在于:后者不会在调用委托策略之前预先计算请求头,并且如果委托返回 Message 或 AbstractIntegrationMessageBuilder,则不会执行该函数。在这种情况下,框架假设目标实现已负责生成一组正确的标头并填充到返回结果中。Function<MessageGroup, Map<String, Object>>策略可作为 XML 配置的headers-function参考属性、Java DSL 的AggregatorSpec.headersFunction()选项,以及纯 Java 配置的AggregatorFactoryBean.setHeadersFunction()使用。
CorrelationStrategy 由 AbstractCorrelatingMessageHandler 拥有,并且其默认值基于 IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头,如下示例所示:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
对于消息组的实际处理,默认实现是 DefaultAggregatingMessageGroupProcessor。
它会创建一个单一的 Message,其负载是为给定组接收的负载的 List。
这对于具有上游拆分器、发布 - 订阅通道或收件人列表路由器的简单分散 - 聚集实现效果良好。
在使用发布 - 订阅通道或收件人列表路由器处理此类场景时,请务必启用 apply-sequence 标志。
这样做会添加必要的标头:CORRELATION_ID、SEQUENCE_NUMBER 和 SEQUENCE_SIZE。
在 Spring Integration 中,该行为对拆分器(splitters)默认启用,但对发布 - 订阅通道或收件人列表路由器并未默认启用,因为这些组件可能用于各种上下文,而在这些上下文中并不需要这些标头。 |
在为应用程序实现特定的聚合器策略时,您可以扩展 AbstractAggregatingMessageGroupProcessor 并实现 aggregatePayloads 方法。
然而,存在更优的解决方案,它们与 API 的耦合度更低,可用于实现聚合逻辑,这些逻辑可以通过 XML 或注解进行配置。
一般来说,任何 POJO 都可以实现聚合算法,只要它提供一个接受单个 java.util.List 作为参数的方法(也支持参数化列表)。
该方法按以下方式被调用以聚合消息:
-
如果参数为
java.util.Collection<T>,且参数类型 T 可分配给Message,则会将为聚合累积的整个消息列表发送给聚合器。 -
如果参数是非泛型
java.util.Collection或参数类型不可分配给Message,则该方法接收累积消息的负载。 -
如果返回类型无法分配给
Message,则它将被视为由框架自动创建的Message的有效载荷。
| 为了代码简洁并推广最佳实践(如低耦合、可测试性等),实现聚合逻辑的首选方式是通过 POJO,并使用 XML 或注解支持在应用程序中对其进行配置。 |
从 5.3 版本开始,在处理消息组后,AbstractCorrelatingMessageHandler会对 MessageBuilder.popSequenceDetails() 消息头进行修改,以适配具有多个嵌套层级的拆分器 - 聚合器场景。
此操作仅在消息组的释放结果不是消息集合时才会执行。
在这种情况下,目标 MessageGroupProcessor 负责在构建这些消息时调用 MessageBuilder.popSequenceDetails()。
如果 MessageGroupProcessor 返回 Message,则仅当 sequenceDetails 与组中的第一条消息匹配时,才会对输出消息执行 MessageBuilder.popSequenceDetails()。
(此前,仅在从 MessageGroupProcessor 返回纯负载或 AbstractIntegrationMessageBuilder 时才执行此操作。)
此功能可通过新的 popSequence boolean 属性进行控制,因此在某些场景下,当标准拆分器未填充关联详情时,可以禁用 MessageBuilder.popSequenceDetails()。
该属性实质上会撤销上游最近的 applySequence = true 在 AbstractMessageSplitter 中所执行的操作。
有关更多信息,请参阅 拆分器。
The SimpleMessageGroup.getMessages() method returns an unmodifiableCollection.
Therefore, if an aggregating POJO method has a Collection<Message> parameter, the argument passed in is exactly that Collection instance and, when you use a SimpleMessageStore for the aggregator, that original Collection<Message> is cleared after releasing the group.
Consequently, the Collection<Message> variable in the POJO is cleared too, if it is passed out of the aggregator.
If you wish to simply release that collection as-is for further processing, you must build a new Collection (for example, new ArrayList<Message>(messages)).
Starting with version 4.3, the framework no longer copies the messages to a new collection, to avoid undesired extra object creation. |
在 4.2 版本之前,无法通过 XML 配置提供 MessageGroupProcessor。
仅可使用 POJO 方法进行聚合。
现在,如果框架检测到引用的(或内部)Bean 实现了 MessageProcessor,则将其用作聚合器的输出处理器。
如果您希望从自定义 MessageGroupProcessor 中释放对象集合作为消息的有效负载,您的类应扩展 AbstractAggregatingMessageGroupProcessor 并实现 aggregatePayloads()。
此外,自 4.2 版本起,提供了 SimpleMessageGroupProcessor。
它返回消息组的消息集合,如前所述,这将导致已释放的消息单独发送。
这使得聚合器能够作为消息屏障工作,到达的消息将被保留,直到释放策略触发,并且该组将作为一系列单独的消息被释放。
从版本 6.0 开始,上述描述的拆分行为仅在分组处理器为 SimpleMessageGroupProcessor 时生效。
否则,对于任何其他返回 Collection<Message> 的 MessageGroupProcessor 实现,只会发送一条包含整个消息集合作为其负载的回复消息。
这种逻辑由聚合器的核心目的决定:按某个键收集请求消息并生成单个分组消息。
ReleaseStrategy
ReleaseStrategy 接口定义如下:
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
通常,任何 POJO 都可以实现完成决策逻辑,前提是该类提供一个接受单个 java.util.List(参数化列表也支持)作为参数的方法,并返回一个布尔值。
该方法在每条新消息到达后被调用,以判断组是否已完成,如下所示:
-
如果参数为
java.util.List<T>,且参数类型T可分配给Message,则将该组中累积的所有消息发送到该方法。 -
如果参数是非参数化的
java.util.List,或者参数类型无法分配给Message,则该方法将接收累积消息的负载。 -
如果消息组已准备好进行聚合,则该方法必须返回
true,否则返回 false。
以下示例展示了如何为类型为 Message 的 List 使用 @ReleaseStrategy 注解:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例展示了如何为类型为 String 的 List 使用 @ReleaseStrategy 注解:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
基于前两个示例中的签名,基于 POJO 的发布策略会传递 Collection 个尚未发布的消息(如果您需要访问整个 Message),或者传递 Collection 个负载对象(如果类型参数不是 Message)。
这满足了大多数用例。
然而,如果出于某种原因您需要访问完整的 MessageGroup,则应提供 ReleaseStrategy 接口的实现。
|
在处理可能较大的组时,您应了解这些方法的调用方式,因为在组被释放之前,释放策略可能会被多次调用。
最高效的是实现 出于这些原因,对于大型团队,我们建议您实施 |
当组被释放以进行聚合时,该组中所有尚未释放的消息都会被处理并从组中移除。
如果该组也已完整(即,来自某个序列的所有消息都已到达,或者未定义序列),则该组被标记为已完成。
任何针对此组的新消息都将被发送到丢弃通道(如果已定义)。
将 expire-groups-upon-completion 设置为 true(默认值为 false)会移除整个组,而任何新消息(其关联 ID 与被移除组的关联 ID 相同)将形成一个新的组。
您可以通过使用 MessageGroupStoreReaper 并将 send-partial-result-on-expiry 设置为 true 来释放部分序列。
为了便于丢弃迟到的消息,聚合器在释放组后必须维护该组的状态。
这最终可能导致内存溢出(out-of-memory)情况。
为避免此类情况,您应该考虑将 MessageGroupStoreReaper 配置为移除组元数据。
过期参数应设置为:一旦达到某个时间点(此后预期不再有迟到消息到达),即让组过期。
有关配置回收器(reaper)的详细信息,请参见 管理聚合器中的状态:MessageGroupStore。 |
Spring Integration 为 ReleaseStrategy 提供了实现:SimpleSequenceSizeReleaseStrategy。
该实现会检查每条到达消息的 SEQUENCE_NUMBER 和 SEQUENCE_SIZE 标头,以决定消息组何时完成并准备进行聚合。
如前所述,它也是默认策略。
在 5.0 版本之前,默认的发布策略是 SequenceSizeReleaseStrategy,该策略在处理大型组时性能不佳。
使用该策略时,会检测并拒绝重复的序列号。
此操作可能代价较高。 |
如果您正在聚合大型组,则无需释放部分组,也无需检测/拒绝重复序列,请考虑使用 SimpleSequenceSizeReleaseStrategy 替代——对于这些用例,它的效率更高,并且自 5.0 版本 起,在未指定部分组释放的情况下默认为此选项。
聚合大型组
4.3 版本将 Collection 中消息的默认值从 SimpleMessageGroup 改为 HashSet(此前为 BlockingQueue)。
当从大型组中移除单个消息时,此操作开销较大(需要 O(n) 线性扫描)。
虽然哈希集在移除操作上通常更快,但对于大型消息而言可能开销较大,因为在插入和移除时都需要计算哈希值。
如果您有哈希计算成本较高的消息,请考虑使用其他集合类型。
正如在 使用 MessageGroupFactory 中讨论的那样,提供了一个 SimpleMessageGroupFactory,以便您可以选择最适合您需求的 Collection。
您还可以提供自己的工厂实现来创建其他 Collection<Message<?>>。
以下示例展示了如何配置聚合器,使用之前的实现和一个 SimpleSequenceSizeReleaseStrategy:
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点参与聚合器上游的流程,序列大小释放策略(固定或基于 sequenceSize 标头)将无法达到预期目的,因为序列中的某些消息可能会被过滤器丢弃。
在这种情况下,建议选择另一个 ReleaseStrategy,或使用从丢弃子流程发送的补偿消息,这些消息在其内容中携带一些信息,以便在自定义的完整组函数中跳过处理。
有关更多信息,请参阅 过滤器。 |
关联策略
CorrelationStrategy 接口定义如下:
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个 Object,表示用于将消息与消息组关联的相关键。
该键必须满足在实现 equals() 和 hashCode() 时,针对 Map 中键所使用的标准。
通常,任何 POJO 都可以实现关联逻辑,将消息映射到方法参数(或多个参数)的规则与 ServiceActivator 相同(包括对 @Header 注解的支持)。
该方法必须返回一个值,且该值不能为 null。
Spring Integration 为 CorrelationStrategy:提供了实现 HeaderAttributeCorrelationStrategy。
此实现将消息头之一的值(其名称由构造函数参数指定)作为关联键返回。
默认情况下,关联策略是一个 HeaderAttributeCorrelationStrategy,它返回 CORRELATION_ID 头属性的值。
如果您有希望用于关联的自定义头名称,可以在 HeaderAttributeCorrelationStrategy 的实例上进行配置,并将其作为聚合器关联策略的引用提供。
锁注册表
对组的更改是线程安全的。
因此,当您并发发送具有相同关联 ID 的消息时,聚合器中只会处理其中一条消息,使其实际上成为每个消息组单线程。
使用 LockRegistry 来获取已解析关联 ID 的锁。
默认使用 DefaultLockRegistry(内存中)。
对于在使用共享 MessageGroupStore 的情况下跨服务器同步更新,您必须配置共享锁注册表。
避免死锁
如上所述,当消息组发生突变(添加或释放消息)时,会持有锁。
考虑以下流程:
...->aggregator1-> ... ->aggregator2-> ...
如果存在多个线程,且聚合器共享一个公共锁注册表,则可能发生死锁。
这将导致线程挂起,并且 jstack <pid> 可能会呈现如下结果:
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免这个问题:
-
确保每个聚合器都有其自己的锁注册表(这可以是跨应用程序实例共享的注册表,但流程中的两个或更多聚合器必须各自拥有独立的注册表)
-
使用
ExecutorChannel或QueueChannel作为聚合器的输出通道,以便下游流程在新线程上运行 -
从版本 5.1.1 开始,将
releaseLockBeforeSend聚合器属性设置为true
| 如果出于某种原因,单个聚合器的输出最终被路由回同一个聚合器,也可能导致此问题。 当然,上述第一种解决方案在这种情况下并不适用。 |
在 Java DSL 中配置聚合器
请参阅 聚合器和重组器,了解如何在 Java DSL 中配置聚合器。
使用 XML 配置聚合器
Spring Integration 支持通过 <aggregator/> 元素使用 XML 配置聚合器。
以下示例展示了聚合器的用法:
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
| 1 | 聚合器的 ID 是可选的。 |
| 2 | 生命周期属性,指示聚合器是否应在应用程序上下文启动期间启动。 可选(默认为'true')。 |
| 3 | 聚合器接收消息的通道。 必填。 |
| 4 | 聚合器发送聚合结果的通道。 可选(因为传入的消息本身可以在'replyChannel'消息头中指定回复通道)。 |
| 5 | 聚合器发送超时消息的目标通道(如果 send-partial-result-on-expiry 为 false)。
可选。 |
| 6 | 对 MessageGroupStore 的引用,用于在消息完整之前,根据其关联键存储消息组。
可选。
默认情况下,它是一个易失性内存存储。
有关更多信息,请参阅 消息存储。 |
| 7 | 当多个处理器订阅到同一个 DirectChannel 时,此聚合器的顺序(用于负载均衡目的)。
可选。 |
| 8 | 表示过期的消息应在其包含的MessageGroup过期后,被聚合并发送到“output-channel”或“replyChannel”(参见MessageGroupStore.expireMessageGroups(long))。
使MessageGroup过期的一种方法是配置MessageGroupStoreReaper。
然而,您也可以通过调用MessageGroupStore.expireMessageGroups(timeout)来使MessageGroup过期。
您可以通过控制总线操作实现这一点,或者如果您拥有MessageGroupStore实例的引用,可以通过调用expireMessageGroups(timeout)来实现。
否则,该属性本身不会执行任何操作。
它仅用作指示器,用于判断是否应丢弃或将仍位于即将过期的MessageGroup中的任何消息发送到输出通道或回复通道。
可选(默认为false)。
注意:此属性更恰当的命名应为send-partial-result-on-timeout,因为如果expire-groups-upon-timeout设置为false,则分组可能实际上不会过期。 |
| 9 | 发送回复时等待的超时间隔 Message 到 output-channel 或 discard-channel。
默认为 30 秒。
仅当输出通道存在某些“发送”限制(例如具有固定“容量”的 QueueChannel)时才适用此设置。
在这种情况下,将抛出 MessageDeliveryException。
对于 AbstractSubscribableChannel 实现,send-timeout 将被忽略。
对于 group-timeout(-expression),来自计划过期任务的 MessageDeliveryException 会导致该任务被重新调度。
可选。 |
| 10 | 对实现消息关联(分组)算法的 Bean 的引用。
该 Bean 可以是 CorrelationStrategy 接口的实现,也可以是一个 POJO。
在后一种情况下,还必须定义 correlation-strategy-method 属性。
可选(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 标头)。 |
| 11 | 由 correlation-strategy 引用的 Bean 上定义的方法。
它实现了关联决策算法。
可选,但有限制(必须存在 correlation-strategy)。 |
| 12 | 表示关联策略的 SpEL 表达式。
示例:"headers['something']"。
只允许使用 correlation-strategy 或 correlation-strategy-expression 中的一个。 |
| 13 | 对应用上下文中定义的 Bean 的引用。 该 Bean 必须实现前文所述的聚合逻辑。 可选(默认情况下,聚合消息列表将成为输出消息的有效载荷)。 |
| 14 | 由 ref 属性引用的 Bean 上定义的方法。
它实现了消息聚合算法。
可选(取决于是否定义了 ref 属性)。 |
| 15 | 对实现发布策略的 Bean 的引用。
该 Bean 可以是 ReleaseStrategy 接口的实现类,也可以是一个 POJO。
在后一种情况下,还必须定义 release-strategy-method 属性。
可选(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头属性)。 |
| 16 | 由 release-strategy 属性引用的 Bean 上定义的方法。
它实现了完成决策算法。
可选,但有限制(必须存在 release-strategy)。 |
| 17 | 表示发布策略的 SpEL 表达式。
表达式的根对象为 MessageGroup。
示例:"size() == 5"。
仅允许使用 release-strategy 或 release-strategy-expression 中的一个。 |
| 18 | 当设置为 true(默认为 false)时,已完成的组将从消息存储中移除,允许具有相同关联的后续消息形成新组。
默认行为是将与已完成组具有相同关联的消息发送到 discard-channel。 |
| 19 | 仅当为 <aggregator> 的 MessageStore 配置了 MessageGroupStoreReaper 时生效。
默认情况下,当配置 MessageGroupStoreReaper 以过期部分组时,空组也会被移除。
空组在组正常释放后存在。
空组可用于检测和丢弃迟到的消息。
如果您希望以比过期部分组更长的周期来过期空组,请设置此属性。
此时,空组将不会被从 MessageStore 中移除,直到它们至少未被修改过指定的毫秒数。
请注意,实际过期空组的时间也会受到收割器(reaper)的 timeout 属性的影响,最长可能为该值加上超时时间。 |
| 20 | 对 org.springframework.integration.util.LockRegistry Bean 的引用。
它用于基于 groupId 获取 Lock,以支持在 MessageGroup 上的并发操作。
默认情况下,使用内部的 DefaultLockRegistry。
使用分布式 LockRegistry(例如 ZookeeperLockRegistry)可确保聚合器在同一组上仅有一个实例能够并发运行。
有关更多信息,请参阅 Redis 锁注册表 或 Zookeeper 锁注册表。 |
| 21 | 一个超时时间(以毫秒为单位),用于在当前消息到达时,如果ReleaseStrategy未释放组,则强制MessageGroup完成。此属性为聚合器提供了一种基于时间的内置发布策略,当在从最后一条消息到达开始计算的超时时间内没有新消息到达时,如果需要发出部分结果(或丢弃该组)。要设置从 MessageGroup 创建时刻开始计时的超时,请参阅 group-timeout-expression 相关信息。当新消息到达聚合器时,其 MessageGroup 对应的任何现有 ScheduledFuture<?> 将被取消。如果 ReleaseStrategy 返回 false(表示不释放)和 groupTimeout > 0,则计划一个新任务来使该组过期。我们不建议将此属性设置为零(或负值)。这样做会有效地禁用聚合器,因为每个消息组都会立即完成。不过,您可以通过使用表达式有条件地将其设置为零(或负值)。请参阅 group-timeout-expression 获取相关信息。完成期间采取的操作取决于 ReleaseStrategy 和 send-partial-group-on-expiry 属性。参见 聚合器和组超时 以获取更多信息。它与 'group-timeout-expression' 属性互斥。 |
| 22 | 求值为 groupTimeout 的 SpEL 表达式,其中 MessageGroup 作为 #root 评估上下文对象。用于调度强制完成 MessageGroup。如果表达式计算结果为 null,则不会安排完成。如果评估结果为零,则该组将立即在当前线程上完成。实际上,这提供了一个动态的 group-timeout 属性。作为一个示例,如果您希望在组创建后经过 10 秒强制完成 MessageGroup,可以考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis(),其中 timestamp 由 MessageGroup.getTimestamp() 提供,而 MessageGroup 此处是 #root 求值上下文对象。请注意,组的创建时间可能因其他组过期属性的配置而与第一条消息到达的时间不同。参见 group-timeout 以获取更多信息。与 'group-timeout' 属性互斥。 |
| 23 | 当组因超时(或通过MessageGroupStoreReaper)完成时,默认情况下该组会过期(被完全移除)。
迟到的消息将启动一个新组。
您可以将其设置为false以完成该组,但保留其元数据,以便丢弃迟到的消息。
空组可以稍后使用MessageGroupStoreReaper与empty-group-min-timeout属性一起进行过期处理。
默认为'true'。 |
| 24 | 一个 TaskScheduler Bean 引用,用于在 groupTimeout 内没有新消息到达时,强制完成 MessageGroup。如果未提供,则使用在 ApplicationContext (ThreadPoolTaskScheduler) 中注册的默认调度器 (taskScheduler)。如果未指定 group-timeout 或 group-timeout-expression,则此属性不适用。 |
| 25 | 自 4.1 版本起。
它允许为 forceComplete 操作启动事务。
该事务由 group-timeout(-expression) 或 MessageGroupStoreReaper 发起,且不适用于正常的 add、release 和 discard 操作。
仅允许此子元素或 <expire-advice-chain/>。 |
| 26 | 自版本 4.1起。
它允许为forceComplete操作配置任意Advice值。
它由group-timeout(-expression)或MessageGroupStoreReaper触发,不适用于正常的add、release和discard操作。
仅允许此子元素或<expire-transactional/>。
也可以使用 Spring tx命名空间在此处配置事务Advice。 |
|
即将过期的组
有两个与过期(完全移除)组相关的属性。
当组过期时,不会保留其记录;如果收到具有相同关联的新消息,将启动一个新组。
当组完成(未过期)时,空组会保留,后续到达的消息将被丢弃。
可以稍后通过使用
如果一个组没有正常完成,而是因超时而被释放或丢弃,则该组通常会被标记为过期。
从 4.1 版本开始,您可以通过使用
自 5.0 版本起,空组也会在 从版本 5 开始。4. 聚合器(以及重排序器)可配置为过期孤儿组(即持久化消息存储中可能不会被释放的组)。 |
我们通常建议在自定义聚合器处理器实现可能在其他 <aggregator> 定义中被引用时,使用 ref 属性。
然而,如果自定义聚合器实现仅被单个 <aggregator> 定义使用,则可以使用内部 bean 定义(从版本 1.0.3 开始)在 <aggregator> 元素中配置聚合 POJO,如下示例所示:
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
在同一个 <aggregator> 配置中同时使用 ref 属性和内部 Bean 定义是不允许的,因为这会产生歧义条件。
在这种情况下,将抛出异常。 |
以下示例展示了聚合器 bean 的实现:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前述示例的完成策略 Bean 的实现可能如下:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
| 在合理的情况下,发布策略方法和聚合器方法可以合并为单个 Bean。 |
上述示例的相关策略 Bean 的一种实现可能如下所示:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前述示例中的聚合器会根据某些条件(在本例中为除以十后的余数)对数字进行分组,并保留该组,直到有效负载提供的数字总和超过某个值。
| 在合理的情况下,发布策略方法、关联策略方法和聚合器方法可以合并到同一个 Bean 中。 (实际上,它们全部或任意两个都可以合并。) |
聚合器与 Spring 表达式语言 (SpEL)
自 Spring Integration 2.0 起,您可以使用 SpEL 处理各种策略(关联、释放和聚合),如果此类释放策略背后的逻辑相对简单,我们推荐使用 SpEL。
假设您有一个遗留组件,其设计目的是接收对象数组。
我们知道,默认的释放策略会将所有聚合消息组装到 List 中。
现在我们要解决两个问题。
首先,我们需要从列表中提取单个消息。
其次,我们需要提取每条消息的有效载荷并组装成对象数组。
以下示例解决了这两个问题:
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
然而,借助 SpEL,此类需求实际上可以通过一行表达式相对轻松地处理,从而避免您编写自定义类并将其配置为 Bean。 以下示例展示了如何实现:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在前面的配置中,我们使用集合投影表达式从列表中所有消息的负载组装一个新集合,然后将其转换为数组,从而实现与早期 Java 代码相同的结果。
在处理自定义发布和相关性策略时,您可以应用相同的基于表达式的做法。
无需在 correlation-strategy 属性中为自定义 CorrelationStrategy 定义 bean,您可以将简单的关联逻辑实现为 SpEL 表达式,并在 correlation-strategy-expression 属性中进行配置,如下示例所示:
correlation-strategy-expression="payload.person.id"
在前面的示例中,我们假设有效载荷具有一个 person 属性,其值为 id,该值将用于关联消息。
同样,对于 ReleaseStrategy,您可以将发布逻辑实现为 SpEL 表达式,并将其配置在 release-strategy-expression 属性中。
评估上下文的根对象是 MessageGroup 本身。
消息的 List 可以通过表达式中使用组的 message 属性进行引用。
在 5.0 版本之前的发布版中,根对象是 Message<?> 的集合,如前面的示例所示: |
release-strategy-expression="!messages.?[payload==5].empty"
在前面的示例中,SpEL 评估上下文的根对象是 MessageGroup 本身,您声明的是,只要该组中存在有效载荷为 5 的消息,该组即应被释放。
聚合器与分组超时
从版本 4.0 开始,引入了两个新的互斥属性:group-timeout 和 group-timeout-expression。
请参阅 使用 XML 配置聚合器。
在某些情况下,如果当前消息到达时 ReleaseStrategy 未释放,您可能需要在超时后发出聚合结果(或丢弃该组)。
为此,groupTimeout 选项允许强制调度 MessageGroup 完成,如下示例所示:
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
通过此示例,如果聚合器按release-strategy-expression定义的顺序接收到最后一条消息,则正常发布是可行的。
如果该特定消息未到达,只要组中包含至少两条消息,groupTimeout将在十秒后强制完成该组。
强制组完成的结果取决于 ReleaseStrategy 和 send-partial-result-on-expiry。
首先,再次查阅发布策略,以确定是否进行正常发布。
由于组未发生变化,ReleaseStrategy 可以决定在此时释放该组。
如果发布策略仍未释放该组,则视为已过期。
如果 send-partial-result-on-expiry 为 true,则(部分)MessageGroup 中的现有消息将作为正常的聚合器回复消息释放给 output-channel。
否则,将被丢弃。
groupTimeout 的行为与 MessageGroupStoreReaper 存在差异(请参阅 使用 XML 配置聚合器)。
收割器会定期为 MessageGroupStore 中的所有 MessageGroup 强制完成操作。
如果在 groupTimeout 期间未收到新消息,groupTimeout 会对每个 MessageGroup 单独执行此操作。
此外,收割器还可用于移除空组(保留空组是为了在 expire-groups-upon-completion 为 false 时丢弃延迟消息)。
从版本 5.5 开始,groupTimeoutExpression 可以求值为 java.util.Date 实例。
这在某些情况下非常有用,例如根据组创建时间(MessageGroup.getTimestamp())而不是当前消息到达时间来确定计划任务的时刻,因为这是在将 groupTimeoutExpression 求值为 long 时计算的:
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注解配置聚合器
以下示例展示了使用注解配置的聚合器:
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
| 1 | 一个注解,表示此方法应作为聚合器使用。 如果此类用作聚合器,则必须指定该注解。 |
| 2 | 一个注解,表示该方法被用作聚合器的释放策略。
如果未在任何方法上出现,则聚合器使用 SimpleSequenceSizeReleaseStrategy。 |
| 3 | 一个注解,指示该方法应作为聚合器的关联策略使用。
如果未指定关联策略,则聚合器将使用基于CORRELATION_ID的HeaderAttributeCorrelationStrategy。 |
XML 元素提供的所有配置选项也适用于 @Aggregator 注解。
聚合器可以从 XML 中显式引用,或者如果类上定义了 @MessageEndpoint,则可以通过类路径扫描自动检测。
聚合组件的注解配置(@Aggregator 及其他)仅适用于简单用例,其中大多数默认选项已足够。
如果您在使用注解配置时需要对这些选项进行更多控制,请考虑为 AggregatingMessageHandler 使用 @Bean 定义,并将其 @Bean 方法标记为 @ServiceActivator,如下示例所示:
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
请参阅 编程模型 和 @Bean 方法上的注解 以获取更多信息。
从版本 4.2 开始,AggregatorFactoryBean 已可用,以简化 AggregatingMessageHandler 的 Java 配置。 |
在聚合器中管理状态:MessageGroupStore
聚合器(以及 Spring Integration 中的一些其他模式)是一种有状态的模式,它需要根据一段时间内到达的、具有相同相关键的一组消息做出决策。
有状态模式中接口的的设计(例如 ReleaseStrategy)遵循一个原则:组件(无论是框架定义的还是用户定义的)应当能够保持无状态。
所有状态都由 MessageGroup 承载,其管理被委托给 MessageGroupStore。MessageGroupStore 接口定义如下:
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
欲了解更多信息,请参阅 Javadoc。
MessageGroupStore在等待释放策略被触发时,会在MessageGroups中累积状态信息,而该事件可能永远不会发生。
因此,为了防止陈旧消息滞留,并让易失性存储提供在应用程序关闭时进行清理的钩子,MessageGroupStore允许您注册回调,以便在其MessageGroups过期时应用这些回调。
该接口非常直接,如下列表所示:
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调函数可以直接访问存储和消息组,以便管理持久状态(例如,通过完全从存储中移除该组)。
The MessageGroupStore 维护这些回调的列表,它根据需求将这些回调应用于时间戳早于作为参数提供的时间的所有消息(参见前面描述的 registerMessageGroupExpiryCallback(..) 和 expireMessageGroups(..) 方法)。
重要的是,当您希望依赖 expireMessageGroups 功能时,不要在多个聚合器组件中使用相同的 MessageGroupStore 实例。
每个 AbstractCorrelatingMessageHandler 都会基于 forceComplete() 回调注册其自己的 MessageGroupCallback。
这样,每个过期组可能会被错误的聚合器完成或丢弃。
从版本 5.0.10 开始,MessageGroupStore 中的注册回调会使用来自 AbstractCorrelatingMessageHandler 的 UniqueExpiryCallback。
反过来,MessageGroupStore 会检查是否存在该类的实例,如果回调集中已存在该实例,则记录带有相应消息的错误。
通过这种方式,框架禁止在不同的聚合器/重组器中使用 MessageGroupStore 实例,以避免因非特定关联处理器创建的组而过期的副作用。 |
您可以使用超时值调用 expireMessageGroups 方法。
任何早于当前时间减去该值的消息均已过期,并已应用回调。
因此,存储的使用者定义了消息组“过期”的含义。
为了方便用户使用,Spring Integration 提供了以 MessageGroupStoreReaper 形式表示的消息过期包装功能,如下示例所示:
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割者是一个 Runnable。
在前面的示例中,消息组存储的 expire 方法每十秒被调用一次。
超时时间本身为 30 秒。
重要的是要理解,MessageGroupStoreReaper的'timeout'属性是一个近似值,并且会受到任务调度器速率的影响,因为该属性仅在MessageGroupStoreReaper任务的下次计划执行时进行检查。
例如,如果超时设置为十分钟,但MessageGroupStoreReaper任务被安排每小时运行一次,且MessageGroupStoreReaper任务的最后一次执行发生在超时前一分钟,那么MessageGroup在接下来的59分钟内不会过期。
因此,我们建议将速率设置为至少等于超时的值或更短。 |
除了收割器之外,当应用程序通过AbstractCorrelatingMessageHandler中的生命周期回调关闭时,也会调用过期回调。
The AbstractCorrelatingMessageHandler 注册其自身的过期回调,这与 XML 配置中聚合器的布尔标志 send-partial-result-on-expiry 相关联。
如果该标志设置为 true,则在调用过期回调时,任何尚未释放的组中的未标记消息都可以被发送到输出通道。
由于 MessageGroupStoreReaper 是从计划任务调用的,并且可能会根据 sendPartialResultOnExpiry 选项生成消息并发送到下游集成流,因此建议提供一个自定义的 TaskScheduler,并通过 errorChannel 使用 MessagePublishingErrorHandler 来处理异常,因为常规聚合器释放功能可能期望如此。
相同的逻辑也适用于组超时功能,该功能同样依赖于 TaskScheduler。
有关更多信息,请参阅 错误处理。 |
|
当共享的 一些 有关 |
Flux 聚合器
在 5.2 版本中,引入了 FluxAggregatorMessageHandler 组件。
它基于 Project Reactor 的 Flux.groupBy() 和 Flux.window() 操作符。
传入的消息由该组件构造函数中的 Flux.create() 启动的 FluxSink 发出。
如果未提供 outputChannel 或其不是 ReactiveStreamsSubscribableChannel 的实例,则对主 Flux 的订阅将由 Lifecycle.start() 实现执行。
否则,订阅将推迟到由 ReactiveStreamsSubscribableChannel 实现完成。
消息按 Flux.groupBy() 进行分组,使用 CorrelationStrategy 作为分组键。
默认情况下,将检查消息的 IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。
默认情况下,每个已关闭的窗口在要生成的消息载荷中都会作为 Flux 被释放。
此消息包含该窗口中第一条消息的所有标头。
输出消息载荷中的此 Flux 必须被订阅并在下游进行处理。
此类逻辑可以通过 FluxAggregatorMessageHandler 的 setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>) 配置选项进行自定义(或替换)。
例如,如果我们希望最终消息中包含多个 List 的载荷,可以按如下方式配置一个 Flux.collectList():
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
FluxAggregatorMessageHandler 中有多个选项可用于选择合适的时间窗口策略:
-
setBoundaryTrigger(Predicate<Message<?>>)- 被传播到Flux.windowUntil()运算符。 有关更多信息,请参阅其 Javadoc。 优先级高于所有其他窗口选项。 -
setWindowSize(int)和setWindowSizeFunction(Function<Message<?>, Integer>)- 会传播到Flux.window(int)或windowTimeout(int, Duration)。 默认情况下,窗口大小根据组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE头计算得出。 -
setWindowTimespan(Duration)- 根据窗口大小配置传播到Flux.window(Duration)或windowTimeout(int, Duration)。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)- 一个函数,用于将转换应用于分组后的 Flux,以支持任何未由公开选项涵盖的自定义窗口操作。
由于此组件是 MessageHandler 的实现,它可以简单地与 @ServiceActivator 消息注解一起用作 @Bean 定义。
使用 Java DSL,可以从 .handle() EIP 方法中使用它。
下面的示例演示了如何在运行时注册一个 IntegrationFlow,以及如何将 FluxAggregatorMessageHandler 与上游的拆分器进行关联:
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);