精尽 Redisson 源码分析 —— 可重入分布式锁 ReentrantLock
1. 概述
在 Redisson 中,提供了 8 种分布锁的实现,具体我们可以在 《Redisson 文档 —— 分布式锁和同步器》 中看到。绝大数情况下,我们使用可重入锁(Reentrant Lock)就够了,对应到就是 org.redisson.RedissonLock 类,具体的使用示例可以看看 《芋道 Spring Boot Redis 入门》 的「6.2 Redis 分布式锁」小节 。
在 《精尽 Redis 面试题》 的问题中,我们在聊到“如何使用 Redis 实现分布式锁?”这个题目中,提到了需要考虑的 7 个方面,这里我们再来重复看下:
1、正确的获得锁
set 指令附带 nx 参数,保证有且只有一个进程获得到。
2、正确的释放锁
使用 Lua 脚本,比对锁持有的是不是自己。如果是,则进行删除来释放。
3、超时的自动释放锁
set 指令附带 expire 参数,通过过期机制来实现超时释放。
4、未获得到锁的等待机制
sleep 或者基于 Redis 的订阅 Pub/Sub 机制。
一些业务场景,可能需要支持获得不到锁,直接返回 false ,不等待。
5、【可选】锁的重入性
通过 ThreadLocal 记录是第几次获得相同的锁。
1)有且第一次计数为 1 && 获得锁时,才向 Redis 发起获得锁的操作。
2)有且计数为 0 && 释放锁时,才向 Redis 发起释放锁的操作。6、锁超时的处理
一般情况下,可以考虑告警 + 后台线程自动续锁的超时时间。通过这样的机制,保证有且仅有一个线程,正在持有锁。
7、Redis 分布式锁丢失问题
具体看「方案二:Redlock」。
RedissonLock 实现了前 6 点,而第 7 点需要通过 org.redisson.RedissonRedLock 来实现,这个话题,我们后面在聊。
2. 整体一览
我们来看看 Redisson 锁相关的整体类图,如下:
RLock 接口
- org.redisson.api.RLockAsync ,定义了异步操作的接口。
- org.redisson.api.RLock ,继承 RLockAsync 的基础上,定义了同步操作的接口。比较有意思的是,RLock 同时实现继承 JDK 的
java.util.concurrent.locks.Lock
接口,从而符合 Java 的 Lock 的标准。 - 本文的主角 RedissonLock ,实现 RLock 接口,提供可重入的分布式锁实现。
RLockAsync 和 RLock 定义的接口,差别就在于同步和异步,所以我们就只看看 RLock 接口。代码如下:
String getName();// 锁定相关的接口方法,还有部分在 Lock 接口上 void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException; void lock(long leaseTime, TimeUnit unit); boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;// 解锁相关的接口方法,还有部分在 Lock 接口上 boolean forceUnlock();// 其它非关键方法 boolean isLocked(); boolean isHeldByThread(long threadId); boolean isHeldByCurrentThread(); int getHoldCount(); long remainTimeToLive(); |
3. Lua 脚本
在我们看具体的代码实现,我们先来看核心的使用到的 Lua 脚本,方便我们后续更好的理解 RedissonLock 的实现。
3.1 tryLockInnerAsync
#tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command)
方法,实现加锁逻辑,并且支持可重入性。代码如下:
FROM 《慢谈 Redis 实现分布式锁 以及 Redisson 源码解析》
加锁流程图
// RedissonLock.java1: <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {2: internalLockLeaseTime = unit.toMillis(leaseTime);3:4: return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,5: "if (redis.call('exists', KEYS[1]) == 0) then " + // 情况一,当前分布式锁被未被获得6: "redis.call('hset', KEYS[1], ARGV[2], 1); " + // 写入分布锁被 ARGV[2] 获取到了,设置数量为 1 。7: "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置分布式的过期时间为 ARGV[1]8: "return nil; " + // 返回 null ,表示成功9: "end; " +10: "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 情况二,如果当前分布锁已经被 ARGV[2] 持有11: "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 写入持有计数字 + 1 。12: "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置分布式的过期时间为 ARGV[1]13: "return nil; " + // 返回 null ,表示成功14: "end; " +15: "return redis.call('pttl', KEYS[1]);", // 情况三,获取不到分布式锁,则返回锁的过期时间。16: Collections.<Object>singletonList(getName()), // KEYS[分布式锁名]17: internalLockLeaseTime, getLockName(threadId)); // ARGV[锁超时时间,获得的锁名]18: } |
<2>
处,根据传入的leaseTime
+unit
参数,设置到internalLockLeaseTime
属性上,表示锁的时长。代码如下:// RedissonLock.java/*** 锁的时长*/ protected long internalLockLeaseTime;public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {// ... 省略其它代码this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); }
- 默认情况下,
internalLockLeaseTime
属性,使用 Lock 的 WatchDog 的超时时长30 * 1000
毫秒。默认的值,当且仅当我们未显示传入锁的时长时,才有用。例如说,稍后我们会看到的#lock()
等等方法中。 - 有一点,我们要特别注意,
internalLockLeaseTime
是 RedissonLock 的成员变量,并且也未声明volatile
修饰,所以跨线程使用同一个 RedissonLock 对象,可能会存在internalLockLeaseTime
读取不到最新值的情况。
- 默认情况下,
还是熟悉的配方,通过 Lua 脚本实现。具体传入的参数,朋友看下第 16 和 17 行的代码,对应的
KEYS
和ARGV
。KEYS[1]
:调用#getName()
方法获得分布式锁的名字。稍后,我们会看到分布式锁在 Redis 使用是以KEYS[1]
分布式锁为 KEY ,VALUE 为 HASH 类型。ARGV[1]
:锁的时长。ARGV[2]
:调用#getLockName(threadId)
方法,获得的锁名。该名字,用于表示该分布式锁正在被哪个进程的线程所持有。代码如下:// RedissonLock.java /*** ID ,就是 {@link ConnectionManager#getId()}*/ final String id;protected String getLockName(long threadId) {return id + ":" + threadId; }
- 可能描述看起来不是很好理解,我们来看一个获取到分布式锁的示例:
分布式锁的示例
- 可能描述看起来不是很好理解,我们来看一个获取到分布式锁的示例:
- 第 4 至 15 行:Lua 脚本,一共分成 3 种情况,已经添加了完整的注释。
不同于我们在市面上看到的 Redis 通过 SET 命令带上 NX 和 EXPIRE 的方式实现获得分布式锁,RedissonLock 提供重入性,所以需要 Lua 脚本来实现。当然,实际上,也可以通过 ThreadLocal 来实现重入性的技术。
3.2 unlockInnerAsync
#unlockInnerAsync(long threadId)
方法,实现解锁逻辑,并且支持可重入性。代码如下:
FROM 《慢谈 Redis 实现分布式锁 以及 Redisson 源码解析》
解锁流程图
// RedissonLock.java1: protected RFuture<Boolean> unlockInnerAsync(long threadId) {2: return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,3: "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 情况一,分布式锁未被 ARGV[3] 持有,则直接返回 null ,表示解锁失败。4: "return nil;" +5: "end; " +6: "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 持有锁的数量减 1 。7: "if (counter > 0) then " + // 情况二,如果后还有剩余的持有锁数量,则返回 0 ,表示解锁未完成8: "redis.call('pexpire', KEYS[1], ARGV[2]); " + // 重新设置过期时间为 ARGV[2]9: "return 0; " +10: "else " + // 情况三,不存在剩余的锁数量,则返回 1 ,表示解锁成功11: "redis.call('del', KEYS[1]); " + // 删除对应的分布式锁对应的 KEYS[1]12: "redis.call('publish', KEYS[2], ARGV[1]); " + // 发布解锁事件到 KEYS[2] ,通知气他可能要获取锁的线程13: "return 1; "+14: "end; " +15: "return nil;", // 不存在这个情况。16: Arrays.<Object>asList(getName(), getChannelName()), // KEYS[分布式锁名, 该分布式锁对应的 Channel 名]17: LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); // ARGV[解锁消息,锁超时时间,获得的锁名]18: } |
具体传入的参数,朋友看下第 16 和 17 行的代码,对应的
KEYS
和ARGV
。KEYS[1]
:调用#getName()
方法获得分布式锁的名字。KEYS[2]
:调用#getChannelName()
方法,该分布式锁对应的 Channel 名。因为 RedissonLock 释放锁时,会通过该 Channel 来 Publish 一条消息,通知其它可能在阻塞等待这条消息的客户端。代码如下:// RedissonLock.javaString getChannelName() {return prefixName("redisson_lock__channel", getName()); }
- 通过 Redis Pub/Sub 机制,实现未获得到锁的等待机制。
- 每个分布式锁,对应一个其独有的 Channel 。
ARGV[1]
:解锁消息 LockPubSub.UNLOCK_MESSAGE 。通过收到这条消息,其它等待锁的客户端,会重新发起获得锁的请求。具体的,我们在下文来一起瞅瞅。ARGV[2]
:锁的时长。ARGV[3]
:调用#getLockName(threadId)
方法,获得的锁名。
- 第 3 至 15 行:Lua 脚本,还是分成 3 种情况。
不同于我们在市面上看到的 Redis 通过 Lua 脚本的方式实现释放分布式锁,一共有 2 点:
- 1、要实现可重入性,所以只有在计数为 0 时,才会真正释放锁。
- 2、要实现客户端的等待通知,所以在释放锁时,Publish 一条释放锁的消息。
3.3 forceUnlockAsync
#forceUnlockAsync()
方法,实现强制解锁逻辑。代码如下:
// RedissonLock.java@Override public RFuture<Boolean> forceUnlockAsync() {cancelExpirationRenewal(null);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('del', KEYS[1]) == 1) then " // 情况一,释放锁成功,则通过 Publish 发布释放锁的消息,并返回 1+ "redis.call('publish', KEYS[2], ARGV[1]); "+ "return 1 "+ "else " // 情况二,释放锁失败,因为不存在这个 KEY ,所以返回 0+ "return 0 "+ "end",Arrays.<Object>asList(getName(), getChannelName()),LockPubSub.UNLOCK_MESSAGE); } |
- 比较简单,分成 2 种情况。
代码处理的比较细致,Redis DEL 成功,才 PUBLISH 发布释放锁的消息,避免错误通知客户端。
3.4 renewExpirationAsync
#renewExpirationAsync(long threadId)
方法,实现续锁逻辑。
我们先来看下 《Redisson 文档 —— 分布式锁和同步器》 ,有一段奇怪的说明:
RLock lock = redisson.getLock("anyLock"); // 最常见的使用方法 lock.lock(); |
大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。
在使用 RedissonLock#lock()
方法,我们要求持续持有锁,直到手动释放。但是实际上,我们有一个隐藏条件,如果 Java 进程挂掉时,需要自动释放。那么,如果实现 RedissonLock#lock()
时,设置过期 Redis 为无限大,或者不过期都不合适。那么 RedissonLock 是怎么实现的呢?RedissonLock 先获得一个 internalLockLeaseTime
的分布式锁,然后每 internalLockLeaseTime / 3
时间,定时调用 #renewExpirationAsync(long threadId)
方法,进行续租。这样,在 Java 进程异常 Crash 掉后,能够保证最多 internalLockLeaseTime
时间后,分布式锁自动释放。
略骚略巧妙~不过为了实现这样的功能,RedissonLock 的整体逻辑,又复杂了一丢丢。
下面,还是先让我们看下具体的 #renewExpirationAsync(long threadId)
方法的代码,如下:
// RedissonLock.javaprotected RFuture<Boolean> renewExpirationAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 情况一,如果持有锁,则重新设置过期时间为 ARGV[1] internalLockLeaseTime ,并返回 1 续租成功。"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;", // 情况二,未吃货有,返回 0 续租失败。Collections.<Object>singletonList(getName()),internalLockLeaseTime, getLockName(threadId)); } |
- 比较简单,分成 2 种情况。
至此,我们看完了 Lua 脚本部分,其实基本也大体知道 RedissonLock 是如何实现加锁、接锁的逻辑。但是,复杂的逻辑,还在下面。
4. LockPubSub
在开始研究真正的加锁和解锁的调用之前,我们先看看和其相关的客户端订阅解锁消息,从而实现在持有锁的客户端释放锁时,等待锁的客户端能够快速的去调用加锁逻辑。
org.redisson.pubsub.LockPubSub ,继承 PublishSubscribe 抽象类,实现 Lock 相关消息的订阅。代码如下:
// LockPubSub.javapublic class LockPubSub extends PublishSubscribe<RedissonLockEntry> {/*** 锁释放的消息*/public static final Long UNLOCK_MESSAGE = 0L;/*** 读锁释放的消息*/public static final Long READ_UNLOCK_MESSAGE = 1L;public LockPubSub(PublishSubscribeService service) {super(service);}@Overrideprotected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {return new RedissonLockEntry(newPromise);}@Overrideprotected void onMessage(RedissonLockEntry value, Long message) {if (message.equals(UNLOCK_MESSAGE)) {// 回调监听器Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute != null) {runnableToExecute.run();}// 通过信号量,通知阻塞等待的线程value.getLatch().release();} else if (message.equals(READ_UNLOCK_MESSAGE)) {while (true) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute == null) {break;}runnableToExecute.run();}value.getLatch().release(value.getLatch().getQueueLength());}}} |
在
#createEntry(RPromise<RedissonLockEntry> newPromise)
方法,会创建 org.redisson.RedissonLockEntry 对象。代码如下:// RedissonLockEntry.javapublic class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {/*** 计数器** 每次发起订阅,则计数器 + 1* 每次取消订阅,则计数器 - 1 。当减少到 0 时,才正常取消订阅。*/private int counter;/*** 信号量,用于实现 RedissonLock 阻塞等待的通知*/private final Semaphore latch;private final RPromise<RedissonLockEntry> promise;/*** 监听器们*/private final ConcurrentLinkedQueue<Runnable> listeners = new ConcurrentLinkedQueue<Runnable>();public RedissonLockEntry(RPromise<RedissonLockEntry> promise) {super();this.latch = new Semaphore(0);this.promise = promise;}@Overridepublic void aquire() {counter++;}@Overridepublic int release() {return --counter;}@Overridepublic RPromise<RedissonLockEntry> getPromise() {return promise;}public void addListener(Runnable listener) {listeners.add(listener);}public boolean removeListener(Runnable listener) {return listeners.remove(listener);}public ConcurrentLinkedQueue<Runnable> getListeners() {return listeners;}public Semaphore getLatch() {return latch;}}
- 虽然代码比较多,我们重点来看
latch
和listeners
属性。 latch
属性:信号量,用于实现 RedissonLock 阻塞等待的通知。在我们下面看到同步加锁的逻辑,会看到通过它来实现阻塞等待。listeners
属性:监听器,实现订阅到锁的释放消息,从而再次发起获得锁。当然,这里的 Runnable 对象肯定无法体现,具体我们后面看看#tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId)
或者#lockAsync(long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Void> result, long currentThreadId)
方法,就可以看到方法内部会创建具体的 Runnable 实现类,实现再次发起获得锁的逻辑。
- 虽然代码比较多,我们重点来看
- 在
#onMessage(RedissonLockEntry value, Long message)
方法中,在接收到释放锁的消息后,会执行listeners
的回调,以及latch
的时候放。
当然,单单看 LockPubSub 类。LockPubSub 更多的是实现了锁释放消息的监听,以及回调监听器,释放信号量。真正的逻辑,还是要看监听器的逻辑,以及 RedissonLock 是怎么实现信号量的。
另外,在 RedissonLock 中,提供如下几个方法,发起和取消订阅。代码如下:
// RedissonLock.java/*** Sub Entry 名字*/ final String entryName;protected final LockPubSub pubSub;public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {// ... 省略其他无关this.entryName = id + ":" + name;this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); }/*** 获得线程对应的 RedissonLockEntry 对象** @param threadId 线程编号* @return RedissonLockEntry 对象*/ protected RedissonLockEntry getEntry(long threadId) {return pubSub.getEntry(getEntryName()); }/*** 异步发起订阅** @param threadId 线程编号* @return RFuture 对象*/ protected RFuture<RedissonLockEntry> subscribe(long threadId) {return pubSub.subscribe(getEntryName(), getChannelName()); }/*** 异步取消订阅** @param future RFuture 对象* @param threadId 线程编号*/ protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {pubSub.unsubscribe(future.getNow(), getEntryName(), getChannelName()); } |
5. tryLockAsync
#tryLockAsync(long waitTime, TimeUnit unit)
方法,异步加锁,并返回是否成功。代码如下:
// RedissonLock.java@Override public RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit) {return tryLockAsync(waitTime, -1, unit); }@Override public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit) {// 获得线程编号long currentThreadId = Thread.currentThread().getId();// 执行锁return tryLockAsync(waitTime, leaseTime, unit, currentThreadId); } |
- 最终都调用
#tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long currentThreadId)
方法,真正实现异步加锁的逻辑。
#tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long currentThreadId)
方法,代码如下:
// RedissonLock.java1: @Override2: public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long currentThreadId) {3: // 创建 RPromise 对象,用于通知结果4: RPromise<Boolean> result = new RedissonPromise<Boolean>();5:6: // 表示剩余的等待获得锁的时间7: AtomicLong time = new AtomicLong(unit.toMillis(waitTime));8: // 记录当前时间9: long currentTime = System.currentTimeMillis();10: // 执行异步获得锁11: RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);12: ttlFuture.onComplete((ttl, e) -> {13: // 如果发生异常,则通过 result 通知异常14: if (e != null) {15: result.tryFailure(e);16: return;17: }18:19: // lock acquired20: // 如果获得到锁,则通过 result 通知获得锁成功21: if (ttl == null) {22: if (!result.trySuccess(true)) { // 如果处理 result 通知对结果返回 false ,意味着需要异常释放锁23: unlockAsync(currentThreadId);24: }25: return;26: }27:28: // 减掉已经等待的时间29: long el = System.currentTimeMillis() - currentTime;30: time.addAndGet(-el);31:32: // 如果无剩余等待的时间,则通过 result 通知获得锁失败33: if (time.get() <= 0) {34: trySuccessFalse(currentThreadId, result);35: return;36: }37:38: // 记录新的当前时间39: long current = System.currentTimeMillis();40: // 记录下面的 future 的指向41: AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();42:43: // 创建 SUBSCRIBE 订阅的 Future44: RFuture<RedissonLockEntry> subscribeFuture = subscribe(currentThreadId);45: subscribeFuture.onComplete((r, ex) -> {46: // 如果发生异常,则通过 result 通知异常47: if (ex != null) {48: result.tryFailure(ex);49: return;50: }51:52: // 如果创建定时任务 Future scheduledFuture,则进行取消53: if (futureRef.get() != null) {54: futureRef.get().cancel();55: }56:57: // 减掉已经等待的时间58: long elapsed = System.currentTimeMillis() - current;59: time.addAndGet(-elapsed);60:61: // 再次执行异步获得锁62: tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);63: });64:65: // 如果创建 SUBSCRIBE 订阅的 Future 未完成,创建定时任务 Future scheduledFuture 。66: if (!subscribeFuture.isDone()) {67: Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {68: @Override69: public void run(Timeout timeout) throws Exception {70: // 如果创建 SUBSCRIBE 订阅的 Future 未完成71: if (!subscribeFuture.isDone()) {72: // 进行取消 subscribeFuture73: subscribeFuture.cancel(false);74: // 通过 result 通知获得锁失败75: trySuccessFalse(currentThreadId, result);76: }77: }78: }, time.get(), TimeUnit.MILLISECONDS); // 延迟 time 秒后执行79: // 记录 futureRef 执行 scheduledFuture80: futureRef.set(scheduledFuture);81: }82: });83:84: return result;85: } |
- 整体逻辑是,获得分布锁。如果获取失败,则发起 Redis Pub/Sub 订阅,等待释放锁的消息,从而再次发起获得分布式锁。
- 第 11 行:调用
#tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId)
方法,执行异步获得锁。 - 第 13 至 17 行:如果发生异常,则通过
result
通知异常。 - 第 19 至 26 行:如果
ttl
为空,说明获得到锁了,则通过result
通知获得锁成功。这里,在第 23 至 24 行有个小细节,看下注释。 - 第 41 行:声明
futureRef
变量,用于设置第 65 至 81 行创建的定时任务。 第 65 至 82 行:如果创建 SUBSCRIBE 订阅的 Future
subscribeFuture
未完成,创建定时任务 FuturescheduledFuture
。因为subscribeFuture
是异步的,而存在一个情况,可能subscribeFuture
未完成时,等待获得锁已经超时,所以通过scheduledFuture
来实现超时通知。- 第 80 行:记录
futureRef
为scheduledFuture
。 - 第 71 行:兜底判断
subscribeFuture
未完成。 - 第 73 行:进行取消
subscribeFuture
。 第 75 行:调用
#trySuccessFalse(long currentThreadId, RPromise<Boolean> result)
方法,通知获得锁失败。代码如下:// RedissonLock.javaprotected RFuture<Void> acquireFailedAsync(long threadId) {return RedissonPromise.newSucceededFuture(null); }private void trySuccessFalse(long currentThreadId, RPromise<Boolean> result) {acquireFailedAsync(currentThreadId).onComplete((res, e) -> {if (e == null) { // 通知获得锁失败result.trySuccess(false);} else { // 通知异常result.tryFailure(e);}}); }
- x
- 第 80 行:记录
第 43 至 63 行:创建 SUBSCRIBE 订阅的 Future
subscribeFuture
。通过订阅释放锁的消息,从而实现等待锁释放的客户端,快速抢占加锁。- 第 46 至 50 行:如果发生异常,则通过
result
通知异常。 - 第 52 至 55 行:如果创建定时任务 Future
scheduledFuture
,则进行取消。 - 第 57 至 59 行:减掉已经等待的时间。
第 62 行:调用
#tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId
方法,再次执行异步获得锁。详细解析,见 「5.2 更强的 tryLockAsync」 小节。
- 第 46 至 50 行:如果发生异常,则通过
感叹,想要写好全异步的代码,实际是非常困难的,Spring Webflux 反应式框架,想要推广在编写业务逻辑,基本可能性是为零。当然,Webflux 乃至反应式编程,更加适合推广在基础组件中。
5.1 tryAcquireAsync
#tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId)
方法,执行异步获得锁。代码如下:
// RedissonLock.javaprivate <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {// <1> 情况一,如果锁有时长,则直接获得分布式锁if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// <2> 情况二,如果锁无时长,则先获得 Lock WatchDog 的锁超时时长RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {// 如果发生异常,则直接返回if (e != null) {return;}// lock acquired// 如果获得到锁,则创建定时任务,定时续锁if (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture; } |
- 一共分成两种情况,是否锁有时长。
<1>
处,leaseTime != -1
,意味着锁设置了时长,则调用 「3.1 #tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command)」 方法,直接获得分布式锁。<2>
处,锁未设置了时长,所以先调用 「3.1 #tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command)」 方法,获得 Lock WatchDog 的锁超时时长的分布式锁,然后在回调中,再调用#scheduleExpirationRenewal(long threadId)
方法,创建定时任务,定时调用 「3.4 renewExpirationAsync」 续锁。详细解析,见 TODO 。
5.2 更强的 tryLockAsync
#tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId)
方法,更强的异步加锁。主要强在 2 点:
- 1、增加监听锁释放的消息的监听器,从而实现等待锁的客户端快速抢占锁的逻辑。
- 2、增加锁超时自动释放,没有锁释放消息的处理。
整体代码如下:
// RedissonLock.java1: private void tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId) {2: // 如果 result 已经完成,则直接返回,并取消订阅3: if (result.isDone()) {4: unsubscribe(subscribeFuture, currentThreadId);5: return;6: }7:8: // 如果剩余时间 time 小于 0 ,说明等待超时,则取消订阅,并通过 result 通知失败9: if (time.get() <= 0) {10: unsubscribe(subscribeFuture, currentThreadId);11: trySuccessFalse(currentThreadId, result);12: return;13: }14:15: // 记录当前时间16: long curr = System.currentTimeMillis();17: // 获得分布式锁18: RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);19: ttlFuture.onComplete((ttl, e) -> {20: // 如果发生异常,则取消订阅,并通过 result 通知异常21: if (e != null) {22: unsubscribe(subscribeFuture, currentThreadId);23: result.tryFailure(e);24: return;25: }26:27: // lock acquired28: // 如果获得到锁,则取消订阅,并通过 result 通知获得锁成功29: if (ttl == null) {30: unsubscribe(subscribeFuture, currentThreadId);31: if (!result.trySuccess(true)) {32: unlockAsync(currentThreadId);33: }34: return;35: }36:37: // 减掉已经等待的时间38: long el = System.currentTimeMillis() - curr;39: time.addAndGet(-el);40:41: // 如果无剩余等待的时间,则取消订阅,并通过 result 通知获得锁失败42: if (time.get() <= 0) {43: unsubscribe(subscribeFuture, currentThreadId);44: trySuccessFalse(currentThreadId, result);45: return;46: }47:48: // waiting for message49: // 记录新的当前时间50: long current = System.currentTimeMillis();51: // 获得当前线程对应的 RedissonLockEntry 对象52: RedissonLockEntry entry = getEntry(currentThreadId);53: // 尝试获得 entry 中的信号量,如果获得成功,说明 SUBSCRIBE 已经收到释放锁的消息,则直接立马再次去获得锁。54: if (entry.getLatch().tryAcquire()) {55: tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);56: } else {57: // 创建 AtomicBoolean 变量 executed ,用于标记下面创建的 listener 是否执行。58: AtomicBoolean executed = new AtomicBoolean();59: // 创建 AtomicReference 对象,用于指向定时任务60: AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();61:62: // 创建监听器 listener ,用于在 RedissonLockEntry 的回调,就是我们看到的 PublishSubscribe 监听到释放锁的消息,进行回调。63: Runnable listener = () -> {64: // 标记已经执行65: executed.set(true);66: // 如果有定时任务的 Future ,则进行取消67: if (futureRef.get() != null) {68: futureRef.get().cancel();69: }70:71: // 减掉已经等待的时间72: long elapsed = System.currentTimeMillis() - current;73: time.addAndGet(-elapsed);74:75: // 再次获得分布式锁76: tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);77: };78: // 添加 listener 到 RedissonLockEntry 中79: entry.addListener(listener);80:81: // 下面,会创建一个定时任务。因为极端情况下,可能不存在释放锁的消息,例如说锁自动超时释放,所以需要改定时任务,在获得到锁的超时后,主动去抢下。82: long t = time.get();83: if (ttl >= 0 && ttl < time.get()) { // 如果剩余时间小于锁的超时时间,则使用剩余时间。84: t = ttl;85: }86: // 如果 listener 未执行87: if (!executed.get()) {88: Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {89: @Override90: public void run(Timeout timeout) throws Exception {91: // 移除 listener 从 RedissonLockEntry 中92: if (entry.removeListener(listener)) {93: // 减掉已经等待的时间94: long elapsed = System.currentTimeMillis() - current;95: time.addAndGet(-elapsed);96:97: // 再次获得分布式锁98: tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);99: } 100: } 101: }, t, TimeUnit.MILLISECONDS); 102: // 记录 futureRef 执行 scheduledFuture 103: futureRef.set(scheduledFuture); 104: } 105: } 106: }); 107: } |
- 第 2 至 46 行:和 「5. tryLockAsync」 基本一致,就不重复哔哔了。
- 第 52 行:调用
#getEntry(long threadId)
方法,获得当前线程对应的 RedissonLockEntry 对象。 - 第 53 至 55 行:尝试获得
entry
中的信号量,如果获得成功,说明 SUBSCRIBE 已经收到释放锁的消息,则调用 「5.2 ##tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId)」 方法,直接立马再次去获得锁。 - 第 58 行:创建 AtomicBoolean 变量
executed
,用于标记下面创建的listener
是否执行。 - 第 60 行:声明
futureRef
变量,用于设置第 87 至 104 行创建的定时任务。因为极端情况下,可能不存在释放锁的消息,例如说锁自动超时释放,所以需要改定时任务,在获得到锁的超时后,主动去抢下。- 第 82 至 85 行:计算定时任务的延迟时间时间。如果剩余时间小于锁的超时时间,则使用剩余时间。
- 第 87 行:通过
executed
变量,判断listener
未执行。 - 第 103 行: 记录
futureRef
为scheduledFuture
。 - 第 92 行:移除
listener
从 RedissonLockEntry 中。避免,可能存在的并发执行。 - 第 98 行:调用 「5.2 ##tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId)」 方法,再次去获得锁。
- 这个定时任务,真的处理的是细节中的细节。之前思考获得分布式失败客户端的等待通知,只考虑了 Redis Pub/Sub 机制来实现,没有想到如果没有 PUBLISH 消息的场景。这块的逻辑,算是看 RedissonLock 最大的收获吧。
- 第 62 至 79 行:创建监听器
listener
,用于在 RedissonLockEntry 的回调,就是我们看到的 PublishSubscribe 监听到释放锁的消息,进行回调。- 第 79 行:添加
listener
到 RedissonLockEntry 中。 - 第 65 行:通过
executed
标记已经执行。 - 第 66 至 69 行:如果有定时任务的 Future ,则进行取消。
- 第 71 至 74 行:减掉已经等待的时间。
- 第 76 行:调用 「5.2 ##tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId)」 方法,再次去获得锁。
- 第 79 行:添加
至此,RedissonLock 加锁的逻辑我们已经全部看完。
5.3 遗漏的 tryLockAsync
还有两个重载的 #tryLockAsync(...)
方法,它们是未设置锁定时长的两个。代码如下:
// RedissonLock.java@Override public RFuture<Boolean> tryLockAsync() {return tryLockAsync(Thread.currentThread().getId()); }@Override public RFuture<Boolean> tryLockAsync(long threadId) {return tryAcquireOnceAsync(-1, null, threadId); } |
- 最终都调用
#tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId)
方法,真正实现异步加锁的逻辑。
#tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId)
方法,真正实现异步加锁的逻辑。代码如下:
// RedissonLock.javaprivate RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {// 情况一,如果锁有时长,则直接获得分布式锁if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);}// 情况二,如果锁无时长,则先获得 Lock WatchDog 的锁超时时长RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {// 如果发生异常,则直接返回if (e != null) {return;}// lock acquired// 如果获得到锁,则创建定时任务,定时续锁if (ttlRemaining) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture; } |
- 看到这个方法,是不是发现很熟悉,和 「5.1 tryAcquireAsync」 基本一模一样。差别在于它的返回的结果是
RFuture<Boolean>
。 - 有一点要特别注意,因为本小节我们看到的两个
#tryLockAsync(...)
方法,是尝试去加锁。如果加锁失败,则返回false
即可,所以不会像我们在 「5.1 tryLockAsync」 方法,无限重试直到等待超时(超过waitTime
)。
6. tryLock
#tryLock(long waitTime, long leaseTime, TimeUnit unit)
方法,同步加锁,并返回是否成功。。代码如下:
// RedissonLock.java@Overridepublic boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {return tryLock(waitTime, -1, unit);}1: @Override2: public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {3: long time = unit.toMillis(waitTime);4: long current = System.currentTimeMillis();5: long threadId = Thread.currentThread().getId();6: // 同步获加锁7: Long ttl = tryAcquire(leaseTime, unit, threadId);8: // lock acquired9: // 加锁成功,直接返回 true 加锁成功10: if (ttl == null) {11: return true;12: }13:14: // 减掉已经等待的时间15: time -= System.currentTimeMillis() - current;16: // 如果无剩余等待的时间,则返回 false 加锁失败17: if (time <= 0) {18: acquireFailed(threadId);19: return false;20: }21:22: // 记录新的当前时间23: current = System.currentTimeMillis();24: // 创建 SUBSCRIBE 订阅的 Future25: RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);26: // 阻塞等待订阅发起成功27: if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {28: // 进入到此处,说明阻塞等待发起订阅超时29: // 取消 SUBSCRIBE 订阅30: if (!subscribeFuture.cancel(false)) {31: // 进入到此处,说明取消发起订阅失败,则通过设置回调,在启发订阅完成后,回调取消 SUBSCRIBE 订阅32: subscribeFuture.onComplete((res, e) -> {33: if (e == null) {34: unsubscribe(subscribeFuture, threadId);35: }36: });37: }38: // 等待超时,则返回 false 加锁失败39: acquireFailed(threadId);40: return false;41: }42:43: try {44: // 减掉已经等待的时间45: time -= System.currentTimeMillis() - current;46: // 如果无剩余等待的时间,则返回 false 加锁失败47: if (time <= 0) {48: acquireFailed(threadId);49: return false;50: }51:52: while (true) {53: // 记录新的当前时间54: long currentTime = System.currentTimeMillis();55: // 同步获加锁56: ttl = tryAcquire(leaseTime, unit, threadId);57: // lock acquired58: // 加锁成功,直接返回 true 加锁成功59: if (ttl == null) {60: return true;61: }62:63: // 减掉已经等待的时间64: time -= System.currentTimeMillis() - currentTime;65: // 如果无剩余等待的时间,则返回 false 加锁失败66: if (time <= 0) {67: acquireFailed(threadId);68: return false;69: }70:71: // waiting for message72: // 记录新的当前时间73: currentTime = System.currentTimeMillis();74:75: // 通过 RedissonLockEntry 的信号量,阻塞等待锁的释放消息,或者 ttl/time 超时(例如说,锁的自动超时释放)76: if (ttl >= 0 && ttl < time) {77: getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);78: } else {79: getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);80: }81:82: // 减掉已经等待的时间83: time -= System.currentTimeMillis() - currentTime;84: // 如果无剩余等待的时间,则返回 false 加锁失败85: if (time <= 0) {86: acquireFailed(threadId);87: return false;88: }89: }90: } finally {91: // 小细节,需要最终取消 SUBSCRIBE 订阅92: unsubscribe(subscribeFuture, threadId);93: }94: // return get(tryLockAsync(waitTime, leaseTime, unit));95: } |
第 7 行:调用
#tryAcquire(long leaseTime, TimeUnit unit, long threadId)
方法,同步加锁。代码如下:// RedissonLock.javaprivate Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId)); }
- 该方法内部,调用的就是 「5.1 #tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId)」 方法。
第 8 至 12 行:加锁成功,直接返回
true
加锁成功。- 第 15 行:减掉已经等待的时间。
- 第 17 至 20 行:如果无剩余等待的时间,则返回
false
加锁失败。 - 第 25 行:调用
#subscribe(long threadId)
方法,创建 SUBSCRIBE 订阅的 FuturesubscribeFuture
。 - 【重要差异点】第 27 至 41 行:调用
#await(subscribeFuture, time, TimeUnit.MILLISECONDS)
方法,阻塞等待订阅发起成功,因为subscribeFuture
是异步的,需要这一步转同步。如果发生超时,则就会进入第 28 至 37 行的取消逻辑,并在第 38 至 40 行返回false
加锁失败。 - 第 52 至 89 行:反复重试,直到成功加锁返回
true
,或者超时返回false
。- 第 54 至 73 行:重试一波第 6 至 20 行的逻辑。
- 【重要差异点】第 75 至 80 行:通过 RedissonLockEntry 的信号量,阻塞等待锁的释放消息,或者
ttl
/time
超时(例如说,锁的自动超时释放)。- 相比 「5.2 #tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId)」 方法,它把信号量的等待和定时任务的等待融合在一起了。
- 等待完成后,如果无剩余时间,在第 82 至 88 行的逻辑中,返回
false
加锁失败。 - 等待完成后,如果有剩余时间,在第 56 行:获得重新同步获得锁。
- 第 92 行:调用
#unsubscribe(RFuture<RedissonLockEntry> future, long threadId)
方法,小细节,需要最终取消 SUBSCRIBE 订阅。
7. lockAsync
#lockAsync(long leaseTime, TimeUnit unit, long currentThreadId)
方法,异步加锁,无需返回是否成功。代码如下:
// RedissonLock.java@Overridepublic RFuture<Void> lockAsync() {return lockAsync(-1, null);}@Overridepublic RFuture<Void> lockAsync(long leaseTime, TimeUnit unit) {// 获得线程编号long currentThreadId = Thread.currentThread().getId();// 异步锁return lockAsync(leaseTime, unit, currentThreadId);}@Overridepublic RFuture<Void> lockAsync(long currentThreadId) {return lockAsync(-1, null, currentThreadId);}1: @Override2: public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long currentThreadId) {3: // 创建 RPromise 对象,用于异步回调4: RPromise<Void> result = new RedissonPromise<Void>();5: // 异步加锁6: RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);7: ttlFuture.onComplete((ttl, e) -> {8: // 如果发生异常,则通过 result 通知异常9: if (e != null) {10: result.tryFailure(e);11: return;12: }13:14: // lock acquired15: // 如果获得到锁,则通过 result 通知获得锁成功16: if (ttl == null) {17: if (!result.trySuccess(null)) { // 如果处理 result 通知对结果返回 false ,意味着需要异常释放锁18: unlockAsync(currentThreadId);19: }20: return;21: }22:23: // 创建 SUBSCRIBE 订阅的 Future24: RFuture<RedissonLockEntry> subscribeFuture = subscribe(currentThreadId);25: subscribeFuture.onComplete((res, ex) -> {26: // 如果发生异常,则通过 result 通知异常27: if (ex != null) {28: result.tryFailure(ex);29: return;30: }31:32: // 异步加锁33: lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);34: });35: });36:37: return result;38: } |
- 第 6 行:调用 「5.1 #tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId)」 方法,执行异步获得锁。
- 第 7 至 35 行:又是熟悉的配方,在回调中,处理响应的加锁结果。差异就在第 34 行,见 「7.1 更强的 lockAsync」 的详细解析。
7.1 更强的 lockAsync
实际上,#lockAsync(long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Void> result, long currentThreadId)
方法,和 「5.2 更强的 tryLockAsync」 是基本一致的。那么为什么不直接重用呢?注意,这个方法不需要考虑等待超时,有一种“劳资有钱,必须拿到锁”。
代码如下:
// RedissonLock.javaprivate void lockAsync(long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Void> result, long currentThreadId) {// 获得分布式锁RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);ttlFuture.onComplete((ttl, e) -> {// 如果发生异常,则取消订阅,并通过 result 通知异常if (e != null) {unsubscribe(subscribeFuture, currentThreadId);result.tryFailure(e);return;}// lock acquired// 如果获得到锁,则取消订阅,并通过 result 通知获得锁成功if (ttl == null) {unsubscribe(subscribeFuture, currentThreadId);if (!result.trySuccess(null)) {unlockAsync(currentThreadId);}return;}// 获得当前线程对应的 RedissonLockEntry 对象RedissonLockEntry entry = getEntry(currentThreadId);// 尝试获得 entry 中的信号量,如果获得成功,说明 SUBSCRIBE 已经收到释放锁的消息,则直接立马再次去获得锁。if (entry.getLatch().tryAcquire()) {lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);} else {// waiting for message// 创建 AtomicReference 对象,用于指向定时任务AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();// 创建监听器 listener ,用于在 RedissonLockEntry 的回调,就是我们看到的 PublishSubscribe 监听到释放锁的消息,进行回调。Runnable listener = () -> {// 如果有定时任务的 Future ,则进行取消if (futureRef.get() != null) {futureRef.get().cancel();}// 再次获得分布式锁lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);};// 添加 listener 到 RedissonLockEntry 中entry.addListener(listener);// 下面,会创建一个定时任务。因为极端情况下,可能不存在释放锁的消息,例如说锁自动超时释放,所以需要改定时任务,在获得到锁的超时后,主动去抢下。if (ttl >= 0) {Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {// 移除 listener 从 RedissonLockEntry 中if (entry.removeListener(listener)) {// 再次获得分布式锁lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);}}}, ttl, TimeUnit.MILLISECONDS);// 记录 futureRef 执行 scheduledFuturefutureRef.set(scheduledFuture);}}}); } |
- 更加熟悉的配方,全程无需处理等待锁超时的逻辑。
8. lock
#tryLock(long waitTime, long leaseTime, TimeUnit unit)
方法,同步加锁,无需返回是否成功。代码如下:
// RedissonLock.java@Override public void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();} }@Override public void lock(long leaseTime, TimeUnit unit) {try {lock(leaseTime, unit, false);} catch (InterruptedException e) {throw new IllegalStateException();} }@Override public void lockInterruptibly() throws InterruptedException {lock(-1, null, true); }@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {lock(leaseTime, unit, true); }1: private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {2: long threadId = Thread.currentThread().getId();3: // 同步获加锁4: Long ttl = tryAcquire(leaseTime, unit, threadId);5: // lock acquired6: // 加锁成功,直接返回7: if (ttl == null) {8: return;9: }10:11: // 创建 SUBSCRIBE 订阅的 Future12: RFuture<RedissonLockEntry> future = subscribe(threadId);13: // 阻塞等待订阅发起成功14: commandExecutor.syncSubscription(future);15:16: try {17: while (true) {18: // 同步获加锁19: ttl = tryAcquire(leaseTime, unit, threadId);20: // lock acquired21: // 加锁成功,直接返回22: if (ttl == null) {23: break;24: }25:26: // waiting for message27: // 通过 RedissonLockEntry 的信号量,阻塞等待锁的释放消息,或者 ttl/time 超时(例如说,锁的自动超时释放)28: if (ttl >= 0) {29: try {30: getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);31: } catch (InterruptedException e) {32: // 如果允许打断,则抛出 e33: if (interruptibly) {34: throw e;35: }36: // 如果不允许打断,则继续37: getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);38: }39: } else {40: if (interruptibly) {41: getEntry(threadId).getLatch().acquire();42: } else {43: getEntry(threadId).getLatch().acquireUninterruptibly();44: }45: }46: }47: } finally {48: // 小细节,需要最终取消 SUBSCRIBE 订阅49: unsubscribe(future, threadId);50: }51: // get(lockAsync(leaseTime, unit));52: }53:54: private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {55: return get(tryAcquireAsync(leaseTime, unit, threadId));56: } |
- 太过熟悉,就不哔哔了。
至此,加锁的几种组合排列,我们就已经看完了。
精尽 Redisson 源码分析 —— 可重入分布式锁 ReentrantLock相关推荐
- 精尽 Redisson 源码分析 —— 限流器 RateLimiter
1. 概述 限流,无论在系统层面,还是在业务层面,使用都非常广泛.例如说: [业务]为了避免恶意的灌水机或者用户,限制每分钟至允许回复 10 个帖子. [系统]为了避免服务系统被大规模调用,超过极限, ...
- 精尽 Redisson 源码分析 —— 可靠分布式锁 RedLock
1. 概述 我们来看一个 Redis 主从结构下的示例,Redis 分布式锁是如何失效的: 1.客户端 A 从 Redis Master 获得到锁 anylock . 2.在 Redis Master ...
- 精尽 Dubbo 源码分析 —— API 配置
1. 概述 Dubbo 的配置目前提供了四种配置方式:1. API 配置 2. 属性配置 3. XML 配置 4. 注解配置 2. 配置一览 我们来看看 dubbo-config-api 的项目结构, ...
- 【Redis Lua 脚本 可重入分布式锁】
文章目录 前言 一.最简单的版本:setnx key value 获取锁成功 获取锁失败 释放锁 缺点 二.升级版本:set key value [ex seconds] [nx] 获取锁成功 获取锁 ...
- 老大吩咐的可重入分布式锁,终于完美的实现了~
重做永远比改造简单 最近在做一个项目,将一个其他公司的实现系统(下文称作旧系统),完整的整合到自己公司的系统(下文称作新系统)中,这其中需要将对方实现的功能完整在自己系统也实现一遍. 旧系统还有一批存 ...
- 精尽 jasypt-spring-boot 源码分析 3.0.4
1 依赖工具 Maven Git JDK IntelliJ IDEA 2 源码拉取 git clone git://github.com/ulisesbocchio/jasypt-spring-boo ...
- Redisson 源码初探 (六)公平锁
因为Redisson 默认是非公平锁,client 端互相一起争抢,现在我们继续研究公平锁,为什么要研究?研究分布式锁 不仅仅要研究最基础的锁对吧,我们要把一系列的非公平锁 公平锁 读写锁 RedLo ...
- mybatis一个方法执行多条sql_精尽MyBatis源码分析——SQL执行过程之Executor!
MyBatis的SQL执行过程 在前面一系列的文档中,我已经分析了 MyBatis 的基础支持层以及整个的初始化过程,此时 MyBatis 已经处于就绪状态了,等待使用者发号施令了 那么接下来我们来看 ...
- AtomicInteger源码分析——基于CAS的乐观锁实现
原文出处: bestStyle 1. 悲观锁与乐观锁 我们都知道,cpu是时分复用的,也就是把cpu的时间片,分配给不同的thread/process轮流执行,时间片与时间片之间,需要进行cpu切换, ...
最新文章
- flex图表数据动态更新效果示例
- 软件测试工程师们,今年的年终奖你想拿多少?
- 震惊:菲律宾总统咧嘴冷笑视察惨剧!
- c 富文本html编辑器,富文本HTML编辑器UEditor
- python画画加粗_Matplotlib'粗体'字体 - python
- 让手机重现“一律允许使用这台计算机进行调试”确认窗口
- steam第三方授权登录不稳定(openid4java)
- linux十大实用工具,10大好用的Linux实用工具推荐
- php gridreport,Grid++Report下载-Grid++Report报表工具官方版下载[报表插件]-华军软件园...
- python之父北京尚学堂怎么样_尚学堂百战程序员:Python的元类
- 计算机函数公式相乘,excel表格数据相乘公式-如何在Excel中使用乘法函数公式
- wamp中php无法启动,wamp无法正常启动
- 中科大自主招生2018笔试数学之三
- C# UDP Socket ReceiveFrom 远程主机强迫关闭了一个现有的连接。
- OPENWRT-LUCI开发总结-LUCI添加新页面总结
- 关于H5页面在iPhoneX刘海屏适配(转)
- 广东迅视问泸州第三代社保卡发行 现有二代社保卡仍可正常使用吗?
- python 读取文件到字典读取顺序_python顺序的读取文件夹下名称有序的文件方法...
- Jquery鼠标滚轮放大缩小图片
- AJAX实现页面登录及注册用户名验证
热门文章
- 【Python小技巧】Python操控Chrome浏览器实现网页打开、切换、关闭(送独家Chrome操作打包类源码、Chrome浏览器Cookie在哪里?)
- android videoview开发播放比例16比9,VideoView按原始視頻比例播放
- java 压缩/解压【tar.gz】
- java实现ftp协议_Java语言实现简单FTP软件 FTP协议分析(1)
- 兼容arduino的linux开发板,Arduino的各种常用开发板
- 单链表的创建,插入,删除以及查找
- elasticsearch 学习笔记(查询语句和修改语句)
- 栅栏效应、频谱泄露、细化技术
- 栅栏效应,频谱泄露,旁瓣效应
- 14.4-14.5 NFS的exportfs命令,NFS客户端问题