前言

大家好,我是飓风
上一遍文章 分布式锁的实现- mysql,我们讲解了分布式锁实现的特性,主要包含:

  • 互斥性
  • 超时特性
  • 提供阻塞和非阻塞接口
  • 可重入性
  • 公平锁和非公平锁
  • 其他 高性能 高可用等

今天咱们来看看用zookeeper 怎么来实现分布式锁。

实现

环境准备

  • zookeeper 3.4.11
  • jdk 8
  • spring-boot 2.3.2.RELEASE
  • curator-framework 4.0.0

jdk 安装

这里就不介绍jdk安装了,相信大家肯定google 或者百度都可以查到,很简单,略过。

zookeeper 安装

这里我们利用docker 来快速安装和启动
安装:

docker pull zookeeper:3.4.11

启动:

docker run --name zookeeper --restart always -d zookeeper:3.4.11

创建maven 项目

这里创建maven 项目就省略了,下面的maven的依赖配置,具体的版本号在我的父pom里,完成代理,我会传到github 上。

<dependencies><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>org.slf4j</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>provided</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>transmittable-thread-local</artifactId></dependency></dependencies>

实现原理

zookeeper 的特性

  • 结构简单类,似于文件系统的树状结构
  • 单系统镜像,无论客户端连接到哪一个服务器,他将看到相同的、Zookeeper视图
  • 有序性, 有序的事务编号,客户端的更新顺序与它们被发送的顺序相一致
  • 原子性, 更新操作要么成功要么失败,没有第三种结果

zookeeper的节点

  • 每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
  • znode有三种类型,临时的( EPHEMERAL )、持久的( PERSISTENT )和有序的 (SEQUENTIAL)
  • znode的类型在创建时确定并且之后不能再修改znode可以包含数据和子节点,但是EPHEMERAL类型的节点不能有子节点
  • znode中的数据可以有多个版本,比如某一个路径下存有多个数据版本,那么查询这个路径下的数据就需要带上版本
  • 节点不支持部分读写,而是一次性完整读写
  • 短暂znode的客户端会话结束时,zookeeper会将该短暂znode删除,短暂znode不可以有子节点
  • 持久znode不依赖于客户端会话,只有当客户端明确要删除该持久znode时才会被删除
  • 客户端应用可以在节点上设置监视器

zookeeper watcher

Watcher 在 zookeeper 是一个核心功能,Watcher 可以监控目录节点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知所有设置在这个目录节点上的 Watcher,从而每个客户端都很快知道它所关注的目录节点的状态发生变化,而做出相应的反应。

公共创建相同的临时节点方式

通过上面的zookeeper的介绍,我们知道,zookeeper 是读写是原子性的,且节点不能重复创建的,那么我们就让客户端,在获取分布式锁的时候,去创建这个临时节点,先创建的client ,那么就会返回创建成功,其他都会返回创建失败,其他创建失败,此时需要监听这个 临时节点,如果临时节点被删除了,那么说明就是释放锁了,其他client 可以接着创建这个临时节点,来争抢分布式锁。
之所以会利用临时节点,如果程序down掉了,那么此时临时节点就会自动删除,不会出现死锁的现象。
举例:比如我们进行某个SKU库存的扣减,那么此时zookeeper 的创建节点的路径咱们可以设置为 /lock/sku/100121212, 其中 100121212 就是要扣减的sku的值,也是一个临时节点。
下面我画个图,看完相信会更加清晰。

1.创建临时节点,也就是开始获取锁,如下图:

2.client1 获取锁成功,如下图:


3.此时client1 获取锁成功,其他client2 和client3 获取锁失败

接着client2 和 client3 开始监听 这个临时节点是否被删除了,如下图:

4.client1 执行完扣减库存业务,那么就会删除临时节点,也就是释放锁,那么其他client2 和client3 监听到这个临时节点被删除了,那么就会再次进行锁的获取,也就是创建这个临时节点了。


通过上面这个几个步骤,一个基于zookeeper临时节点的分布式锁就实现了。但是这里有些问题需要说明下:

