如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

延迟器

延迟器是一个简单的端点,它允许消息流被延迟特定的时间间隔。 当消息被延迟时,原始发送者不会被阻塞。 相反,延迟的消息会被调度到一个 org.springframework.scheduling.TaskScheduler 实例,以便在延迟结束后发送到输出通道。 这种方法具有良好的可扩展性,即使对于较长的延迟也是如此,因为它不会导致大量发送线程被阻塞。 相反,在典型情况下,实际执行释放消息的操作会使用线程池。 本节包含几个配置延迟器的示例。spring-doc.cadn.net.cn

配置延迟器

<delayer>元素用于在两个消息通道之间延迟消息流。 与其他端点一样,您可以提供'input-channel'和'output-channel'属性,但delayer还具有'default-delay'和'expression'属性(以及'expression'元素),用于确定每条消息应延迟的毫秒数。 以下示例将所有消息延迟三秒:spring-doc.cadn.net.cn

<int:delayer id="delayer" input-channel="input"
             default-delay="3000" output-channel="output"/>

如果您需要为每条消息确定延迟,也可以使用 'expression' 属性提供 SpEL 表达式,如下所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from("input")
            .delay(d -> d
                    .messageGroupId("delayer.messageGroupId")
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}
@Bean
fun flow() =
    integrationFlow("input") {
        delay {
            messageGroupId("delayer.messageGroupId")
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>

在前面的示例中,三秒的延迟仅适用于给定入站消息的表达式求值为 null 的情况。 如果您只想对表达式求值结果有效的消息应用延迟,可以使用 0(默认值)作为“默认延迟”(default-delay)。 对于任何延迟为 0(或更小)的消息,该消息将立即在调用线程上发送。spring-doc.cadn.net.cn

XML 解析器使用消息组 ID <beanName>.messageGroupId
延迟处理器支持表示毫秒间隔的表达式求值结果(任何 ObjecttoString() 方法产生的值可解析为 Long),以及表示绝对时间的 java.util.Date 实例。 在第一种情况下,毫秒从当前时间开始计算(例如,5000 的值会使消息在延迟器接收到该消息后至少延迟五秒)。 使用 Date 实例时,消息不会释放,直到达到该 Date 对象所表示的时间。 如果值等同于非正延迟或过去的日期,则不会产生延迟。 相反,它会直接在原始发送者的线程上发送到输出通道。 如果表达式求值结果不是 Date 且无法解析为 Long,则应用默认延迟(如果有——默认值为 0)。
表达式求值可能因各种原因抛出求值异常,包括表达式无效或其他条件。 默认情况下,此类异常会被忽略(尽管会在 DEBUG 级别记录日志),且延迟器会回退到默认延迟(如果有的话)。 您可以通过设置 ignore-expression-failures 属性来修改此行为。 默认情况下,该属性设置为 true,延迟器的行为如前所述。 然而,如果您希望不忽略表达式求值异常并将其抛给延迟器的调用者,请将 ignore-expression-failures 属性设置为 false

在上面的示例中,延迟表达式被指定为 headers['delay']。 这是 SpEL Indexer 语法,用于访问 Map 元素(MessageHeaders 实现了 Map)。 它调用:headers.get("delay")。 对于简单的映射元素名称(不包含“.”),您也可以使用 SpEL“点访问器”语法,其中前面显示的头部表达式可以指定为 headers.delay。 但是,如果缺少头部,则会得到不同的结果。 在第一种情况下,表达式的求值结果为 null。 第二种情况的结果类似于以下内容:spring-doc.cadn.net.cn

 org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
		   Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

因此,如果存在表头可能被省略且您希望回退到默认延迟的情况,通常使用索引器语法而不是点属性访问器语法更为高效(且被推荐),因为检测 null 比捕获异常更快。spring-doc.cadn.net.cn

延迟器将任务委托给 Spring 的 TaskScheduler 抽象类的实例。 延迟器默认使用的调度器是 Spring Integration 在启动时提供的 ThreadPoolTaskScheduler 实例。 请参阅 配置任务调度器。 如果您想委托给不同的调度器,可以通过延迟器元素的 'scheduler' 属性提供引用,如下例所示:spring-doc.cadn.net.cn

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果您配置了外部 ThreadPoolTaskScheduler,则可以在此属性上设置 waitForTasksToCompleteOnShutdown = true。 它允许在应用程序关闭时成功完成处于执行状态的“延迟”任务(释放消息)。 在 Spring Integration 2.2 之前,此属性可用於 <delayer> 元素,因为 DelayHandler 可以在后台创建自己的调度器。 自 2.2 版本起,延迟器需要外部调度器实例,waitForTasksToCompleteOnShutdown 已被删除。 您应使用调度器自身的配置。
ThreadPoolTaskScheduler 拥有一个属性 errorHandler,该属性可以注入 org.springframework.util.ErrorHandler 的某个实现。 此处理器允许从发送延迟消息的计划任务线程中处理 Exception。 默认情况下,它使用一个 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,您可以在日志中看到堆栈跟踪信息。 您可能需要考虑使用 org.springframework.integration.channel.MessagePublishingErrorHandler,它将 ErrorMessage 发送到 error-channel(无论是来自失败消息的头部还是默认的 error-channel)。 此错误处理在事务回滚后执行(如果存在事务)。 请参阅 释放故障

延迟器和消息存储

The DelayHandler 将延迟消息持久化到提供的 MessageStore 中的消息组。('groupId'基于<delayer>元素所需的'id'属性。参见 DelayHandler.setMessageGroupId(String)。) 延迟消息会在 DelayHandleroutput-channel 发送消息之前,由定时任务立即从 MessageStore 中移除。如果提供的 MessageStore 是持久的(例如 JdbcMessageStore),则它提供了在应用程序关闭时不丢失消息的能力。应用程序启动后,DelayHandler 会从 MessageStore 中的消息组读取消息,并根据消息的原始到达时间(如果延迟为数值)重新调度这些消息。对于延迟头为 Date 的消息,在重新调度时将使用 Date。如果延迟消息在 MessageStore 中停留的时间超过其设定的“延迟”时间,它将在启动后立即发送。messageGroupId 是必需的,不能依赖可生成的 DelayHandler Bean 名称。这样,在应用程序重启后,DelayHandler 可能会获得一个新生成的 Bean 名称。因此,由于消息组不再由应用程序管理,延迟消息可能会在重新调度时丢失。spring-doc.cadn.net.cn

<delayer> 可以通过两个互斥元素中的任意一个进行增强:<transactional><advice-chain>。 这些 AOP 通知的 List 被应用于代理的内部 DelayHandler.ReleaseMessageHandler,该组件负责在延迟后于调度任务的 Thread 上释放消息。 例如,当下游消息流抛出异常且 ReleaseMessageHandler 的事务回滚时,可能会使用此功能。 在这种情况下,延迟的消息将保留在持久化的 MessageStore 中。 您可以在 <advice-chain> 内使用任何自定义的 org.aopalliance.aop.Advice 实现。 <transactional> 元素定义了一个仅包含事务性通知的简单通知链。 以下示例展示了 <delayer> 内的 advice-chainspring-doc.cadn.net.cn

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    message-store="jdbcMessageStore">
    <int:advice-chain>
        <beans:ref bean="customAdviceBean"/>
        <tx:advice>
            <tx:attributes>
                <tx:method name="*" read-only="true"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
</int:delayer>

DelayHandler可以导出为带有管理操作(getDelayedMessageCountreschedulePersistedMessages)的JMX MBean,从而允许在运行时重新调度延迟持久化的消息——例如,如果之前已停止TaskScheduler。 这些操作可以通过Control Bus命令调用,如下示例所示:spring-doc.cadn.net.cn

Message<String> delayerReschedulingMessage =
    MessageBuilder.withPayload("'delayer.handler'.reschedulePersistedMessages").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX 和控制总线的更多信息,请参阅 系统管理

从版本 5.3.7 开始,如果将消息存储到 MessageStore 时存在活动事务,则释放任务将在 TransactionSynchronization.afterCommit() 回调中调度。 这是为了防止竞态条件,即调度的释放操作可能在事务提交之前运行,导致找不到该消息。 在这种情况下,消息将在延迟后或事务提交后(以较晚者为准)被释放。spring-doc.cadn.net.cn

发布失败

从版本 5.0.8 开始,延迟器(delayer)新增了两个属性:spring-doc.cadn.net.cn

当消息被释放时,如果下游流失败,将在 retryDelay 后尝试重新释放。 如果达到 maxAttempts,消息将被丢弃(除非释放是事务性的,在这种情况下,消息将保留在存储中,但不再计划释放,直到应用程序重新启动或调用上述的 reschedulePersistedMessages() 方法)。spring-doc.cadn.net.cn

此外,您可以配置一个 delayedMessageErrorChannel;当发布失败时,会向该通道发送一个 ErrorMessage,其中异常作为负载,并具有 originalMessage 属性。 ErrorMessage 包含一个标头 IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,其中包含当前计数。spring-doc.cadn.net.cn

如果错误流消费了错误消息并正常退出,则不再采取进一步操作;如果发布是事务性的,则事务将提交,并且消息将从存储中删除。 如果错误流抛出异常,则发布将根据上述讨论最多重试 maxAttempts 次。spring-doc.cadn.net.cn