|
此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Integration 7.0.4! |
重新排序器
重组器与聚合器相关,但用途不同。 虽然聚合器会合并消息,但重组器只是传递消息而不对其进行更改。
功能
重组器的工作方式与聚合器类似,因为它使用 CORRELATION_ID 将消息分组存储。
区别在于,重组器不会对消息进行任何处理。
相反,它会按照其 SEQUENCE_NUMBER 头部值的顺序释放这些消息。
对此,您可以选择一次性释放所有消息(根据 SEQUENCE_SIZE 及其他选项,在整个序列完成后),或者在获得有效序列后立即释放。
(我们将在本章稍后解释“有效序列”的含义。)
| 重组器旨在对具有较小间隔的相对较短的消息序列进行重新排序。 如果您有大量存在许多间隔的不连续序列,可能会遇到性能问题。 |
配置重组器
请参阅 聚合器和重排序器 以了解如何在 Java DSL 中配置重排序器。
配置重组器只需在 XML 中包含相应的元素。
以下示例展示了重新排序器的配置:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" (1)
input-channel="inputChannel" (2)
output-channel="outputChannel" (3)
discard-channel="discardChannel" (4)
release-partial-sequences="true" (5)
message-store="messageStore" (6)
send-partial-result-on-expiry="true" (7)
send-timeout="86420000" (8)
correlation-strategy="correlationStrategyBean" (9)
correlation-strategy-method="correlate" (10)
correlation-strategy-expression="headers['something']" (11)
release-strategy="releaseStrategyBean" (12)
release-strategy-method="release" (13)
release-strategy-expression="size() == 10" (14)
empty-group-min-timeout="60000" (15)
lock-registry="lockRegistry" (16)
group-timeout="60000" (17)
group-timeout-expression="size() ge 2 ? 100 : -1" (18)
scheduler="taskScheduler" /> (19)
expire-group-upon-timeout="false" /> (20)
| 1 | 重排序器的 ID 是可选的。 |
| 2 | 重组器的输入通道。 必需。 |
| 3 | 重排序器发送重新排序消息的通道。 可选。 |
| 4 | 重新排序器发送超时消息的通道(如果将send-partial-result-on-timeout设置为false)。
可选。 |
| 5 | 是立即发送已排序的序列,还是等到整个消息组到达后再发送。
可选。
(默认值为 false。) |
| 6 | 对 MessageGroupStore 的引用,可用于在消息组完成之前,根据其关联键存储这些消息组。
可选。
(默认值为易失性内存存储。) |
| 7 | 无论分组到期时,是否应发送已排序的分组(即使某些消息缺失)。
可选。
(默认为 false。)
请参阅 聚合器中的状态管理:MessageGroupStore。 |
| 8 | 发送回复时等待的超时间隔,将 Message 发送到 output-channel 或 discard-channel。
仅当输出通道存在某些“发送”限制(例如具有固定“容量”的 QueueChannel)时才适用此设置。
在这种情况下,将抛出 MessageDeliveryException。
对于 AbstractSubscribableChannel 实现,send-timeout 将被忽略。
对于 group-timeout(-expression),来自计划过期任务的 MessageDeliveryException 会导致该任务被重新调度。
可选。 |
| 9 | 对实现消息关联(分组)算法的 bean 的引用。
该 bean 可以是 CorrelationStrategy 接口的实现,也可以是一个 POJO。
在后一种情况下,还必须定义 correlation-strategy-method 属性。
可选。
(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。) |
| 10 | 由 correlation-strategy 引用的 Bean 上定义的、实现关联决策算法的方法。
可选,但有约束条件(要求必须存在 correlation-strategy)。 |
| 11 | 表示关联策略的 SpEL 表达式。
示例:"headers['something']"。
只允许使用 correlation-strategy 或 correlation-strategy-expression 中的一个。 |
| 12 | 对实现发布策略的 bean 的引用。
该 bean 可以是 ReleaseStrategy 接口的实现,也可以是一个 POJO。
在后一种情况下,还必须定义 release-strategy-method 属性。
可选(默认情况下,聚合器将使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头部属性)。 |
| 13 | 由 release-strategy 引用的 bean 上定义并实现完成决策算法的方法。
可选,但有限制(要求必须存在 release-strategy)。 |
| 14 | 表示发布策略的 SpEL 表达式。
表达式的根对象为 MessageGroup。
示例:"size() == 5"。
仅允许使用 release-strategy 或 release-strategy-expression 中的一个。 |
| 15 | 仅当为 MessageGroupStoreReaper 配置了 <resequencer> MessageStore 时适用。
默认情况下,当配置 MessageGroupStoreReaper 以过期部分组时,空组也会被移除。
空组在组正常释放后存在。
这是为了能够检测并丢弃迟到的消息。
如果您希望以比过期部分组更长的时间间隔来过期空组,请设置此属性。
这样,空组将不会从 MessageStore 中移除,直到它们至少在此数值的毫秒内未被修改。
请注意,实际过期空组的时间也会受到收割者(reaper)超时属性的影响,可能长达该值加上超时时间。 |
| 16 | 请参阅 使用 XML 配置聚合器。 |
| 17 | 请参阅 使用 XML 配置聚合器。 |
| 18 | 请参阅 使用 XML 配置聚合器。 |
| 19 | 请参阅 使用 XML 配置聚合器。 |
| 20 | 默认情况下,当组因超时(或由 MessageGroupStoreReaper)完成时,空组的元数据将被保留。
迟到的消息会被立即丢弃。
将此值设置为 true 以完全移除该组。
随后,迟到的消息将启动一个新组,直到该组再次超时前不会被丢弃。
由于导致超时的序列范围中存在“空洞”,新组永远不会被正常释放。
之后可以使用 MessageGroupStoreReaper 配合 empty-group-min-timeout 属性来使空组过期(完全移除)。
从 5.0 版本开始,空组也会在 empty-group-min-timeout 过期后被安排移除。
默认值为 'false'。 |
另请参阅 聚合器过期组 以获取更多信息。
| 由于重排序器在 Java 类中无需实现任何自定义行为,因此不提供相应的注解支持。 |