上面这种写法是有问题的

redis设置分布式锁 同时设置超时时间 不能分开写  原子操作才行 ,否则 挂掉会有问题

就是说第一行设置完分布式锁的key,第二行设置超时时间的,那么如果在第一行和第二行之间服务器挂掉了就会有问题。

下图这种设置超时时间也是有问题的:

因为是固定的十秒钟,那么存在这种情况:

1.线程一运行了15秒钟,但是锁设置的是十秒钟,那么这时候线程二在线程一运行代码的时候同时加锁运行了,然后线程一在线程二运行五秒钟的时候(也就是线程一的第15秒把锁删除了,这时候线程三就可以直接加锁了)

解决方法:加锁添加当前线程id和客户端id 优化如下:

但是这个代码还有个问题:

原子性问题:

线程一9.9秒(无限接近于10秒钟的时候)执行到上图finally的if 执行完,还没有删除锁,但是if执行完了是true ,这时候正好到了十秒钟锁失效线程二直接加锁成功,那么这时候线程一删除锁的话是删除了线程二的锁。

这种要使用锁续命来解决,也就是下面要说的redisson

redission加锁核心: 

具体实现:

RedissonLock的lock()方法——>lockInterruptibly()->tryAcquire()->tryAcquireAsync(leaseTime, unit, threadId)->tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

上面第一段if的脚本就是先判断锁存不存在,不存在就用hash结构存放并且设置过期时间,

第二个if就是锁存在那么支持可重入,锁增加相应的重入值(hincrby)。

上面连个if都不满足就说明不是当前线程持有锁,就返回当前锁key 的剩余过期时间。

接下来看异步回调的部分:

 private void scheduleExpirationRenewal(final long threadId) {if (expirationRenewalMap.containsKey(getEntryName())) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());if (!future.isSuccess()) {log.error("Can't update lock " + getName() + " expiration", future.cause());return;}if (future.getNow()) {// reschedule itselfscheduleExpirationRenewal(threadId);}}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel();}}
ttlRemaining是null就是上面lua脚本两个if的情况,加锁成功了,那么调用scheduleExpirationRenewal(threadId);方法给锁按照超时时间的三分之一延长续命,就是比如锁是30秒超时,那么没=每十秒钟调用续命。 这里scheduleExpirationRenewal方法是嵌套调用的。

上面脚本返回0就结束看门狗线程的续命了。也就是不嵌套调用了。

接下来看下RedissonLock的tryLock(long waitTime, long leaseTime, TimeUnit unit)方法

 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();final long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= (System.currentTimeMillis() - current);if (time <= 0) {acquireFailed(threadId);return false;}current = System.currentTimeMillis();final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {@Overridepublic void operationComplete(Future<RedissonLockEntry> future) throws Exception {if (subscribeFuture.isSuccess()) {unsubscribe(subscribeFuture, threadId);}}});}acquireFailed(threadId);return false;}try {time -= (System.currentTimeMillis() - current);if (time <= 0) {acquireFailed(threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= (System.currentTimeMillis() - currentTime);if (time <= 0) {acquireFailed(threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= (System.currentTimeMillis() - currentTime);if (time <= 0) {acquireFailed(threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}
//        return get(tryLockAsync(waitTime, leaseTime, unit));}

由前面分析的tryAcquire方法可见,lua脚本返回null说明加锁成功,否则返回锁的剩余时间,

接着final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);使用recis的发布订阅模式,订阅这个锁的channel队列,

然后进入while(true)的循环再次加锁,返回null就和上面说的一样加锁成功,否则

信号量 加锁 :  获取许可 ,等待锁的超时时间结束再获取锁 (ttl)对性能要好,否则一直while(true)的循环性能就太差了

  if (ttl >= 0 && ttl < time) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}

这里的getlatch是aqs的信号量

    public Semaphore getLatch() {return latch;}

接下来看解锁方法:

 @Overridepublic void unlock() {Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));if (opStatus == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + Thread.currentThread().getId());}if (opStatus) {cancelExpirationRenewal();}//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);}

照样是个lua脚本

  protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}

重点是counter开始的逻辑,local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); -1就是可重入数量减1。如果count大于0及说明是重入锁,只有count等于0才是说明解锁成功,那么发布通知那些订阅这个锁的线程,并且删除这个key。

