因为Redisson 默认是非公平锁,client 端互相一起争抢,现在我们继续研究公平锁,为什么要研究?研究分布式锁 不仅仅要研究最基础的锁对吧,我们要把一系列的非公平锁 公平锁 读写锁 RedLock锁,Semaphore CountDownLatch  一系列的研究完,才算真正的研究了分布式锁,对吧

那么公平锁呢,我们知道公平锁需要维持一个有序的获取锁的顺序,可以使用队列也可以使用一些其他的机制,那么我们慢慢来看Redisson的公平锁是如何实现的?

先找到入口

 public static void main(String[] args) throws Exception {//构建一个配置信息对象Config config = new Config();config.useClusterServers()//定时扫描连接信息 默认1000ms.setScanInterval(2000).addNodeAddress("redis://127.0.0.1:7001");//因为Redisson 是基于redis封装的一套便于复杂操作的框架//所以这里构建对象肯定是创建一些与redis的连接RedissonClient redisson = Redisson.create(config);//这里是重点 获取锁,这也是重点分析的地方//这里获取公平锁RLock lock = redisson.getFairLock("lock");//尝试获取锁,这里其实和之前的非公平锁 基本一样//那么公平锁最主要的逻辑在于有序 应该会有一个队列或者其他机制来维持有序//那么我们知道了类似点,同时我们去看看有哪些地方不一样lock.lock();//释放锁lock.unlock();}

我们可以看到RedissonFairLock 其实是我们RedissonLock 的一个子类,前面的逻辑都是一样的

@Overridepublic void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
@Overridepublic void lockInterruptibly() throws InterruptedException {lockInterruptibly(-1, null);}
@Overridepublic void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {long threadId = Thread.currentThread().getId();//尝试上锁Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return;}RFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future);try {while (true) {ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// waiting for messageif (ttl >= 0) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().acquire();}}} finally {unsubscribe(future, threadId);}
//        get(lockAsync(leaseTime, unit));}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId));}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}//真正的尝试获取锁,执行lua脚本,这里是RedissonFairLock 不同的地方,子类进行了重写RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return;}Long ttlRemaining = future.getNow();// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}

其实区别主要在执行lua脚本这里,这里RedissonFairLock 重写了父类的 tryLockInnerAsync()方法,那么重写了什么内容?

内容有点多,重点还是看lua脚本 调试源码 if (command == RedisCommands.EVAL_LONG) 才是真正的执行逻辑

KEYS : Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)

AVGS : internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime

KEYS[1] = getName() 其实就是锁的名称 lockName

KEYS[2] =  thredsQueueName = redisson_lock_queue:{lockName} 基于redis  list 结构实现的一个队列

KEYS[3] = timeoutSetName =  redisson_lock_timeout:{lockName}  基于redis 数据结构实现的一个set集合,有序的集合,可以自动按照每个数据指定一个分数(score)来进行排序

AVGS[1] = internalLockLeaseTime 就是lock 的契约时间,默认是30S

AVGS[2] = getLockName(threadId) 其实就是UUID + ":" + ThreadId

AVGS[3] = currentTime + threadWaitTime,当前时间 + 线程等待时间 默认 5000ms,最后得到一个时间 代表什么意义?

AVGS[4] = currentTime 当前时间

@Override<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);long currentTime = System.currentTimeMillis();if (command == RedisCommands.EVAL_NULL_BOOLEAN) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,// remove stale threads"while true do "+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"+ "if firstThreadId2 == false then "+ "break;"+ "end; "+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"+ "if timeout <= tonumber(ARGV[3]) then "+ "redis.call('zrem', KEYS[3], firstThreadId2); "+ "redis.call('lpop', KEYS[2]); "+ "else "+ "break;"+ "end; "+ "end;"+ "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +"redis.call('lpop', KEYS[2]); " +"redis.call('zrem', KEYS[3], ARGV[2]); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return 1;", Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName), internalLockLeaseTime, getLockName(threadId), currentTime);}//如果你去debug 源码,你会知道其实走的是这块逻辑,而不是上面那一块if (command == RedisCommands.EVAL_LONG) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,// remove stale threads//一进来 就是一个死循环"while true do "//KEY[2] 是什么threadsQueueName 就是队列的名称// lindex htreadsQueueName 0 什么意思?就是从队列中拿出第一个元素 + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"//如果不存在 就直接会break掉 直接跳出循环+ "if firstThreadId2 == false then "+ "break;"+ "end; "//从有序set 集合中拿去第一个元素 看是否过期//其实就是去把一些过期的key清除掉+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"+ "if timeout <= tonumber(ARGV[4]) then "//从set 集合中 以及 queue中移除代表客户端的元素+ "redis.call('zrem', KEYS[3], firstThreadId2); "+ "redis.call('lpop', KEYS[2]); "+ "else "+ "break;"+ "end; "+ "end;"//如果没有人加锁 锁key 和 队列都不存在 或者 队列的第一个元素是当前线程标志 满足其中一个条件+ "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//弹出第一个元素"redis.call('lpop', KEYS[2]); " +//从set集合中删除当前线程元素"redis.call('zrem', KEYS[3], ARGV[2]); " +//进行加锁 ,就是一个map数据结构"redis.call('hset', KEYS[1], ARGV[2], 1); " +//设置过期时间"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//如果是当前线程持有锁,那么重入的时候 就会将对应的value + 1,白哦名重入的次数 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +//重写设置过期时间"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//就会进入排队逻辑"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +"local ttl; " + //如果第一个元素不为空 同时当前线程不是第一个线程 就会进入排队 或者更新score"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + //根据第一个线程的过期时间 - 当前时间 比如 10:00:25 - 10:00:05 = 20S"ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" + "else "//获取锁剩余存活时间+ "ttl = redis.call('pttl', KEYS[1]);" + "end; " + //timeout = ttl + 当前时间 + 5000ms"local timeout = ttl + tonumber(ARGV[3]);" + //放入zset 有序集合中(过期时间作为分数)这里会存在两种情况对吧
//(1)如果不存在zset这个数据集合中 那么就会返回1,就会同时放入到 队列中
//(2)如果存在了元素,那么这个zadd key score value 那么就是更新分数,返回0,这时就不会再放入到队列中"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +"end; " +"return ttl;", Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName), internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime);}throw new IllegalArgumentException();}

