如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

幂等接收者企业集成模式

从 4.1 版本开始,Spring Integration 提供了企业集成模式 幂等接收器 的实现。 这是一个功能型模式,整个幂等性逻辑应由应用程序实现。 然而,为了简化决策过程,提供了 IdempotentReceiverInterceptor 组件。 这是一个 AOP Advice,应用于 MessageHandler.handleMessage() 方法,并可根据其配置 filter 请求消息或将其标记为 duplicatespring-doc.cadn.net.cn

以前,您可以通过在<filter/>中使用自定义的MessageSelector来实现此模式(例如参见Filter)。 然而,由于此模式实际上定义了端点的行为而非端点本身,因此幂等接收器实现不提供端点组件。 相反,它被应用于应用程序中声明的端点。spring-doc.cadn.net.cn

IdempotentReceiverInterceptor的逻辑基于提供的MessageSelector,如果消息未被该选择器接受,则使用设置为trueduplicateMessage标头对其进行丰富。 目标MessageHandler(或下游流)可以查阅此标头以实现正确的幂等性逻辑。 如果IdempotentReceiverInterceptor配置了discardChannelthrowExceptionOnRejection = true,重复消息将不会发送到目标MessageHandler.handleMessage()。 相反,它将被丢弃。 如果您想丢弃(不处理)重复消息,则应将discardChannel配置为NullChannel,例如默认的nullChannel bean。spring-doc.cadn.net.cn

为了在消息之间保持状态并提供比较消息以实现幂等性的能力,我们提供了 MetadataStoreSelector。 它接受一个 MessageProcessor 实现(该实现基于 Message 创建查找键)和一个可选的 ConcurrentMetadataStore元数据存储)。 有关更多信息,请参阅 MetadataStoreSelector Javadoc。 您还可以使用额外的 MessageProcessor 来自定义 ConcurrentMetadataStorevalue。 默认情况下,MetadataStoreSelector 使用 timestamp 消息头。spring-doc.cadn.net.cn

通常情况下,如果键没有现有值,选择器会选择一条消息进行接受。 在某些情况下,比较键的当前值和新生成的值对于确定是否应接受该消息很有用。 从 5.3 版本开始,提供了 compareValues 属性,它引用了 BiPredicate<String, String>;第一个参数是旧值;返回 true 以接受该消息,并在 MetadataStore 中将旧值替换为新值。 这有助于减少键的数量;例如,在处理文件中的行时,您可以将文件名存储在键中,将当前行号存储在值中。 然后,在重启后,您可以跳过已经处理过的行。 有关示例,请参阅 幂等下游处理分割文件spring-doc.cadn.net.cn

为了方便,MetadataStoreSelector 选项可直接在 <idempotent-receiver> 组件上进行配置。 以下列表展示了所有可能的属性:spring-doc.cadn.net.cn

<idempotent-receiver
        id=""  (1)
        endpoint=""  (2)
        selector=""  (3)
        discard-channel=""  (4)
        metadata-store=""  (5)
        key-strategy=""  (6)
        key-expression=""  (7)
        value-strategy=""  (8)
        value-expression=""  (9)
        compare-values="" (10)
        throw-exception-on-rejection="" />  (11)
1 IdempotentReceiverInterceptor Bean 的 ID。 可选。
2 此拦截器应用的消费者端点名称或模式。 使用逗号分隔名称(模式)(,),例如 endpoint="aaa, bbb*, ccc, *ddd, eee*fff"。 匹配这些模式的端点 Bean 名称将用于检索目标端点的 MessageHandler Bean(使用其 .handler 后缀),并将 IdempotentReceiverInterceptor 应用于这些 Bean。 必填。
3 一个 MessageSelector Bean 引用。 与 metadata-storekey-strategy (key-expression) 互斥。 当未提供 selector 时,必须提供 key-strategykey-strategy-expression 中的一个。
4 标识当 IdempotentReceiverInterceptor 不接受消息时,应将消息发送至哪个通道。 如果省略此属性,重复的消息将被转发至带有 duplicateMessage 头的处理器。 可选。
5 一个 ConcurrentMetadataStore 引用。 由底层的 MetadataStoreSelector 使用。 与 selector 互斥。 可选。 默认的 MetadataStoreSelector 使用一个内部 SimpleMetadataStore,该组件不会在应用程序执行之间维护状态。
6 一个 MessageProcessor 引用。 由底层的 MetadataStoreSelector 使用。 从请求消息中评估一个 idempotentKey。 与 selectorkey-expression 互斥。 当未提供 selector 时,必须指定 key-strategykey-strategy-expression 中的一个。
7 用于填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表达式。 由底层的 MetadataStoreSelector 使用。 使用请求消息作为求值上下文根对象来求值 idempotentKey。 与 selectorkey-strategy 互斥。 当未提供 selector 时,必须指定 key-strategykey-strategy-expression 中的一个。
8 一个 MessageProcessor 引用。 由底层的 MetadataStoreSelector 使用。 评估来自请求消息的 idempotentKeyvalue。 与 selectorvalue-expression 互斥。 默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息头作为 Metadata 的'value'。
9 一个用于填充ExpressionEvaluatingMessageProcessor的 SpEL 表达式。 由底层的MetadataStoreSelector使用。 通过使用请求消息作为评估上下文根对象,为idempotentKey评估value。 与selectorvalue-strategy互斥。 默认情况下,'MetadataStoreSelector'使用'timestamp'消息头作为元数据的'value'。
10 BiPredicate<String, String> bean 的引用,允许您通过比较键的旧值和新值来可选地选择消息;默认值为 null
11 如果 IdempotentReceiverInterceptor 拒绝消息,是否抛出异常。 默认为 false。 无论是否提供 discard-channel,此设置均适用。

对于 Java 配置,Spring Integration 提供了方法级别的 @IdempotentReceiver 注解。 它用于标记具有消息注解的 method@ServiceActivator@Router, and others) to specify which `IdempotentReceiverInterceptor 对象应用于此端点)。 以下示例展示了如何使用 @IdempotentReceiver 注解:spring-doc.cadn.net.cn

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

当您使用 Java DSL 时,可以将拦截器添加到端点的建议链中,如下示例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
IdempotentReceiverInterceptor 仅设计用于 MessageHandler.handleMessage(Message<?>) 方法。 从 4.3.1 版本开始,它实现了 HandleMessageAdvice,并以 AbstractHandleMessageAdvice 作为基类,以实现更好的解耦。 有关更多信息,请参阅 处理消息通知