当大量客户端去竞争锁的时候,会发生“惊群”效应,这里惊群效应指的是在分布式锁竞争的过程中,大量的"Watcher通知"和“创建/lock/sku/xxxx”两个操作重复运行,并且绝大多数运行结果都创建节点失败,从而继续等待下一次通知,若在集群规模较大的情况下,会对ZooKeeper服务器以及客户端服务器造成巨大的性能影响和网络冲击,所以基于这种方式的实现,并发量上支持不很高,大流量下不建议使用。

下面我来介绍改进方案

基于zookeeper的临时顺序节点方式

临时顺序节点原理

我们可以利用创建zookeeper的临时顺序节点的方式,来解决“惊群”效应,其实是一种公平锁的实现,下面说下具体的步骤:

  1. 使用 zookeeper 的临时节点和有序节点,每个线程获取锁就是在 ZK 创建一个临时有序的节点,比如在 /lock/sku/000001, /lock/sku/000002, /lock/ sku/000003
    其中sku 是要你进行写的公共资源。
    如下图所示:三个client 同时想进行sku= 100121212 进行扣钱库存,那么sku = 100121212 就是共享资源,需要进行加锁,三个client 就会去创建临时顺序节点,
    /lock/100121212,分别创建了 /lock/10012121/001,/lock/10012121/002 ,/lock/10012121/003

  1. 创建节点成功后,获取 /lock/sku 目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点,如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。

  1. 如果当前线程创建的节点不是所有节点序号最小的节点,则对节点序号的前一个节点添加一个事件监听。如下图所示:

  1. 前一个节点被删除了,那么就会被监听到,此时又会获取临时顺序节点的集合,看自己是不是最小的,如果是,那么就获到了,如果不是继续进行监听。

如下图所示,001 被删除了,那么client2 就会监听到001 被删除了,于是再次获取到子节点集合,判断自己已经示最小的节点了,那么获取锁成功了。

临时顺序节点代码

这里代码不做过多解释了,给了主要类的实现说明,可以和上面的原理对应上的。
1 实现了阻塞获取锁
2 实现了非阻塞获取锁
3 锁的可重入性

lock 接口 ,定义要实现获取锁和释放锁的方法

public interface Lock {/*** 阻塞获取锁* @return*/void lock(String source) throws LockException;/*** 非阻塞获取锁* @return*/boolean nonLock(String source,int retries);/*** 释放锁*/boolean unLock();
}

lock 接口的实现

