分布式锁

在许多情况下,必须以一种排他的方式对某些上下文(甚至是单条消息)执行操作。 一个例子是聚合器组件,我们需要检查当前消息的消息组状态,以确定是否可以释放该组,或者仅将该消息添加以供将来考虑。 为此,Java 提供了一个带有 java.util.concurrent.locks.Lock 个实现的 API。 然而,当应用程序是分布式的和/或在集群中运行时,问题变得更加复杂。 在这种情况下,锁定具有挑战性,需要一些共享状态及其特定方法来实现排他性要求。spring-doc.cadn.net.cn

Spring Integration 提供了一个 LockRegistry 抽象,其基于 ReentrantLock API 的内存中 DefaultLockRegistry 实现。 LockRegistryobtain(Object) 方法需要针对特定上下文的 lock key。 例如,聚合器使用 correlationKey 对其组周围的锁操作进行锁定。 通过这种方式,不同的锁可以并发使用。 此 obtain(Object) 方法返回一个 java.util.concurrent.locks.Lock 实例(具体取决于 LockRegistry 实现),因此其余逻辑与标准 Java 并发算法相同。spring-doc.cadn.net.cn

从 6.2 版本开始,LockRegistry 提供了一个 executeLocked() API(该接口中的 default 方法),用于在锁定状态下执行某些任务。 此 API 的行为类似于广为人知的 JdbcTemplateJmsTemplateRestTemplate。 以下示例演示了该 API 的用法:spring-doc.cadn.net.cn

LockRegistry registry = new DefaultLockRegistry();
...
registry.executeLocked("someLockKey", () -> someExclusiveResourceCall());

该方法将任务调用中抛出的异常重新抛出,如果 Lock 被中断,则抛出 InterruptedException。 此外,当 lock.tryLock() 返回 false 时,带有 Duration 的变体将抛出 java.util.concurrent.TimeoutExceptionspring-doc.cadn.net.cn

Spring Integration 为分布式锁提供了这些 LockRegistry 个实现:spring-doc.cadn.net.cn

Spring Cloud AWS 也提供了一个 DynamoDbLockRegistryspring-doc.cadn.net.cn

从版本 7.0 开始,引入了 DistributedLock 接口,提供了新方法 lock(Duration ttl) 和 tryLock(long time, TimeUnit unit, Duration ttl),用于以自定义的生存时间(TTL)获取锁。 JdbcLockRedisLock 均实现了 DistributedLock 接口,以支持自定义生存时间的功能。 LockRegistry<L extends Lock> 现在是扩展 Lock 的类型的通用接口。 RenewableLockRegistry 接口现在提供了新的 renewLock(Object lockKey, Duration ttl) 方法,允许您以自定义的生存时间值续订锁。 JdbcLockRegistryRedisLockRegistry 均使用类型参数 DistributedLock 实现了 LockRegistryRenewableLockRegistry 接口。spring-doc.cadn.net.cn

以下是如何从注册表中获取 DistributedLock 并使用特定的存活时间值进行占用的示例:spring-doc.cadn.net.cn

DistributedLock lock = registry.obtain("foo");
Duration timeToLive = Duration.ofMillis(500);

if(lock.tryLock(100, TimeUnit.MILLISECONDS, timeToLive)){
    try {
        // do something
    } catch (Exception e) {
        // handle exception
    } finally{
        lock. unlock();
    }
}