发布订阅在LockPubSub这个类里面:

回调的方法:

  protected void onMessage(RedissonLockEntry value, Long message) {if (message.equals(unlockMessage)) {value.getLatch().release();while (true) {Runnable runnableToExecute = null;synchronized (value) {Runnable runnable = value.getListeners().poll();if (runnable != null) {if (value.getLatch().tryAcquire()) {runnableToExecute = runnable;} else {value.addListener(runnable);}}}if (runnableToExecute != null) {runnableToExecute.run();} else {return;}}}}

这里就有尝试加锁的逻辑tryAcquire

可重入分布式锁:

这里重点还是lua脚本有区别

RedissonReadLock:
@Override<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'read'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('set', KEYS[2] .. ':1', 1); " +"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local key = KEYS[2] .. ':' .. ind;" +"redis.call('set', key, 1); " +"redis.call('pexpire', key, ARGV[1]); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end;" +"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));}

这里用hash的mode字段,read表示读锁,write表示写锁。

第一个if表示加了读锁,设置过期时间

第二个if就是可重入锁的逻辑,不管之前是读还是写都将其 +1

同样解锁也有publish发布通知订阅这个key的线程这些操作,可重入count数量减少这些操作

写锁:

  @Override<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'write'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (mode == 'write') then " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " +"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +"return nil; " +"end; " +"end;" +"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName(threadId));}

同样的 第一个if是加锁逻辑,第二个if是重入逻辑。

RedLock:

保证高可用 都拖一个从节点,免得挂两个出现永远加锁不成功 如下图所示:

底层每个节点都去setnx  ,至少半数以上节点返回成功才认为成功

红锁不推荐用: 针对同一个key的锁而言

客户端1 redis1加锁成功,redis2加锁成功,然后redis2返回成功后挂掉了

客户端2 来redis3加锁成功 ,新的redis2从节点顶上来加锁成功 ,那么客户端1和客户端2都加锁成功了,就有问题了

redis的持久化 假设1s一次 ,那么redis2 也就是节点2 就这一秒钟 加锁的时候key 挂掉了或者重启了  ,那就坑了,这个key就丢掉了,然后别的客户端再来加锁就出现上面的问题

因为 redis 就是 ap 保证高性能,所以这样的话要不就用zk算了

说白了分布式锁就是把并行的请求串行化了

又要分布式锁,又要高并发,那就得根据各种场景优化了。

锁的粒度 越小越好

redis里面多分几个key    初始化时候拆成十个key , 写热点分散

这样就性能提升了十倍,减完key的段位就加标记

就和cmap底层差不多

分布式锁可以解决双写不一致问题

大部分 都是 冷门数据 ,九成  热点商品大部分请求  下面 的第一个if直接返回,很少的请求 完整走一次后端 , 大部分代码都是解决小部分问题,  下面if红线的小部分代码解决九成情况。

比如 并发重建线程有几万这在下图箭头这边

预估知道逻辑 1秒能执行完 ,就直接  串行转并发  设置1秒钟的trylock 然后直接读取下面的缓存   99.99场景     0.001情况出bug  所以用不用需要结合具体的业务场景

说白了架构就是结合具体业务场景的取舍

热点缓存监控系统:

所有 web应用 监听 热点缓存(去实时计算热点   对 redis 操作做aop拦截 然后往热点缓存系统里面发送请求  分布式大规模数据实时计算来维护这个热点缓存的)

热点缓存通知 web应用去更新

不可能说是redis的key修改,web应用去通知别的web应用更新,这样成本太大了。

