提供的通知类

除了提供应用 AOP 通知类的通用机制外,Spring Integration 还提供了这些开箱即用的通知实现:spring-doc.cadn.net.cn

重试建议

重试通知(o.s.i.handler.advice.RequestHandlerRetryAdvice)利用了 Spring Framework 中 重试支持 提供的丰富重试机制。 该通知的核心组件是 RetryTemplate,它允许配置复杂的重试场景,包括 RetryPolicy 以及用于确定在重试耗尽时采取何种操作的 RecoveryCallback 策略。spring-doc.cadn.net.cn

无状态重试

无状态重试是指重试活动完全在通知(advice)内部处理的情况。 线程会暂停(如果配置为如此),然后重试该操作。spring-doc.cadn.net.cn

有状态重试

有状态重试是指重试状态在通知内部进行管理,但抛出异常后调用者重新提交请求的情况。有状态重试的一个示例是,我们希望消息发起者(例如 JMS)负责重新提交,而不是在当前线程上执行。有状态重试需要某种机制来检测重试的提交。为此,RequestHandlerRetryAdvice 暴露了 stateKeyFunctionnewMessagePredicatestateCacheSize 属性。其中后面两个只有在提供了第一个的情况下才有意义。本质上,stateKeyFunction 是一个指示器,用于将 RequestHandlerRetryAdvice 的逻辑从无状态切换为有状态。newMessagePredicate 的含义是:根据要处理的消息,刷新基于键值的现有重试状态两次。stateCacheSize 默认为 100,当有更多新的重试状态进入时,缓存中的旧条目将被移除。也许这些旧消息不再从上游流中重新投递,例如。g.,消息代理根据其重投策略将这些消息转至死信队列。spring-doc.cadn.net.cn

默认的回退行为是不进行回退。 重试会立即尝试。 使用导致线程在尝试之间暂停的回退策略可能会引发性能问题,包括内存过度使用和线程饥饿。 在高流量环境中,应谨慎使用回退策略。

配置重试建议

本节中的示例使用了以下 @ServiceActivator,该对象始终抛出异常:spring-doc.cadn.net.cn

public class FailingService {

	@ServiceActivator(inputChannel = "input", adviceChain = "retryAdvice")
    public void service(String message) {
        throw new RuntimeException("error");
    }
}
无状态重试

默认的 RetryPolicy 是尝试三次,加上对目标的原始调用共 MessageHandler 次。 默认情况下没有退避机制,因此三次尝试会连续进行,尝试之间没有任何延迟。 不存在 RecoveryCallback,因此最终结果是:在最后一次重试失败后,将异常抛出给调用者。 在 Spring Integration 环境中,可以通过在入站端点使用 error-channel 来处理此最终异常。 以下示例使用了 RequestHandlerRetryAdvice 的默认配置:spring-doc.cadn.net.cn

@Bean
RequestHandlerRetryAdvice retryAdvice() {
    return new RequestHandlerRetryAdvice();
}
带有恢复功能的简单无状态重试

以下示例在前一个示例的基础上添加了 RecoveryCallback,并使用 ErrorMessageSendingRecovererErrorMessage 发送到通道:spring-doc.cadn.net.cn

@Bean
RequestHandlerRetryAdvice retryAdvice(MessageChannel recoveryChannel) {
    RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
    requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel));
    return requestHandlerRetryAdvice;
}
带有自定义策略的无状态重试与恢复

为了更高级的功能,可以为 RequestHandlerRetryAdvice 提供自定义的 RetryPolicy。 本示例继续使用该简单的 RetryPolicy,但将重试次数增加至四次。 它还添加了一个 ExponentialBackoff,其中第一次重试等待一秒,第二次等待五秒,第三次等待二十五秒(总计四次尝试)。 以下列表展示了此类配置的示例:spring-doc.cadn.net.cn

@Bean
RequestHandlerRetryAdvice retryAdvice() {
    RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
    requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
    RetryPolicy retryPolicy = RetryPolicy.builder()
            .maxRetries(4)
            .delay(Duration.ofSeconds(1))
            .multiplier(5.0)
            .maxDelay(Duration.ofMinutes(1))
            .build();
    requestHandlerRetryAdvice.setRetryPolicy(retryPolicy);
    return requestHandlerRetryAdvice;
}
无状态重试的命名空间支持