这里我们要注意,我们的公平锁,如果某个线程持有线程过长,可能会导致队列被重写排序,为什么?

我们注意两个关键存储数据的结构 queue 和 zset,

在开始的时候,就会从zset中拿去score 判断是否timeout,如果timeout,对吧,会将队列中的元素 pop 以及 set中的对应的元素移除,但是这并不代表对应的线程不进行争夺锁了哦,

那些对应的线程会根据激活时间重写争夺锁,这个时候就会重写将自己放入到queue中 以及 zset中,因为GC 网络或者其他原因  有可能会导致延迟,从而导致 被顺序被重排,所有严格意义上来说,公平锁,并不是完全公平 ,为什么会这设计?

可以剔除一些不在尝试获取锁的线程,不管是网络出问题,还是宕机,或者使用tryAcquire 方法,就是指定了一个尝试获取锁的时长,没有获取到就直接判定获取失败,不会一直等待,

使用while true 可以让线程去将一些不必要一直站在茅坑不拉屎的元素剔除掉,

最后我们来研究一下释放锁 主要就是看下unlock机制,其实这块代码和之前的逻辑差不了多少,主要完成了

(1)删除一些占着茅坑不拉屎的元素,就是timeout的元素

(2)释放锁,如果是重入锁,那么就将对应的值 -1

(3)向channel,也就是redis中的发布/订阅功能,对应的channle 保存的client发送通知消息,其实就是通知对应的线程去开始尝试获取锁

 @Overrideprotected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,// remove stale threads"while true do "+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"+ "if firstThreadId2 == false then "+ "break;"+ "end; "+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"//剔除掉一些timeout 的元素+ "if timeout <= tonumber(ARGV[4]) then "+ "redis.call('zrem', KEYS[3], firstThreadId2); "+ "redis.call('lpop', KEYS[2]); "+ "else "+ "break;"+ "end; "+ "end;"//如果不存在 直接往channel 中投递一个消息,通知其他线程去获取锁+ "if (redis.call('exists', KEYS[1]) == 0) then " + "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " +"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +"end; " +"return 1; " +"end;" +//如果不是当前线程获取的锁,直接返回"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +//当前线程持有锁,将对应的value 值 -1"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +//如果 -1 后还大于0 说明是重入了几次锁"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"end; " +//否则就是删除key 释放锁 同时向channel中投递一个消息去 通知其他线程获取锁"redis.call('del', KEYS[1]); " +"local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " +"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +"end; " +"return 1; ",Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());}

到这基本上就差不多了

