1 前言

在程序中,我们想要保证一个变量的可见性及原子性,我们可以用volatile(对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性)、synchronized、乐观锁、悲观锁等等来控制。单体应用内可以这样做,而现在随着时代的发展,大多项目都已经告别的单机时代,拥抱微服务时代,这样的情况下很多服务需要做集群,一个应用需要部署到几台机器上然后做负载均衡,在并发情况下使用上面说的机制来保证变量的可见性及原子性就不可行了(如下图),从而产生了很多分布式机制(如分布式事务、分布式锁等),主要的作用还是用来保证数据的一致性:


如上图,假设变量a是剩余库存,值为1,这时候三个用户进来下单,正好三个请求被分到了三个不同的服务节点上面,三个节点 检查剩余库存,发现还有1个,然后都去进行扣减,这样就导致库存为负数,有两个用户没有货发,就是俗称的超卖。这种情况是不被接受的,用户会和业务撕逼、业务会和你领导吵架,然后你就收拾书包回家了!

在这种场景中,我们就需要一种方法解决这个问题,这就是分布式锁要解决的问题

2 分布式锁的实现与特性

2.1 分布式锁的实现

本地锁可以通过语言本身支持,要实现分布式锁,就必须依赖中间件,数据库、redis、zookeeper等,主要有以下几种实现方式:
1)Memcached:利用 Memcached 的 add 命令。此命令是原子性操作,只有在 key 不存在的情况下,才能 add 成功,也就意味着线程得到了锁。
2)Redis:和 Memcached 的方式类似,利用 Redis 的 setnx 命令。此命令同样是原子性操作,只有在 key 不存在的情况下,才能 set 成功。
3)Zookeeper:利用 Zookeeper 的顺序临时节点,来实现分布式锁和等待队列。Zookeeper 设计的初衷,就是为了实现分布式锁服务的。
4)Chubby:Google 公司实现的粗粒度分布式锁服务,底层利用了 Paxos 一致性算法。

2.2 分布式锁的特性

1)在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行。
2)高可用的获取锁与释放锁。
3)高性能的获取锁与释放锁。
4)具备可重入特性。
5)具备锁失效机制,防止死锁。
6)具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

3 Redisson实现Redis分布式锁以及实现原理

3.1 添加依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.12.4</version>
</dependency>

3.2 测试查看

库存数量100,调用一次减1,小于等于0的时候返回false,表示下单失败。

@Component
public class RedissonLock {private static Integer inventory = 100;/*** 测试** @return true:下单成功 false:下单失败*/public Boolean redisLockTest(){// 获取锁实例RLock inventoryLock = RedissonService.getRLock("inventory-number");try {// 加锁inventoryLock.lock();if (inventory <= 0){return false;}inventory--;System.out.println("线程名称:" + Thread.currentThread().getName() + "剩余数量:" + RedissonLock.inventory);}catch (Exception e){e.printStackTrace();}finally {// 释放锁inventoryLock.unlock();}return true;}
}

用jmeter进行压测:

线程组100执行20秒:

响应断言true为正确,false为失败:

结果:

3.3 获取锁的实例

RLock inventoryLock = RedissonService.getRLock("inventory-number");这段就是获取锁的实例,inventory-number为指定锁名称,进去getLock(String name)方法之后就能看到获取锁的实例就是在RedissonLock构造方法中,初始化一些属性。

public RLock getLock(String name) {return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}

看下RedissonLock的构造函数:

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);//命令执行器this.commandExecutor = commandExecutor;//UUID字符串(MasterSlaveConnectionManager类的构造函数 传入UUID)this.id = commandExecutor.getConnectionManager().getId();//内部锁过期时间(防止死锁,默认时间为30s)this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();//uuid+传进来的锁名称this.entryName = this.id + ":" + name;//redis消息体this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}

内部锁过期时间:

3.4 加锁

inventoryLock.lock();这段代码表示加锁,一步一步进去源码里面看看,进来首先看到如下lock()方法:

public void lock() {try {this.lock(-1L, (TimeUnit)null, false);} catch (InterruptedException var2) {throw new IllegalStateException();}}