以下示例演示了如何使用 Spring Integration XML 命名空间及其自定义标签来配置 RequestHandlerRetryAdvicespring-doc.cadn.net.cn

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>

<int:handler-retry-advice id="retrier" max-retries="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>

在前面的示例中,通知被定义为顶层 Bean,因此它可以在多个 request-handler-advice-chain 实例中使用。 您也可以在链中直接定义通知,如下例所示:spring-doc.cadn.net.cn

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-retries="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>

一个 <handler-retry-advice> 可以拥有一个 <fixed-back-off><exponential-back-off> 子元素,也可以没有子元素。 一个没有子元素的 <handler-retry-advice> 不使用退避策略。 如果没有 recovery-channel,则在重试耗尽时抛出异常。 该命名空间仅可用于无状态重试。spring-doc.cadn.net.cn

对于更复杂的环境(自定义策略等),请使用标准的 <bean> 定义。spring-doc.cadn.net.cn

带有恢复功能的简单有状态重试

为了使重试具有状态,必须为 RequestHandlerRetryAdvice 实例提供 Function<Message<?>, Object> stateKeyFunction。此函数用于将消息标识为重新提交,以便 RequestHandlerRetryAdvice 能够确定该消息当前的重试状态。有状态重试的核心思想是不阻塞当前线程,而是缓存该消息的重试状态,并将 MessageHandler 次失败重新抛出给调用者。这通常适用于能够重新提交(或重新投递)事件的消息发起方,例如带有 nack 功能的消息代理如 RabbitMQ,或具有 seek 功能的 Apache Kafka;以及在消费回滚后的 JMS。如果尚未缓存任何状态(或者 Predicate<Message<?>> newMessagePredicate 返回的 true 对应该消息),则 MessageHandler 调用将被视为首次调用,并在其失败时,基于 BackOffExecution 的内部 RetryState 会被缓存到上述键下。在下一条消息到达时,缓存的状态会提供一个 Thread.sleep() 的退避间隔,然后再尝试调用 MessageHandler。如果此退避间隔等于 BackOffExecution.STOP(例如。g., maxAttempts 已达到),这意味着该消息不再重试:整个重试周期被视为已耗尽,相应的 RetryException 将被抛回给调用者,或者在提供时用于 RecoveryCallback 调用。一般来说,异常处理逻辑和退避执行与无状态行为类似,唯一的区别是线程不会在所有 maxAttempts 操作中被阻塞。消息的发起者负责为下一次重试调用重新投递消息。spring-doc.cadn.net.cn

断路器建议

断路器模式的基本思想是,如果某个服务当前不可用,就不要浪费时间和资源去尝试使用它。 o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice 实现了该模式。 当断路器处于关闭状态时,端点会尝试调用服务。 如果连续多次调用失败,断路器将进入打开状态。 当处于打开状态时,新请求会“快速失败”,在一段时间过期之前不会尝试调用服务。spring-doc.cadn.net.cn

当该时间到期后,断路器将设置为半开状态。 在此状态下,如果任何一次尝试失败,断路器将立即进入打开状态。 如果尝试成功,断路器将进入关闭状态;在这种情况下,直到再次发生配置数量的连续失败之前,它不会再次进入打开状态。 任何成功的尝试都会将状态重置为零次失败,以便确定断路器何时可能再次进入打开状态。spring-doc.cadn.net.cn

通常,此建议可能用于外部服务,例如在尝试建立网络连接时因超时而导致失败的情况。spring-doc.cadn.net.cn

RequestHandlerCircuitBreakerAdvice 有两个属性:thresholdhalfOpenAfterthreshold 属性表示在断路器打开之前需要发生的连续失败次数。 其默认值为 5halfOpenAfter 属性表示断路器在最后一次失败后等待尝试下一次请求的时间。 默认为 1000 毫秒。spring-doc.cadn.net.cn

以下示例配置了一个熔断器,并展示了其 DEBUGERROR 输出:spring-doc.cadn.net.cn

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator

