|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
幂等接收者企业集成模式
从 4.1 版本开始,Spring Integration 提供了企业集成模式 幂等接收器 的实现。
这是一个功能型模式,整个幂等性逻辑应由应用程序实现。
然而,为了简化决策过程,提供了 IdempotentReceiverInterceptor 组件。
这是一个 AOP Advice,应用于 MessageHandler.handleMessage() 方法,并可根据其配置 filter 请求消息或将其标记为 duplicate。
以前,您可以通过在<filter/>中使用自定义的MessageSelector来实现此模式(例如参见Filter)。
然而,由于此模式实际上定义了端点的行为而非端点本身,因此幂等接收器实现不提供端点组件。
相反,它被应用于应用程序中声明的端点。
IdempotentReceiverInterceptor的逻辑基于提供的MessageSelector,如果消息未被该选择器接受,则使用设置为true的duplicateMessage标头对其进行丰富。
目标MessageHandler(或下游流)可以查阅此标头以实现正确的幂等性逻辑。
如果IdempotentReceiverInterceptor配置了discardChannel或throwExceptionOnRejection = true,重复消息将不会发送到目标MessageHandler.handleMessage()。
相反,它将被丢弃。
如果您想丢弃(不处理)重复消息,则应将discardChannel配置为NullChannel,例如默认的nullChannel bean。
为了在消息之间保持状态并提供比较消息以实现幂等性的能力,我们提供了 MetadataStoreSelector。
它接受一个 MessageProcessor 实现(该实现基于 Message 创建查找键)和一个可选的 ConcurrentMetadataStore(元数据存储)。
有关更多信息,请参阅 MetadataStoreSelector Javadoc。
您还可以使用额外的 MessageProcessor 来自定义 ConcurrentMetadataStore 的 value。
默认情况下,MetadataStoreSelector 使用 timestamp 消息头。
通常情况下,如果键没有现有值,选择器会选择一条消息进行接受。
在某些情况下,比较键的当前值和新生成的值对于确定是否应接受该消息很有用。
从 5.3 版本开始,提供了 compareValues 属性,它引用了 BiPredicate<String, String>;第一个参数是旧值;返回 true 以接受该消息,并在 MetadataStore 中将旧值替换为新值。
这有助于减少键的数量;例如,在处理文件中的行时,您可以将文件名存储在键中,将当前行号存储在值中。
然后,在重启后,您可以跳过已经处理过的行。
有关示例,请参阅 幂等下游处理分割文件。
为了方便,MetadataStoreSelector 选项可直接在 <idempotent-receiver> 组件上进行配置。
以下列表展示了所有可能的属性:
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
| 1 | IdempotentReceiverInterceptor Bean 的 ID。
可选。 |
| 2 | 此拦截器应用的消费者端点名称或模式。
使用逗号分隔名称(模式)(,),例如 endpoint="aaa, bbb*, ccc, *ddd, eee*fff"。
匹配这些模式的端点 Bean 名称将用于检索目标端点的 MessageHandler Bean(使用其 .handler 后缀),并将 IdempotentReceiverInterceptor 应用于这些 Bean。
必填。 |
| 3 | 一个 MessageSelector Bean 引用。
与 metadata-store 和 key-strategy (key-expression) 互斥。
当未提供 selector 时,必须提供 key-strategy 或 key-strategy-expression 中的一个。 |
| 4 | 标识当 IdempotentReceiverInterceptor 不接受消息时,应将消息发送至哪个通道。
如果省略此属性,重复的消息将被转发至带有 duplicateMessage 头的处理器。
可选。 |
| 5 | 一个 ConcurrentMetadataStore 引用。
由底层的 MetadataStoreSelector 使用。
与 selector 互斥。
可选。
默认的 MetadataStoreSelector 使用一个内部 SimpleMetadataStore,该组件不会在应用程序执行之间维护状态。 |
| 6 | 一个 MessageProcessor 引用。
由底层的 MetadataStoreSelector 使用。
从请求消息中评估一个 idempotentKey。
与 selector 和 key-expression 互斥。
当未提供 selector 时,必须指定 key-strategy 或 key-strategy-expression 中的一个。 |
| 7 | 用于填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表达式。
由底层的 MetadataStoreSelector 使用。
使用请求消息作为求值上下文根对象来求值 idempotentKey。
与 selector 和 key-strategy 互斥。
当未提供 selector 时,必须指定 key-strategy 或 key-strategy-expression 中的一个。 |
| 8 | 一个 MessageProcessor 引用。
由底层的 MetadataStoreSelector 使用。
评估来自请求消息的 idempotentKey 的 value。
与 selector 和 value-expression 互斥。
默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息头作为 Metadata 的'value'。 |
| 9 | 一个用于填充ExpressionEvaluatingMessageProcessor的 SpEL 表达式。
由底层的MetadataStoreSelector使用。
通过使用请求消息作为评估上下文根对象,为idempotentKey评估value。
与selector和value-strategy互斥。
默认情况下,'MetadataStoreSelector'使用'timestamp'消息头作为元数据的'value'。 |
| 10 | 对 BiPredicate<String, String> bean 的引用,允许您通过比较键的旧值和新值来可选地选择消息;默认值为 null。 |
| 11 | 如果 IdempotentReceiverInterceptor 拒绝消息,是否抛出异常。
默认为 false。
无论是否提供 discard-channel,此设置均适用。 |
对于 Java 配置,Spring Integration 提供了方法级别的 @IdempotentReceiver 注解。
它用于标记具有消息注解的 method(@ServiceActivator、@Router, and others) to specify which `IdempotentReceiverInterceptor 对象应用于此端点)。
以下示例展示了如何使用 @IdempotentReceiver 注解:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
当您使用 Java DSL 时,可以将拦截器添加到端点的建议链中,如下示例所示:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
IdempotentReceiverInterceptor 仅设计用于 MessageHandler.handleMessage(Message<?>) 方法。
从 4.3.1 版本开始,它实现了 HandleMessageAdvice,并以 AbstractHandleMessageAdvice 作为基类,以实现更好的解耦。
有关更多信息,请参阅 处理消息通知。 |