可以看到这里设置了一些默认值,然后继续调用了带参lock()方法,也是在这里,完成了加锁的逻辑,源码如下:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {// 线程IDlong threadId = Thread.currentThread().getId();// 尝试获取锁Long ttl = this.tryAcquire(leaseTime, unit, threadId);// 如果过期时间等于null,则表示获取到锁,直接返回,不等于null继续往下执行if (ttl != null) {// 如果获取锁失败,则订阅到对应这个锁的channelRFuture<RedissonLockEntry> future = this.subscribe(threadId);if (interruptibly) {// 可中断订阅this.commandExecutor.syncSubscriptionInterrupted(future);} else {// 不可中断订阅this.commandExecutor.syncSubscription(future);}try {// 不断循环while(true) {// 再次尝试获取锁ttl = this.tryAcquire(leaseTime, unit, threadId);// ttl(过期时间)为空,说明成功获取锁,返回if (ttl == null) {return;}// ttl(过期时间)大于0 则等待ttl时间后继续尝试获取if (ttl >= 0L) {try {((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException var13) {if (interruptibly) {throw var13;}((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else if (interruptibly) {((RedissonLockEntry)future.getNow()).getLatch().acquire();} else {((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();}}} finally {// 取消对channel的订阅this.unsubscribe(future, threadId);}}}

再来看下获取锁的tryAcquire方法:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));}