在前面的示例中,阈值设置为 2halfOpenAfter 设置为 12 秒。 每 5 秒到达一个新请求。 前两次尝试调用了服务。 第三和第四次尝试失败,并抛出异常,表明断路器已打开。 第五次请求被尝试,因为该请求距离上次失败已过去 15 秒。 第六次尝试立即失败,因为断路器立即进入打开状态。spring-doc.cadn.net.cn

表达式求值通知

最终提供的通知类是 o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice。 此通知比其他两个通知更为通用。 它提供了一种机制,用于评估发送到端点的原始传入消息中的表达式。 分别提供了在成功或失败后评估的表达式。 可选地,包含评估结果和输入消息的消息可以被发送到消息通道。spring-doc.cadn.net.cn

此通知的一个典型用例可能与<ftp:outbound-channel-adapter/>相关,例如:如果传输成功则将文件移动到某个目录,或者如果失败则移动到另一个目录:spring-doc.cadn.net.cn

该建议具有属性,用于在成功时设置表达式、在失败时设置表达式,并为每种情况提供相应的通道。 对于成功的情况,发送到 successChannel 的消息是一个 AdviceMessage,其负载为表达式评估的结果。 另一个名为 inputMessage 的属性包含发送给处理器的原始消息。 当处理器抛出异常时,发送到 failureChannel 的消息是一个 ErrorMessage,其负载为 MessageHandlingExpressionEvaluatingAdviceException。 与所有 MessagingException 实例一样,此负载具有 failedMessagecause 属性,此外还有一个名为 evaluationResult 的额外属性,其中包含表达式评估的结果。spring-doc.cadn.net.cn

从版本 5.1.3 开始,如果配置了通道但未提供表达式,则使用默认表达式对消息的 payload 进行求值。

当在通知的作用域内抛出异常时,默认情况下,在任何 failureExpression 被求值后,该异常会被抛给调用者。 如果您希望抑制异常的抛出,请将 trapException 属性设置为 true。 以下示例展示了如何使用 Java DSL 配置 advicespring-doc.cadn.net.cn

@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.<String>handle((payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

}

速率限制器建议

速率限制器建议(RateLimiterRequestHandlerAdvice)可确保端点不会因请求过多而过载。 当超出速率限制时,请求将进入被阻止状态。spring-doc.cadn.net.cn

此类建议的一个典型用例是外部服务提供商不允许每分钟超过 n 次请求。spring-doc.cadn.net.cn

RateLimiterRequestHandlerAdvice 的实现完全基于 Resilience4j 项目,需要注入 RateLimiterRateLimiterConfig。 也可以使用默认配置和/或自定义名称进行配置。spring-doc.cadn.net.cn

以下示例配置了一个限流建议,限制为每秒一个请求:spring-doc.cadn.net.cn

@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
		adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}

缓存建议

从版本 5 开始。2,CacheRequestHandlerAdvice 已引入。它基于 Spring Framework 中的缓存抽象,并与 @Caching 注解系列提供的概念和功能保持一致。内部逻辑基于 CacheAspectSupport 扩展,其中缓存操作的代理围绕 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage 方法执行,并以请求 Message<?> 作为参数。此建议可以通过 SpEL 表达式或 Function 进行配置,以评估缓存键。请求 Message<?> 可作为 SpEL 求值上下文的根对象,或作为 Function 输入参数使用。默认情况下,请求消息中的payload用作缓存键。CacheRequestHandlerAdvice 必须与 cacheNames 配合配置,当默认缓存操作为 CacheableOperation 时,或者与任意一组 CacheOperation 配合使用。每个 CacheOperation 都可以单独配置,也可以拥有共享选项,例如 CacheManagerCacheResolverCacheErrorHandler,这些可以从 CacheRequestHandlerAdvice 配置中复用。此配置功能类似于 Spring Framework 的 @CacheConfig@Caching 注解组合。如果未提供 CacheManager,则默认从 CacheAspectSupport 中的 BeanFactory 解析单个 Bean。spring-doc.cadn.net.cn

以下示例配置了两个通知,每个通知使用不同的缓存操作集:spring-doc.cadn.net.cn

@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
    adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}