Redisson 源码初探 (六)公平锁相关推荐

  1. 精尽 Redisson 源码分析 —— 可靠分布式锁 RedLock

    1. 概述 我们来看一个 Redis 主从结构下的示例,Redis 分布式锁是如何失效的: 1.客户端 A 从 Redis Master 获得到锁 anylock . 2.在 Redis Master ...

  2. 精尽 Redisson 源码分析 —— 可重入分布式锁 ReentrantLock

    1. 概述 在 Redisson 中,提供了 8 种分布锁的实现,具体我们可以在 <Redisson 文档 -- 分布式锁和同步器> 中看到.绝大数情况下,我们使用可重入锁(Reentra ...

  3. redis watchdog_Redis分布式事务框架Redisson源码解析(一)

    代码片段一. public static void main(String[] args) throws Exception { Config config = new Config(); confi ...

  4. 智能锁方案PCBA原理图PCB单片机开发板源码wifi远程开锁。 提供全套技术资料,包括原理图,PCB图,程序源码,bom清单,说明书等

    智能锁方案PCBA原理图PCB单片机开发板源码wifi远程开锁. 提供全套技术资料,包括原理图,PCB图,程序源码,bom清单,说明书等. 该指纹密码锁具有如下功能: 一.指纹开门 二.触摸密码开门 ...

  5. 精尽 Redisson 源码分析 —— 限流器 RateLimiter

    1. 概述 限流,无论在系统层面,还是在业务层面,使用都非常广泛.例如说: [业务]为了避免恶意的灌水机或者用户,限制每分钟至允许回复 10 个帖子. [系统]为了避免服务系统被大规模调用,超过极限, ...

  6. Spring源码-AOP(六)-自动代理与DefaultAdvisorAutoProxyCreator

    2019独角兽企业重金招聘Python工程师标准>>> Spring AOP 源码解析系列,建议大家按顺序阅读,欢迎讨论 Spring源码-AOP(一)-代理模式 Spring源码- ...

  7. mybatis源码阅读(六) ---StatementHandler了解一下

    转载自  mybatis源码阅读(六) ---StatementHandler了解一下 StatementHandler类结构图与接口设计 BaseStatementHandler:一个抽象类,只是实 ...

  8. Spring-bean的循环依赖以及解决方式___Spring源码初探--Bean的初始化-循环依赖的解决

    本文主要是分析Spring bean的循环依赖,以及Spring的解决方式. 通过这种解决方式,我们可以应用在我们实际开发项目中. 什么是循环依赖? 怎么检测循环依赖 Spring怎么解决循环依赖 S ...

  9. Celery 源码解析六:Events 的实现

    序列文章: Celery 源码解析一:Worker 启动流程概述 Celery 源码解析二:Worker 的执行引擎 Celery 源码解析三: Task 对象的实现 Celery 源码解析四: 定时 ...

最新文章

  1. HTML在表格右边增加一个表格,如何在表格右侧增加一列
  2. 英特尔10纳米处理器再度跳票,或收缩芯片代工业务
  3. AWK神器,继续案例
  4. 理解 JavaScript 作用域和作用域链
  5. 【UGV】32版UGV原理图
  6. DL之DeepLabv2:DeepLab v2算法的简介(论文介绍)、架构详解、案例应用等配图集合之详细攻略
  7. web.xml 配置文件 超详细说明!!!
  8. maven的eclipse找不到本地仓库的的jar包
  9. easyui treenode java_easyui tree的简单使用
  10. python 私有云_构建私有云伴随着哪些需求?
  11. 安全生产应急救援信息管理指挥调度系统
  12. 教你如何将 Excel 中的数据按模板批量生成 Word、Excel、PPT、PDF 以及 Txt 类型的文本文件
  13. android开发者模式自动打开位置touch信息
  14. 介绍一款rar文件密码破解利器——RAR Password Unlocker
  15. Java 在Word中创建多级项目符号列表和编号列表
  16. 企业微信社群运营该怎么做?
  17. 独轮平衡车c语言源码,两轮平衡车STM32源代码
  18. 笔记本控制台开启热点
  19. 2022-06-06 FUSE用户态文件系统
  20. 电脑死机,虚拟机里面的系统开不了

热门文章

  1. linux中sftp默认登录的端口号是多少? sftp通过指定的端口号连接?sftp默认端口号
  2. java option请求_如何在Spring Boot中处理HTTP OPTIONS请求?
  3. 直播间赠送礼物动效、选礼物列表、赠送数量,礼物连发排队处理,Vue项目
  4. PHP实现微信小程序授权登录
  5. python中的pylab_python-什么是%pylab?
  6. 安卓10平台DNS两层缓存
  7. 我国2030年可能首次载人登月
  8. python *args和**kwargs详解
  9. 支付宝9亿红包分钱了,你拿了多少钱?有人领到18888元吗?
  10. 百人项目组unity2d游戏手机端发烫帧率低优化从10帧优化到60帧