|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
提供的通知类
除了提供应用 AOP 通知类的通用机制外,Spring Integration 还提供了这些开箱即用的通知实现:
-
RequestHandlerRetryAdvice(在 重试建议 中描述) -
RequestHandlerCircuitBreakerAdvice(在 熔断器建议 中描述) -
ExpressionEvaluatingRequestHandlerAdvice(在表达式建议中描述) -
RateLimiterRequestHandlerAdvice(在 限流器建议 中描述) -
CacheRequestHandlerAdvice(在 缓存建议 中描述) -
ReactiveRequestHandlerAdvice(在响应式通知中描述) -
ContextHolderRequestHandlerAdvice(在 上下文持有者建议 中描述)
重试建议
重试建议 (o.s.i.handler.advice.RequestHandlerRetryAdvice) 利用了 Spring Retry 项目提供的丰富重试机制。
spring-retry 的核心组件是 RetryTemplate,它允许配置复杂的重试场景,包括 RetryPolicy 和 BackoffPolicy 策略(具有多种实现),以及一个 RecoveryCallback 策略,用于确定在重试耗尽时应采取的操作。
- 无状态重试
-
无状态重试是指重试活动完全在通知(advice)内部处理的情况。 线程会暂停(如果配置为如此),然后重试该操作。
- 有状态重试
-
状态重试是指重试状态在通知中管理,但抛出异常且调用方重新提交请求的情况。 状态重试的一个例子是,我们希望消息发起者(例如 JMS)负责重新提交,而不是在当前线程上执行。 状态重试需要某种机制来检测已重试的提交。
有关spring-retry的更多信息,请参阅该项目的 Javadoc以及Spring Batch的参考文档,其中spring-retry即源自于此。
| 默认的回退行为是不进行回退。 重试会立即尝试。 使用导致线程在尝试之间暂停的回退策略可能会引发性能问题,包括内存过度使用和线程饥饿。 在高流量环境中,应谨慎使用回退策略。 |
配置重试建议
本节中的示例使用了以下 <service-activator>,该对象始终抛出异常:
public class FailingService {
public void service(String message) {
throw new RuntimeException("error");
}
}
- 无状态重试
-
默认的
RetryTemplate具有一个SimpleRetryPolicy,它会尝试三次。 没有BackOffPolicy,因此这三次尝试会连续进行,且每次尝试之间没有延迟。 没有RecoveryCallback,因此在最后一次重试失败后,会将异常抛给调用方。 在 Spring Integration 环境中,可以使用入站端点上的error-channel来处理此最终异常。 以下示例使用了RetryTemplate并展示了其DEBUG输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 - 带有恢复功能的简单无状态重试
-
以下示例在前一个示例的基础上添加了
RecoveryCallback,并使用ErrorMessageSendingRecoverer将ErrorMessage发送到通道:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...] - 带有自定义策略的无状态重试与恢复
-
为了更高级的用法,我们可以提供带有自定义
RetryTemplate的建议。 本示例继续使用SimpleRetryPolicy,但将重试次数增加到四次。 它还添加了一个ExponentialBackoffPolicy,其中第一次重试等待一秒,第二次等待五秒,第三次等待 25 秒(总共四次重试)。 以下代码示例展示了该示例及其DEBUG输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> <property name="retryTemplate" ref="retryTemplate" /> </bean> </int:request-handler-advice-chain> </int:service-activator> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="4" /> </bean> </property> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="5.0" /> <property name="maxInterval" value="60000" /> </bean> </property> </bean> 27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...] 27.071 DEBUG [task-scheduler-1]Retry: count=0 27.080 DEBUG [task-scheduler-1]Sleeping for 1000 28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1 28.081 DEBUG [task-scheduler-1]Retry: count=1 28.081 DEBUG [task-scheduler-1]Sleeping for 5000 33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2 33.082 DEBUG [task-scheduler-1]Retry: count=2 33.083 DEBUG [task-scheduler-1]Sleeping for 25000 58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3 58.083 DEBUG [task-scheduler-1]Retry: count=3 58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4 58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4 58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...] - 无状态重试的命名空间支持
-
从 4.0 版本开始,得益于对重试通知的命名空间支持,上述配置可以大大简化,如下例所示:
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>在前面的示例中,通知被定义为顶层 Bean,因此它可以在多个
request-handler-advice-chain实例中使用。 您也可以在链中直接定义通知,如下例所示:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator>一个
<handler-retry-advice>可以拥有一个<fixed-back-off>或<exponential-back-off>子元素,也可以没有子元素。 一个没有子元素的<handler-retry-advice>不使用退避策略。 如果没有recovery-channel,则在重试耗尽时抛出异常。 该命名空间仅可用于无状态重试。对于更复杂的环境(自定义策略等),请使用标准的
<bean>定义。 - 带有恢复功能的简单有状态重试
-
为了使重试状态化,我们需要为建议提供一个
RetryStateGenerator实现。 此类用于将消息标识为重试提交,以便RetryTemplate能够确定该消息的重试当前状态。 框架提供了一个SpelExpressionRetryStateGenerator,它通过使用 SpEL 表达式来确定消息标识符。 本示例再次使用默认策略(三次尝试且无退避)。 与无状态重试一样,这些策略也可以进行自定义。 以下代码块展示了示例及其DEBUG输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="retryStateGenerator"> <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator"> <constructor-arg value="headers['jms_messageId']" /> </bean> </property> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> 24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 24.368 DEBUG [Container#0-1]Retry: count=0 24.387 DEBUG [Container#0-1]Checking for rethrow: count=1 24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1 24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 25.412 DEBUG [Container#0-1]Retry: count=1 25.413 DEBUG [Container#0-1]Checking for rethrow: count=2 25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2 25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 26.418 DEBUG [Container#0-1]Retry: count=2 26.419 DEBUG [Container#0-1]Checking for rethrow: count=3 26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3 26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3 27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]如果您将前面的示例与无状态示例进行比较,您会发现,在使用有状态重试时,每次失败都会向调用者抛出异常。
- 重试的异常分类
-
Spring Retry 在确定哪些异常可以触发重试方面具有极大的灵活性。 默认配置会对所有异常进行重试,且异常分类器会检查顶层异常。 如果您将其配置为仅在
MyException时重试,而您的应用程序抛出了SomeOtherException(其原因是MyException),则不会发生重试。自 Spring Retry 1.0.3 起,
BinaryExceptionClassifier拥有一个名为traverseCauses的属性(默认值为false)。 当启用true时,它将遍历异常原因,直到找到匹配项或遍历完所有异常原因。要使用此分类器进行重试,请使用通过接受最大尝试次数、
Map个Exception对象以及traverseCauses布尔值的构造函数创建的SimpleRetryPolicy。 然后将此策略注入到RetryTemplate中。
traverseCauses 在这种情况下是必需的,因为用户异常可能会被包装在 MessagingException 中。 |
断路器建议
断路器模式的基本思想是,如果某个服务当前不可用,就不要浪费时间和资源去尝试使用它。
o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice 实现了该模式。
当断路器处于关闭状态时,端点会尝试调用服务。
如果连续多次调用失败,断路器将进入打开状态。
当处于打开状态时,新请求会“快速失败”,在一段时间过期之前不会尝试调用服务。
当该时间过期后,断路器将被设置为半开状态。 处于此状态时,如果即使单次尝试失败,断路器会立即进入打开状态。 如果尝试成功,断路器将回到关闭状态;在这种情况下,除非再次发生配置数量的连续失败,否则不会再次进入打开状态。 任何成功的尝试都会将失败计数重置为零,以便确定断路器何时可能再次进入打开状态。
通常,此类建议可用于外部服务,其中可能需要一段时间才会失败(例如尝试建立网络连接时出现超时)。
RequestHandlerCircuitBreakerAdvice 有两个属性:threshold 和 halfOpenAfter。
threshold 属性表示在断路器打开之前需要发生的连续失败次数。
其默认值为 5。
halfOpenAfter 属性表示断路器在最后一次失败后等待尝试下一次请求的时间。
默认为 1000 毫秒。
以下示例配置了一个熔断器,并展示了其 DEBUG 和 ERROR 输出:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的示例中,阈值设置为 2,halfOpenAfter 设置为 12 秒。
每 5 秒到达一个新请求。
前两次尝试调用了服务。
第三和第四次尝试失败,并抛出异常,表明断路器已打开。
第五次请求被尝试,因为该请求距离上次失败已过去 15 秒。
第六次尝试立即失败,因为断路器立即进入打开状态。
表达式求值通知
最终提供的通知类是 o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice。
此通知比其他两个通知更为通用。
它提供了一种机制,用于评估发送到端点的原始传入消息中的表达式。
分别提供了在成功或失败后评估的表达式。
可选地,包含评估结果和输入消息的消息可以被发送到消息通道。
此通知的一个典型用例可能与<ftp:outbound-channel-adapter/>相关,例如:如果传输成功则将文件移动到某个目录,或者如果失败则移动到另一个目录:
该建议具有属性,用于在成功时设置表达式、在失败时设置表达式,并为每种情况提供相应的通道。
对于成功的情况,发送到 successChannel 的消息是一个 AdviceMessage,其负载为表达式评估的结果。
另一个名为 inputMessage 的属性包含发送给处理器的原始消息。
当处理器抛出异常时,发送到 failureChannel 的消息是一个 ErrorMessage,其负载为 MessageHandlingExpressionEvaluatingAdviceException。
与所有 MessagingException 实例一样,此负载具有 failedMessage 和 cause 属性,此外还有一个名为 evaluationResult 的额外属性,其中包含表达式评估的结果。
从版本 5.1.3 开始,如果配置了通道但未提供表达式,则使用默认表达式对消息的 payload 进行求值。 |
当在通知的作用域内抛出异常时,默认情况下,在任何 failureExpression 被求值后,该异常会被抛给调用者。
如果您希望抑制异常的抛出,请将 trapException 属性设置为 true。
以下示例展示了如何使用 Java DSL 配置 advice:
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
速率限制器建议
速率限制器建议(RateLimiterRequestHandlerAdvice)可确保端点不会因请求过多而过载。
当突破速率限制时,请求将进入阻塞状态。
此类建议的一个典型用例是外部服务提供商不允许每分钟超过 n 次请求。
RateLimiterRequestHandlerAdvice 的实现完全基于 Resilience4j 项目,需要注入 RateLimiter 或 RateLimiterConfig。
也可以使用默认配置和/或自定义名称进行配置。
以下示例配置了一个限流建议,限制为每秒一个请求:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
缓存建议
从版本 5 开始。2,CacheRequestHandlerAdvice 已引入。它基于 Spring Framework 中的缓存抽象,并与 @Caching 注解系列提供的概念和功能保持一致。内部逻辑基于 CacheAspectSupport 扩展,其中缓存操作的代理围绕 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage 方法执行,并以请求 Message<?> 作为参数。此建议可以通过 SpEL 表达式或 Function 进行配置,以评估缓存键。请求 Message<?> 可作为 SpEL 求值上下文的根对象,或作为 Function 输入参数使用。默认情况下,请求消息中的payload用作缓存键。CacheRequestHandlerAdvice 必须与 cacheNames 配合配置,当默认缓存操作为 CacheableOperation 时,或者与任意一组 CacheOperation 配合使用。每个 CacheOperation 都可以单独配置,也可以拥有共享选项,例如 CacheManager、CacheResolver 和 CacheErrorHandler,这些可以从 CacheRequestHandlerAdvice 配置中复用。此配置功能类似于 Spring Framework 的 @CacheConfig 和 @Caching 注解组合。如果未提供 CacheManager,则默认从 CacheAspectSupport 中的 BeanFactory 解析单个 Bean。
以下示例配置了两个具有不同缓存操作集的增强(advices):
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}