进去看下tryAcquireAsync方法:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {// 有设置过期时间if (leaseTime != -1L) {return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 没有设置过期时间RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {if (ttlRemaining == null) {this.scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}}
  • tryLockInnerAsync方法是真正执行获取锁的逻辑,它是一段LUA脚本代码。在这里,它使用的是hash数据结构。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {this.internalLockLeaseTime = unit.toMillis(leaseTime);return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, // 如果锁不存在,则通过hset设置它的值,并设置过期时间"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; // 如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1(这里显示了redis分布式锁的可重入性)if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; // 如果锁已存在,但并非本线程,则返回过期时间ttlreturn redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});}

KEYS[1]代表的是你加锁的那个key,比如说:RLock inventoryLock = RedissonService.getRLock("inventory-number");这里你自己设置了加锁的那个锁key就是"inventory-number"。
ARGV[1]代表的就是锁key的默认生存时间,上面也截图看了,默认时间为30秒。
ARGV[2]代表的是加锁的客户端的ID,类似于后面这样: 8743c9c0-0795-4907-87fd-6c719a6b4586:1

上面这段LUA代码看起来也不是很复杂,其中有三个判断:

通过exists判断锁存不存在,如果锁不存在,则设置值和过期时间,加锁成功。
通过hexists判断,如果锁已存在,并且锁的是当前线程,则证明是重入锁,加锁成功,ARGV[2]的value+1,原来是1,现在变为2,当然,释放的时候也要释放两次。
如果锁已存在,但锁的不是当前线程,则证明有其他线程持有锁。返回当前锁的过期时间,加锁失败


3.5 解锁

inventoryLock.unlock();这段代码表示解锁,跟刚才一样,一步一步进去源码里面看看,进来首先看到如下unlock()方法:

public void unlock() {try {this.get(this.unlockAsync(Thread.currentThread().getId()));} catch (RedisException var2) {if (var2.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException)var2.getCause();} else {throw var2;}}}

进去unlockAsync()查看,这是解锁的方法:

public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise();// 释放锁的方法RFuture<Boolean> future = this.unlockInnerAsync(threadId);// 添加监听器 解锁opStatus:返回值future.onComplete((opStatus, e) -> {this.cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);//如果返回null,则证明解锁的线程和当前锁不是同一个线程,抛出异常} else if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);result.tryFailure(cause);} else {// 解锁成功result.trySuccess((Object)null);}});return result;}

再进去看下释放锁的方法:unlockInnerAsync():

protected RFuture<Boolean> unlockInnerAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 如果释放锁的线程和已存在锁的线程不是同一个线程,返回null"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; // 如果是同一个线程,就通过hincrby减1的方式,释放一次锁local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);// 若剩余次数大于0 ,则刷新过期时间if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; // 其他就证明锁已经释放,删除key并发布锁释放的消息else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});}

上述代码是释放锁的逻辑。同样的,它也是有三个判断:

如果解锁的线程和当前锁的线程不是同一个,解锁失败,抛出异常。
如果解锁的线程和当前锁的线程是同一个,就通过hincrby减1的方式,释放一次锁。若剩余次数还大于0,则证明是重入锁,再次刷新过期时间。
锁已不存在,通过publish发布锁释放的消息,解锁成功


到这里就结束了,眼过千百不如手过一遍,自己试试就明白了,各位老板看到这里能不能点个赞,鄙人想看看恐怖如斯的二级世界,谢谢各位老板!

如果有需要的话可以关注一下我的公众号,会即时更新Java相关技术文章,公众号内还有一些实用资料,如Java秒杀系统视频教程、黑马2019的教学资料(IDEA版)、BAT面试题汇总(分类齐全)、MAC电脑常用安装包(有一些是淘宝买的,已PJ的)。

Redis分布式锁的实现以及原理相关推荐

  1. 爽文,Redis分布式锁的实现和原理

    为什么需要分布式锁 我们知道,当多个线程并发操作某个对象时,可以通过synchronized来保证同一时刻只能有一个线程获取到对象锁进而处理synchronized关键字修饰的代码块或方法.既然已经有 ...

  2. Redis 分布式锁的正确实现原理演化历程与 Redisson 实战总结

    Redis 分布式锁使用 SET 指令就可以实现了么?在分布式领域 CAP 理论一直存在. 分布式锁的门道可没那么简单,我们在网上看到的分布式锁方案可能是有问题的. 一步步带你深入分布式锁是如何一步步 ...

  3. 快来学习Redis 分布式锁的背后原理

    以前在学校做小项目的时候,用到Redis,基本也只是用来当作缓存.可阿粉在工作中发现,Redis在生产中并不只是当作缓存这么简单.在阿粉接触到的项目中,Redis起到了一个分布式锁的作用,具体情况是这 ...

  4. 分布式锁原理——redis分布式锁,zookeeper分布式锁

    首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法.变量. 在一个进程中,也就是一个jvm 或者说应用中,我们很容易去处理控制,在j ...

  5. 还不知道 Redis 分布式锁的背后原理?还不赶快学习一下

    前言 以前在学校做小项目的时候,用到Redis,基本也只是用来当作缓存.可阿粉在工作中发现,Redis在生产中并不只是当作缓存这么简单.在阿粉接触到的项目中,Redis起到了一个分布式锁的作用,具体情 ...

  6. 关于分布式锁原理的一些学习与思考:redis分布式锁,zookeeper分布式锁

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:牛人 20000 字的 Spring Cloud 总结,太硬核了~ 作者:队长给我球. 出处:https://w ...

  7. Redis分布式锁的实现原理

    目前基于Redis实现的分布式锁常用的框架是Redisson,它的使用比较简单,在项目中引入Redisson的依赖,然后基于Redis实现分布式锁的加锁与释放锁. Redis分布式锁的底层原理 ​ R ...

  8. Redis分布式锁原理解析

    这章节我们来学习一下,Redis分布式锁的一个原理,首先我们看一下目录,最开始我们要讲一下,Redis分布式锁,相关的一些命令,然后在分布式锁演进的时候呢,还会以时间戳进行一个结合,后边还会讲一下,R ...

  9. zookeeper 分布式锁_关于redis分布式锁,zookeeper分布式锁原理的一些学习与思考

    编辑:业余草来源:https://www.xttblog.com/?p=4946 首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法 ...

最新文章

  1. NCBI|转录组原始数据上传
  2. js 中的break continue return
  3. org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'xx' is defined
  4. 华硕老毛子(Padavan)——L2TP连接自动重连解决方案
  5. c语言为什么要建项目,一个C语言小项目为什么都说牛逼
  6. 暑期训练日志----2018.8.18
  7. Linux基础系列4(ls,cp命令详解)
  8. Kafka 源码环境搭建
  9. codewars python Regex validate PIN code
  10. 腾讯敏捷开发及快速迭代
  11. MODULE_DEVICE_TABLE宏的作用
  12. boost::lexical_cast 学习小记
  13. 使用CSS完成用户注册页面;
  14. 绘画入门经典教程——如果你想, 一切皆有可能!
  15. 基于Java毕业设计房屋租赁系统源码+系统+mysql+lw文档+部署软件
  16. Mysql笔试题(转载)
  17. 计算机中专综合知识,湖南省汨罗市职业中专高考(八)计算机应用专业综合知识试题讲解.doc...
  18. [android adb shell] 怎么解锁图案锁屏(pattern lock)的手机?
  19. 【翻译】数据资产价值评估
  20. 数字健康-共建共享 火绒安全将亮相2021中华医院信息网络大会(CHINC)

热门文章

  1. Config variable ${APACHE_RUN_DIR} is not defined
  2. 什么是点阵图 Bitmap ?
  3. 加密与解密第三版光盘ISO资料
  4. lambda表达式图解-一图胜百文
  5. 走着走着就散了,回忆都淡了,看着看着就倦了,星光也暗了
  6. 一个基于JDBC的通用DAO的设计参考(北大青鸟课程)
  7. 安装windows和android双系统,小雷问答丨普通电脑怎么装 Windows 和安卓的双系统?...
  8. 以前学习C语言资料2
  9. 手机上视频格式m3u8装换为mp4格式文件
  10. 如何使用Bancor避免无常损失