redis 分布式锁整理相关推荐

  1. redis分布式锁 在集群模式下如何实现_收藏慢慢看系列:简洁实用的Redis分布式锁用法...

    在微服务中很多情况下需要使用到分布式锁功能,而目前比较常见的方案是通过Redis来实现分布式锁,网上关于分布式锁的实现方式有很多,早期主要是基于Redisson等客户端,但在Spring Boot2. ...

  2. 快来学习Redis 分布式锁的背后原理

    以前在学校做小项目的时候,用到Redis,基本也只是用来当作缓存.可阿粉在工作中发现,Redis在生产中并不只是当作缓存这么简单.在阿粉接触到的项目中,Redis起到了一个分布式锁的作用,具体情况是这 ...

  3. Redis分布式锁使用不当,酿成一个重大事故,超卖了100瓶飞天茅台!!!

    点击关注公众号,Java干货及时送达 来源:juejin.cn/post/6854573212831842311 基于Redis使用分布式锁在当今已经不是什么新鲜事了. 本篇文章主要是基于我们实际项目 ...

  4. Redis 分布式锁使用不当,酿成一个重大事故,超卖了100瓶飞天茅台!!!

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 基于Redis使用分布式锁在当今已经不是什么新鲜事了. 本 ...

  5. 秒杀商品超卖事故:Redis分布式锁请慎用!

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:浪漫先生 来源:juejin.im/post/6854573 ...

  6. 记一次由Redis分布式锁造成的重大事故,避免以后踩坑!

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:浪漫先生 juejin.im/post/5f159cd8f2 ...

  7. 深度剖析:Redis分布式锁到底安全吗?看完这篇文章彻底懂了!

    ‍‍‍‍‍‍‍‍‍‍‍‍阅读本文大约需要 20 分钟. 大家好,我是 Kaito. 这篇文章我想和你聊一聊,关于 Redis 分布式锁的「安全性」问题. Redis 分布式锁的话题,很多文章已经写烂了 ...

  8. Redisson实现Redis分布式锁的N种姿势

    点击蓝色"程序猿DD"关注我哟 来源:阿飞的博客 前几天发的一篇文章<Redlock:Redis分布式锁最牛逼的实现>,引起了一些同学的讨论,也有一些同学提出了一些疑问 ...

  9. Redlock:Redis分布式锁的实现

    来源:阿飞的博客 普通实现 说道Redis分布式锁大部分人都会想到:setnx+lua,或者知道set key value px milliseconds nx.后一种方式的核心实现命令如下: - 获 ...

最新文章

  1. celery源码分析:multi命令分析
  2. Word中大括号内公式如何左对齐
  3. oracle 12519,TNS-12519 与 processes 参数设置
  4. vuex第三弹vuex之actions(前端网备份)
  5. 3月任务--target
  6. matlab的傅里叶变换
  7. 移动路线(信息学奥赛一本通-T1194)
  8. android 自定义 打包文件类型,Android Studio配置打包生成自定义文件名
  9. 拼多多:永远不会对孵化品牌“二选一” 扶持千家工厂触达4.4亿消费者
  10. java getreturntype_java.lang.reflect.Method.getGenericReturnType()方法示例
  11. redis缓存和mysql数据库同步
  12. u8 附件上传后存放路径_用友U8生产不良退料案例教程
  13. 俄罗斯方块c语言教程codeblocks,C语言俄罗斯方块修改结尾
  14. 【65】如何通过sys文件系统remove和probe一个PCI设备
  15. 电脑外放没声音但插入耳机有声音怎么回事
  16. sudo 授权需谨慎,否则亲人两行泪!6 个超实用使用技巧
  17. Secondary NameNode:究竟是什么?
  18. HTML实战宝典PDF,《selenium webdriver实战宝典》记
  19. edge怎么开启沉浸式阅读_美人鼓上舞!端午小长假,豫园将开启史上首次沉浸式国风庙会...
  20. 家用服务器 无线路由器,评测六款热门家用Wi-Fi 6路由器

热门文章

  1. 查询自己名下所有的卡
  2. 超薄石材与珠宝的完美结合:上海珠宝店门头改造设计巧思
  3. python控制浏览器
  4. 321分排第二?985中山大学计算机考研超级爆冷?
  5. 2021-03-23 python数据处理系统学习(二)控制语句
  6. 利用网络数据预测企业失信行为
  7. 对图片进行膨胀与腐蚀
  8. 提前准备!多所院校公布23考研复试公告!
  9. 四种常用的自动化测试框架
  10. 移动端百度地图多点标注php,百度地图API多重打点标注