|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
事务支持
理解消息流中的事务
Spring Integration 提供了多个钩子机制,以满足消息流的事务性需求。 为了更好地理解这些钩子以及您如何从中受益,我们首先需要回顾您可以用来启动消息流的六种机制,并探讨如何在每种机制中满足这些消息流的事务性需求。
以下六种机制会启动消息流(每种机制的详细信息在本手册中均有提供):
-
网关代理:一个基础的消息网关。
-
消息通道:与
MessageChannel个方法(例如channel.send(message))的直接交互。 -
消息发布者:作为在 Spring Bean 上方法调用副产品而启动消息流的方式。
-
入站通道适配器和网关:通过连接第三方系统与 Spring Integration 消息系统来启动消息流的方法(例如,
[JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel)。 -
调度器:一种基于预配置调度器分发的调度事件来启动消息流的方式。
-
轮询器:与调度器类似,这是通过预配置的轮询器分发的基于调度或间隔事件来启动消息流的方式。
我们可以将这六种机制分为两大类:
-
由用户进程发起的消息流:此类示例场景包括调用网关方法或显式向
Message发送MessageChannel。 换句话说,这些消息流依赖于第三方进程(例如您编写的某些代码)来发起。 -
由守护进程启动的消息流:此类示例场景包括轮询器轮询消息队列以使用轮询到的消息启动新的消息流,或调度器通过创建新消息并在预定义时间启动消息流来调度该过程。
显然,网关代理、MessageChannel.send(…) 和 MessagePublisher 都属于第一类,而入站适配器与网关、调度器以及轮询器则属于第二类。
那么,如何在各类场景下满足事务性需求?Spring Integration 是否需要针对特定场景提供显式的事务支持?或者,您是否可以直接使用 Spring 的事务支持功能?
Spring 本身为事务管理提供了第一类支持。 因此,我们的目标不是提供新功能,而是利用 Spring 来受益于其现有的事务支持。 换句话说,作为一个框架,我们必须向 Spring 的事务管理功能暴露钩子。 然而,由于 Spring Integration 的配置基于 Spring 配置,我们并不总是需要暴露这些钩子,因为 Spring 已经暴露了它们。 毕竟,每个 Spring Integration 组件都是一个 Spring Bean。
出于这个目标,我们可以再次考虑两种场景:由用户进程发起的消息流和由守护进程发起的消息流。
由用户进程发起并在 Spring 应用上下文中配置的消息流,将遵循此类进程通常的事务性配置。
因此,Spring Integration 无需显式配置以支持事务。
事务应通过 Spring 的标准事务支持来启动。
由于 Spring Integration 本身是由 Spring 配置的,其消息流自然会尊重组件的事务语义。
例如,网关或服务激活器方法可以标注 @Transactional,或者在 XML 配置中定义一个 TransactionInterceptor,并配以指向特定需要事务处理的方法的点切表达式。
总之,在这些场景中,您可以完全掌控事务配置和边界。
然而,当涉及到由守护进程启动的消息流时,情况则略有不同。 尽管这些流是由开发者配置的,但它们并不直接涉及人类或其他进程来启动。 这些是基于触发器的流,由触发器进程(即守护进程)根据进程的配置来启动。 例如,我们可以配置一个调度程序在每周五晚上启动一次消息流。 我们还可以配置一个触发器每秒启动一次消息流,依此类推。 因此,我们需要一种方式让这些基于触发器的进程知晓我们的意图,使生成的消息流具有事务性,以便每当启动新的消息流时都能创建一个事务上下文。 换句话说,我们需要暴露一些事务配置,但只需足以将任务委托给 Spring 已提供的事务支持即可(正如我们在其他场景中做的那样)。
轮询事务支持
Spring Integration 为轮询器(pollers)提供了事务支持。
轮询器是一种特殊类型的组件,因为在轮询器任务内部,我们可以对本身具有事务性的资源调用 receive(),从而将 receive() 调用包含在事务边界内;这样在任务失败时就可以进行回滚。
如果我们为通道(channels)添加相同的支持,新增的事务将会影响从 send() 调用开始的所有下游组件。
这为事务界定提供了一个相当宽的范围,却缺乏充分的理由,尤其是当 Spring 已经提供了多种方式来解决任何下游组件的事务需求时。
然而,receive() 方法被包含在事务边界中,正是轮询器的“充分理由”所在。
每当您配置一个 Poller 时,都可以通过使用 transactional 子元素及其属性来提供事务配置,如下例所示:
<int:poller max-messages-per-poll="1" fixed-rate="1000">
<transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="true"
timeout="1000"/>
</poller>
上述配置类似于原生的 Spring 事务配置。
您仍需提供对事务管理器的引用,并指定事务属性或依赖默认值(例如,如果未指定'transaction-manager'属性,则默认为名为'transactionManager'的 bean)。
在内部,该过程被封装在 Spring 的原生事务中,其中TransactionInterceptor负责处理事务。
有关如何配置事务管理器、事务管理器的类型(如 JTA、DataSource 等)以及其他与事务配置相关的详细信息,请参阅Spring Framework Reference Guide。
使用上述配置,由该轮询器启动的所有消息流都是事务性的。 有关轮询器事务配置的更多信息和详情,请参见轮询与事务。
除了事务处理外,在运行轮询器(poller)时,您可能还需要处理其他几个横切关注点。
为此,<advice-chain> 子元素被添加到 poller 元素中,允许您定义一组自定义的建议实例链,并将其应用于 Poller。
(有关更多详细信息,请参见 可轮询消息源。)
在 Spring Integration 2.0 中,Poller 经过了重构,现在使用代理机制来处理事务问题以及其他横切关注点。
由此产生的一个重要变化是,我们使 <transactional> 和 <advice-chain> 元素互斥。
这样做的理由是:如果您需要多个建议,且其中一个是事务建议,您可以将其包含在 <advice-chain> 中,就像以前一样方便,但拥有更多的控制权,因为现在您可以将建议按所需顺序排列。
以下示例展示了如何实现这一点:
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<advice-chain>
<ref bean="txAdvice"/>
<ref bean="someOtherAdviceBean" />
<beans:bean class="foo.bar.SampleAdvice"/>
</advice-chain>
</poller>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
上述示例展示了 Spring 事务通知(txAdvice)的一种基本基于 XML 的配置方式,并将其包含在 Poller 定义的 <advice-chain> 中。
如果您仅需处理 Poller 的事务相关问题,仍可继续使用 <transactional> 元素作为便捷选项。
事务边界
消息流中事务的边界是另一个重要因素。当事务启动时,事务上下文会绑定到当前线程。因此,无论您的消息流中有多少端点和通道,只要您确保流在同一个线程上继续执行,事务上下文就会得以保留。一旦您通过引入 可轮询通道 或 执行器通道,或在某些服务中手动启动新线程而破坏了它,事务边界也会被破坏。本质上,事务会在此处结束;如果线程间已成功交接,则整个流程将被视为成功,即使流程继续执行并在下游某处仍可能引发异常,也会发送 COMMIT 信号。如果此类流程是同步的,那么该异常将被抛回给消息流的发起者,该发起者同时也是事务上下文的发起者,事务将导致回滚(ROLLBACK)。在中立地带,应在任何线程边界被打破的地方使用事务性通道。例如,您可以使用基于队列的通道来委托事务性消息存储策略,或者使用基于 JMS 的通道。
事务同步
在某些环境中,将操作与涵盖整个流程的事务进行同步会有所帮助。
例如,考虑在流程开始时有一个 <file:inbound-channel-adapter/> 执行若干数据库更新。
如果事务提交,我们可能希望将文件移动到 success 目录;而如果事务回滚,则可能希望将其移动到 failure 目录。
Spring Integration 2.2 引入了将这些操作与事务同步的能力。
此外,如果您没有“真实”的事务但仍希望在成功或失败时执行不同的操作,可以配置 PseudoTransactionManager。
更多信息,请参阅 伪事务。
以下代码清单展示了该功能的关键策略接口:
public interface TransactionSynchronizationFactory {
TransactionSynchronization create(Object key);
}
public interface TransactionSynchronizationProcessor {
void processBeforeCommit(IntegrationResourceHolder holder);
void processAfterCommit(IntegrationResourceHolder holder);
void processAfterRollback(IntegrationResourceHolder holder);
}
工厂负责创建一个 TransactionSynchronization 对象。
您可以实现自己的实现,也可以使用框架提供的实现:DefaultTransactionSynchronizationFactory。
该实现返回一个 TransactionSynchronization,它委托给 TransactionSynchronizationProcessor 的默认实现:ExpressionEvaluatingTransactionSynchronizationProcessor。
此处理器支持三个 SpEL 表达式:beforeCommitExpression、afterCommitExpression 和 afterRollbackExpression。
这些操作对于熟悉事务处理的人员来说应该是显而易见的。
在每种情况下,#root 变量都是原始的 Message。
在某些情况下,根据轮询器轮询的 MessageSource,其他 SpEL 变量也会被提供。
例如,MongoDbMessageSource 提供了 #mongoTemplate 变量,该变量引用消息源的 MongoTemplate。
同样,RedisStoreMessageSource 提供了 #store 变量,该变量引用由轮询创建的 RedisStore。
要为特定轮询器启用此功能,您可以通过使用 synchronization-factory 属性,在轮询器的 <transactional/> 元素上提供对 TransactionSynchronizationFactory 的引用。
从版本 5 开始。0, Spring Integration 提供了 PassThroughTransactionSynchronizationFactory,当未配置 TransactionSynchronizationFactory 但建议链中存在类型为 TransactionInterceptor 的建议时,该功能默认应用于轮询端点。当使用任何开箱即用的 TransactionSynchronizationFactory 实现时,轮询端点会将轮询的消息绑定到当前的事务上下文,并在抛出异常后提供作为 failedMessage 的 MessagingException。当使用未实现 TransactionInterceptor 的自定义事务通知时,您可以显式配置一个 PassThroughTransactionSynchronizationFactory 来实现此行为。无论哪种情况,MessagingException 都成为发送给 errorChannel 的 ErrorMessage 的有效负载,而原因则是 advice 抛出的原始异常。此前,ErrorMessage 的负载是 Advice 抛出的原始异常,并未提供对 failedMessage 信息的引用,导致难以确定事务提交问题的原因。
为了简化这些组件的配置,Spring Integration 为默认工厂提供了命名空间支持。 以下示例展示了如何使用该命名空间来配置一个文件入站通道适配器:
<int-file:inbound-channel-adapter id="inputDirPoller"
channel="someChannel"
directory="/foo/bar"
filter="filter"
comparator="testComparator">
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-file:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
SpEL 评估的结果将作为有效负载发送到 committedChannel 或 rolledBackChannel(在本例中,这将是 Boolean.TRUE 或 Boolean.FALSE——即 java.io.File.renameTo() 方法调用的结果)。
如果您希望将有效载荷的整个内容用于后续的 Spring Integration 处理,请使用 'payload' 表达式。
|
重要的是要理解,这会将操作与事务同步。 它并不会使本质上非事务性的资源变为事务性。 相反,在轮询之前会启动事务(无论是 JDBC 还是其他类型),并在流程完成后提交或回滚该事务,随后执行同步操作。 如果您提供了一个自定义的 |
除了 after-commit 和 after-rollback 表达式外,还支持 before-commit。
在这种情况下,如果评估(或下游处理)抛出异常,事务将回滚而不是提交。
伪事务
在阅读了事务同步部分后,您可能会认为,即使轮询器下游没有“真正”的事务资源(例如 JDBC),在流程完成时执行这些“成功”或“失败”操作也是有用的。 例如,考虑一个"<file:inbound-channel-adapter/>"后跟一个"<ftp:outbout-channel-adapter/>"的情况。 这两个组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败将输入文件移动到其他目录。
为了提供此功能,该框架提供了一个PseudoTransactionManager,即使没有涉及真实的交易资源,也能启用上述配置。
如果流程正常完成,将调用beforeCommit和afterCommit同步操作。
如果发生失败,则调用afterRollback同步操作。
由于这不是真实的交易,因此不会发生实际的提交或回滚。
伪事务是用于启用同步功能的载体。
要使用 PseudoTransactionManager,您可以将其定义为 <bean/>,配置方式与真实的事务管理器相同。
以下示例展示了如何操作:
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />
响应式事务
从 5.3 版本开始,对于返回响应式类型的端点,ReactiveTransactionManager 也可以与 TransactionInterceptor 建议配合使用。
这包括产生带有 Flux 或 Mono 负载的消息的 MessageSource 和 ReactiveMessageHandler 实现(例如 ReactiveMongoDbMessageSource)。
所有其他产生回复的消息处理器实现,当其回复负载也是某种响应式类型时,可以依赖 ReactiveTransactionManager。