@Slf4j
@RequiredArgsConstructor
@Component
@Scope(value = "prototype")
public class ZookeeperLock implements Lock {private final CuratorFramework curatorFramework;private final Watcher watcher = event -> {if (event.getType() == Watcher.Event.EventType.NodeDeleted){notifyAllFromWatcher();}};/*** key: lock path* value: 重入的次数*/private static final TransmittableThreadLocal<LockInfo> THREAD_LOCAL = new TransmittableThreadLocal<>();private LockInfo getLocalMap(){LockInfo lockInfo = THREAD_LOCAL.get();if (lockInfo == null){lockInfo = new LockInfo();THREAD_LOCAL.set(lockInfo);}return lockInfo;}private String createLockPath(String source)  {String base = "/" + source;// 创建临时节点,这里肯定谁最小是谁先创建出来String currentPath = null;try {currentPath = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(base+"/lock_");} catch (Exception e) {e.printStackTrace();}return currentPath;}private String getLockPath(){final LockInfo lockInfo = getLocalMap();return lockInfo.lockPath;}private String getBasePath(){final LockInfo lockInfo = getLocalMap();return "/"+lockInfo.source;}private String getSource(){final LockInfo lockInfo = getLocalMap();return lockInfo.source;}private void setLock(){final LockInfo lockInfo = getLocalMap();lockInfo.lock = true;}private void incCount(){final LockInfo lockInfo = getLocalMap();lockInfo.count++;}private void deCount(){final LockInfo lockInfo = getLocalMap();lockInfo.count--;}private int getCount(){return getLocalMap().count;}private boolean isLock(){return  getLocalMap().lock;}private synchronized void notifyAllFromWatcher(){notifyAll();}private  void nextLock() throws LockException {boolean deleted = false;try {// 不相等,那么说明有比它大的,那么找出它的弟弟节点,进行监听//监听上一个节点final List<String> childrenPath = curatorFramework.getChildren().forPath(getBasePath());final String youngerBrother = CommonUtil.getYoungerBrother(getSource(), childrenPath, getLockPath());//如果为空个,说明就剩下她自己一个了,那么直接返回获取if (StringUtils.isEmpty(youngerBrother)){log.error("currentThread=> "+Thread.currentThread().getId()+"'s youngerBrother is null ");lock(getSource());return;}curatorFramework.getData().usingWatcher(watcher).forPath(youngerBrother);synchronized(this){wait();}lock(getSource());}catch (Exception e){//如果是 NoNodeException ,说明我监听的节点不存了,那么如要继续获取锁if(e instanceof KeeperException.NoNodeException){lock(getSource());return;}e.printStackTrace();deleted = true;throw new LockException("获取锁失败:" + e.getMessage());}finally {// 等待超时,没有获取到锁,那么删除zookeeper 中的临时节点和thread local内的数据if (deleted){removeResource();throw new LockException("超时-获取锁失败");}}}private void initCurrentLock(String source) throws LockException {final LockInfo lockInfo = getLocalMap();//如果为空,那么说第一次尝试获取锁if (StringUtils.isEmpty(lockInfo.lockPath)){String lockPath = createLockPath(source);if (StringUtils.isEmpty(lockPath)){throw new LockException("创建锁失败,请稍后重试");}lockInfo.source = source;lockInfo.lockPath = lockPath;}}@Overridepublic void lock(String source) throws LockException {initCurrentLock(source);//如果获得了,那么不要继续了if (lockResource()){return;}try {//阻塞 监听nextLock();} catch (Exception e) {e.printStackTrace();throw  new LockException("获取锁失败,请稍后重试");}}@Overridepublic boolean nonLock(String source,int retries){boolean notLock = false;try {initCurrentLock(source);while (retries>0){if (lockResource()){return true;}retries--;}if (retries==0){notLock = true;}} catch (LockException e) {e.printStackTrace();log.error("上锁失败: {} ",e.getMessage());notLock = true;return false;}finally {//如果出现异常了,那么一定要删除if (notLock){removeResource();}}return !notLock;}@Overridepublic boolean unLock() {if (getCount()>1){deCount();return true;}return removeResource();}private boolean removeResource(){try {String lockPath = getLockPath();THREAD_LOCAL.remove();if (!StringUtils.isEmpty(lockPath) && curatorFramework.checkExists().forPath(lockPath)!=null){curatorFramework.delete().forPath(lockPath);}} catch (Exception e) {e.printStackTrace();//todo: 如果删除锁失败了,那么要记录日志,同时报警,进行人工干预return false;}return true;}private boolean lockResource() throws LockException {//判断是否重入了if (isLock()){incCount();return true;}String lockPath  = getLockPath();String basePath =  getBasePath();int currentNumber = CommonUtil.getNumber(lockPath);List<String> childrenPath;try {childrenPath = curatorFramework.getChildren().forPath(basePath);} catch (Exception e) {e.printStackTrace();throw new LockException("获取锁列表失败");}// 获取所有节点的最小节点数字int minNumber = CommonUtil.getMin(childrenPath);//如果相等,那么它就是最小的,获得锁if (currentNumber == minNumber){System.out.println("lock thread: " + Thread.currentThread().getId() +" , lock number: " + currentNumber);setLock();incCount();return true;}return false;}@Datapublic static class LockInfo{private String source;private String lockPath;private int count;private boolean lock = false;}
}

源码地址

具体源码地址: github 点击

总结

zookeeper分布式锁的实现方式

  • 共同创建临时节点的方式,会引起“惊群”效应,并发量不能太高
  • 临时顺序接的方式,只会监听上一个顺序节点,性能会很高。

分布式锁的实现- zookeeper相关推荐

  1. 分布式锁-redis、zookeeper优缺点

    redis分布式锁优缺点 缺点: 获取锁的方式简单粗暴,获取不到锁直接不断尝试获取锁,比较消耗性能: redis的设计定位决定了它的数据并不是强一致性的,在某些极端情况下,可能会出现问题.锁的模型不够 ...

