redis 分布式锁整理
上面这种写法是有问题的
redis设置分布式锁 同时设置超时时间 不能分开写 原子操作才行 ,否则 挂掉会有问题
就是说第一行设置完分布式锁的key,第二行设置超时时间的,那么如果在第一行和第二行之间服务器挂掉了就会有问题。
下图这种设置超时时间也是有问题的:
因为是固定的十秒钟,那么存在这种情况:
1.线程一运行了15秒钟,但是锁设置的是十秒钟,那么这时候线程二在线程一运行代码的时候同时加锁运行了,然后线程一在线程二运行五秒钟的时候(也就是线程一的第15秒把锁删除了,这时候线程三就可以直接加锁了)
解决方法:加锁添加当前线程id和客户端id 优化如下:
但是这个代码还有个问题:
原子性问题:
线程一9.9秒(无限接近于10秒钟的时候)执行到上图finally的if 执行完,还没有删除锁,但是if执行完了是true ,这时候正好到了十秒钟锁失效线程二直接加锁成功,那么这时候线程一删除锁的话是删除了线程二的锁。
这种要使用锁续命来解决,也就是下面要说的redisson
redission加锁核心:
具体实现:
RedissonLock的lock()方法——>lockInterruptibly()->tryAcquire()->tryAcquireAsync(leaseTime, unit, threadId)->tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}
上面第一段if的脚本就是先判断锁存不存在,不存在就用hash结构存放并且设置过期时间,
第二个if就是锁存在那么支持可重入,锁增加相应的重入值(hincrby)。
上面连个if都不满足就说明不是当前线程持有锁,就返回当前锁key 的剩余过期时间。
接下来看异步回调的部分:
private void scheduleExpirationRenewal(final long threadId) {if (expirationRenewalMap.containsKey(getEntryName())) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());if (!future.isSuccess()) {log.error("Can't update lock " + getName() + " expiration", future.cause());return;}if (future.getNow()) {// reschedule itselfscheduleExpirationRenewal(threadId);}}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel();}}
ttlRemaining是null就是上面lua脚本两个if的情况,加锁成功了,那么调用scheduleExpirationRenewal(threadId);方法给锁按照超时时间的三分之一延长续命,就是比如锁是30秒超时,那么没=每十秒钟调用续命。 这里scheduleExpirationRenewal方法是嵌套调用的。
上面脚本返回0就结束看门狗线程的续命了。也就是不嵌套调用了。
接下来看下RedissonLock的tryLock(long waitTime, long leaseTime, TimeUnit unit)方法
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();final long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= (System.currentTimeMillis() - current);if (time <= 0) {acquireFailed(threadId);return false;}current = System.currentTimeMillis();final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {@Overridepublic void operationComplete(Future<RedissonLockEntry> future) throws Exception {if (subscribeFuture.isSuccess()) {unsubscribe(subscribeFuture, threadId);}}});}acquireFailed(threadId);return false;}try {time -= (System.currentTimeMillis() - current);if (time <= 0) {acquireFailed(threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= (System.currentTimeMillis() - currentTime);if (time <= 0) {acquireFailed(threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= (System.currentTimeMillis() - currentTime);if (time <= 0) {acquireFailed(threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}
// return get(tryLockAsync(waitTime, leaseTime, unit));}
由前面分析的tryAcquire方法可见,lua脚本返回null说明加锁成功,否则返回锁的剩余时间,
接着final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);使用recis的发布订阅模式,订阅这个锁的channel队列,
然后进入while(true)的循环再次加锁,返回null就和上面说的一样加锁成功,否则
信号量 加锁 : 获取许可 ,等待锁的超时时间结束再获取锁 (ttl)对性能要好,否则一直while(true)的循环性能就太差了
if (ttl >= 0 && ttl < time) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}
这里的getlatch是aqs的信号量
public Semaphore getLatch() {return latch;}
接下来看解锁方法:
@Overridepublic void unlock() {Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));if (opStatus == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + Thread.currentThread().getId());}if (opStatus) {cancelExpirationRenewal();}// Future<Void> future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);}
照样是个lua脚本
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}
重点是counter开始的逻辑,local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); -1就是可重入数量减1。如果count大于0及说明是重入锁,只有count等于0才是说明解锁成功,那么发布通知那些订阅这个锁的线程,并且删除这个key。
发布订阅在LockPubSub这个类里面:
回调的方法:
protected void onMessage(RedissonLockEntry value, Long message) {if (message.equals(unlockMessage)) {value.getLatch().release();while (true) {Runnable runnableToExecute = null;synchronized (value) {Runnable runnable = value.getListeners().poll();if (runnable != null) {if (value.getLatch().tryAcquire()) {runnableToExecute = runnable;} else {value.addListener(runnable);}}}if (runnableToExecute != null) {runnableToExecute.run();} else {return;}}}}
这里就有尝试加锁的逻辑tryAcquire
可重入分布式锁:
这里重点还是lua脚本有区别
RedissonReadLock:
@Override<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'read'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('set', KEYS[2] .. ':1', 1); " +"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local key = KEYS[2] .. ':' .. ind;" +"redis.call('set', key, 1); " +"redis.call('pexpire', key, ARGV[1]); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end;" +"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));}
这里用hash的mode字段,read表示读锁,write表示写锁。
第一个if表示加了读锁,设置过期时间
第二个if就是可重入锁的逻辑,不管之前是读还是写都将其 +1
同样解锁也有publish发布通知订阅这个key的线程这些操作,可重入count数量减少这些操作
写锁:
@Override<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"local mode = redis.call('hget', KEYS[1], 'mode'); " +"if (mode == false) then " +"redis.call('hset', KEYS[1], 'mode', 'write'); " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (mode == 'write') then " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " +"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +"return nil; " +"end; " +"end;" +"return redis.call('pttl', KEYS[1]);",Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName(threadId));}
同样的 第一个if是加锁逻辑,第二个if是重入逻辑。
RedLock:
保证高可用 都拖一个从节点,免得挂两个出现永远加锁不成功 如下图所示:
底层每个节点都去setnx ,至少半数以上节点返回成功才认为成功
红锁不推荐用: 针对同一个key的锁而言
客户端1 redis1加锁成功,redis2加锁成功,然后redis2返回成功后挂掉了
客户端2 来redis3加锁成功 ,新的redis2从节点顶上来加锁成功 ,那么客户端1和客户端2都加锁成功了,就有问题了
redis的持久化 假设1s一次 ,那么redis2 也就是节点2 就这一秒钟 加锁的时候key 挂掉了或者重启了 ,那就坑了,这个key就丢掉了,然后别的客户端再来加锁就出现上面的问题
因为 redis 就是 ap 保证高性能,所以这样的话要不就用zk算了
说白了分布式锁就是把并行的请求串行化了
又要分布式锁,又要高并发,那就得根据各种场景优化了。
锁的粒度 越小越好
redis里面多分几个key 初始化时候拆成十个key , 写热点分散
这样就性能提升了十倍,减完key的段位就加标记
就和cmap底层差不多
分布式锁可以解决双写不一致问题
大部分 都是 冷门数据 ,九成 热点商品大部分请求 下面 的第一个if直接返回,很少的请求 完整走一次后端 , 大部分代码都是解决小部分问题, 下面if红线的小部分代码解决九成情况。
比如 并发重建线程有几万这在下图箭头这边
预估知道逻辑 1秒能执行完 ,就直接 串行转并发 设置1秒钟的trylock 然后直接读取下面的缓存 99.99场景 0.001情况出bug 所以用不用需要结合具体的业务场景
说白了架构就是结合具体业务场景的取舍
热点缓存监控系统:
所有 web应用 监听 热点缓存(去实时计算热点 对 redis 操作做aop拦截 然后往热点缓存系统里面发送请求 分布式大规模数据实时计算来维护这个热点缓存的)
热点缓存通知 web应用去更新
不可能说是redis的key修改,web应用去通知别的web应用更新,这样成本太大了。
redis 分布式锁整理相关推荐
- redis分布式锁 在集群模式下如何实现_收藏慢慢看系列:简洁实用的Redis分布式锁用法...
在微服务中很多情况下需要使用到分布式锁功能,而目前比较常见的方案是通过Redis来实现分布式锁,网上关于分布式锁的实现方式有很多,早期主要是基于Redisson等客户端,但在Spring Boot2. ...
- 快来学习Redis 分布式锁的背后原理
以前在学校做小项目的时候,用到Redis,基本也只是用来当作缓存.可阿粉在工作中发现,Redis在生产中并不只是当作缓存这么简单.在阿粉接触到的项目中,Redis起到了一个分布式锁的作用,具体情况是这 ...
- Redis分布式锁使用不当,酿成一个重大事故,超卖了100瓶飞天茅台!!!
点击关注公众号,Java干货及时送达 来源:juejin.cn/post/6854573212831842311 基于Redis使用分布式锁在当今已经不是什么新鲜事了. 本篇文章主要是基于我们实际项目 ...
- Redis 分布式锁使用不当,酿成一个重大事故,超卖了100瓶飞天茅台!!!
点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 基于Redis使用分布式锁在当今已经不是什么新鲜事了. 本 ...
- 秒杀商品超卖事故:Redis分布式锁请慎用!
点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:浪漫先生 来源:juejin.im/post/6854573 ...
- 记一次由Redis分布式锁造成的重大事故,避免以后踩坑!
点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:浪漫先生 juejin.im/post/5f159cd8f2 ...
- 深度剖析:Redis分布式锁到底安全吗?看完这篇文章彻底懂了!
阅读本文大约需要 20 分钟. 大家好,我是 Kaito. 这篇文章我想和你聊一聊,关于 Redis 分布式锁的「安全性」问题. Redis 分布式锁的话题,很多文章已经写烂了 ...
- Redisson实现Redis分布式锁的N种姿势
点击蓝色"程序猿DD"关注我哟 来源:阿飞的博客 前几天发的一篇文章<Redlock:Redis分布式锁最牛逼的实现>,引起了一些同学的讨论,也有一些同学提出了一些疑问 ...
- Redlock:Redis分布式锁的实现
来源:阿飞的博客 普通实现 说道Redis分布式锁大部分人都会想到:setnx+lua,或者知道set key value px milliseconds nx.后一种方式的核心实现命令如下: - 获 ...
最新文章
- celery源码分析:multi命令分析
- Word中大括号内公式如何左对齐
- oracle 12519,TNS-12519 与 processes 参数设置
- vuex第三弹vuex之actions(前端网备份)
- 3月任务--target
- matlab的傅里叶变换
- 移动路线(信息学奥赛一本通-T1194)
- android 自定义 打包文件类型,Android Studio配置打包生成自定义文件名
- 拼多多:永远不会对孵化品牌“二选一” 扶持千家工厂触达4.4亿消费者
- java getreturntype_java.lang.reflect.Method.getGenericReturnType()方法示例
- redis缓存和mysql数据库同步
- u8 附件上传后存放路径_用友U8生产不良退料案例教程
- 俄罗斯方块c语言教程codeblocks,C语言俄罗斯方块修改结尾
- 【65】如何通过sys文件系统remove和probe一个PCI设备
- 电脑外放没声音但插入耳机有声音怎么回事
- sudo 授权需谨慎,否则亲人两行泪!6 个超实用使用技巧
- Secondary NameNode:究竟是什么?
- HTML实战宝典PDF,《selenium webdriver实战宝典》记
- edge怎么开启沉浸式阅读_美人鼓上舞!端午小长假,豫园将开启史上首次沉浸式国风庙会...
- 家用服务器 无线路由器,评测六款热门家用Wi-Fi 6路由器