|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
分布式锁
在许多情况下,必须以一种排他的方式对某些上下文(甚至是单条消息)执行操作。
一个例子是聚合器组件,我们需要检查当前消息的消息组状态,以确定是否可以释放该组,或者仅将该消息添加以供将来考虑。
为此,Java 提供了一个带有 java.util.concurrent.locks.Lock 个实现的 API。
然而,当应用程序是分布式的和/或在集群中运行时,问题变得更加复杂。
在这种情况下,锁定具有挑战性,需要一些共享状态及其特定方法来实现排他性要求。
Spring Integration 提供了一个基于 LockRegistry API 的内存中 DefaultLockRegistry 实现的 ReentrantLock 抽象。
LockRegistry 的 obtain(Object) 方法需要为特定上下文提供一个 lock key。
例如,聚合器使用 correlationKey 来锁定其组周围的操作。
这样可以使用不同的锁进行并发操作。
此 obtain(Object) 方法返回一个 java.util.concurrent.locks.Lock 实例(具体取决于 LockRegistry 的实现),因此其余逻辑与标准 Java 并发算法相同。
从 6.2 版本开始,LockRegistry 提供了一个 executeLocked() API(该接口中的 default 方法),用于在锁定状态下执行某些任务。
此 API 的行为类似于广为人知的 JdbcTemplate、JmsTemplate 或 RestTemplate。
以下示例演示了该 API 的用法:
LockRegistry registry = new DefaultLockRegistry();
...
registry.executeLocked("someLockKey", () -> someExclusiveResourceCall());
该方法将任务调用中抛出的异常重新抛出,如果 Lock 被中断,则抛出 InterruptedException。
此外,当 lock.tryLock() 返回 false 时,带有 Duration 的变体将抛出 java.util.concurrent.TimeoutException。
Spring Integration 为分布式锁提供了这些 LockRegistry 个实现:
Spring Integration AWS 扩展还实现了 DynamoDbLockRegistry。