  2. api 创建zookeeper客户端_zookeeper分布式锁原理及实现

    前言 本文介绍下 zookeeper方式 实现分布式锁 原理简介 zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且 ...

  3. 肝一下ZooKeeper实现分布式锁的方案,附带实例!

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 zookeeper客户端选型 原生zookeeper客户端,有wa ...

  4. [转载] zookeeper 分布式锁服务

    转载自http://www.cnblogs.com/shanyou/archive/2012/09/22/2697818.html 分布式锁服务在大家的项目中或许用的不多,因为大家都把排他放在数据库那 ...

  5. 分布式锁原理——redis分布式锁,zookeeper分布式锁

    首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法.变量. 在一个进程中,也就是一个jvm 或者说应用中,我们很容易去处理控制,在j ...

  6. 关于分布式锁原理的一些学习与思考:redis分布式锁,zookeeper分布式锁

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:牛人 20000 字的 Spring Cloud 总结,太硬核了~ 作者:队长给我球. 出处:https://w ...

  7. 什么是分布式锁?redis、zookeeper、etcd实现分布式锁有什么不同之处?

    目录 分布式锁定义 目的 基于redis分布式锁 基于zookeeper实现的分布式锁 edis.zookeeper.etcd实现分布式锁的比较 建议选择etcd实现分布式锁 分布式锁定义 分布式环境 ...

  8. Zookeeper分布式锁的使用

    由于公司引入了dubbo+zookeeper框架,里面不可避免的引入的zookeeper分布式锁,所以自己大致了解了一下.由于是自己研究,有不正确的地方还请大佬批评指正. 首先先介绍一下自己对zook ...

  9. zookeeper 分布式锁服务

    分布式锁服务在大家的项目中或许用的不多,因为大家都把排他放在数据库那一层来挡.当大量的行锁.表锁.事务充斥着数据库的时候.一般web应用很多的瓶颈都在数据库上,这里给大家介绍的是减轻数据库锁负担的一种 ...

最新文章

  1. 吴恩达:AI未来将呈现四大发展趋势
  2. 超级智能的定义,一个已经诞生并不断深刻影响人类的新智能
  3. APL开发日志--2012-11-08
  4. 树与二叉树(c/c++)
  5. 树莓派+百度api实现人脸识别
  6. Python语言编程学习:numpy中的array格式数据切片与pandas中的dataframe格式数据切片、相互转换
  7. SVM 实现与代码(转)
  8. STL sector 应用
  9. Asp.net性能优化-提高ASP.Net应用程序性能的十大方法
  10. 有图有真相——关于“视频专辑:零基础学习C语言 ”
  11. Linux下ICMP洪水***实例
  12. 手机如何打开html文件怎么打开,怎么在手机上打开HTML文件怎么打开
  13. 关于C#如何引用Microsoft.Office.Interop.Excel
  14. excel熵值法计算权重_SPSS主成分分析 | 权重计算amp;极差法标准化超详细教程!(下)...
  15. 互联网体育健康猫触雷之败:留给了创业者什么警示?
  16. 牛客网项目 1.5Mybatis入门
  17. 【亲测有效】如何解决Hadoop运行jar包 报错Exception in thread “main“ java.lang.ClassNotFoundException: /input
  18. php里怎么输入,PHP是怎么进行输入输出的
  19. 前程无忧:节后芯片、通讯行业人才需求看涨, 金融业走低
  20. Qt编写物联网管理平台37-逻辑设计

热门文章

  1. 你给我解释解释,为什么TMD非得选择SpringCloud alibaba作为微服务开发框架?
  2. libsvm java 怎么调参_libsvm使用说明
  3. 张量入门(Tensor for Beginners)(一)
  4. 考研英语 - word-list-8
  5. java中的自动装箱和自动拆箱
  6. win10本地启动tomcat
  7. 双十二,WAVE SUMMIT+2021峰会亮点抢先看!
  8. 把多张 PNG 图片拼在一起合成一张 RAW 格式的图片(附代码)
  9. 安全基础--29--Grails开发基础操作与配置
  10. 线性代数学习指导与MATLAB编程实践,线性代数学习指导与MATLAB编程实践