多线程访问同一个共享资源时,会出现并发问题,synchronized或者lock 类的锁只能控制单一进程的资源访问,多进程下就需要用到分布式锁

利用zk 可以实现独占锁,(同级节点唯一性)多个进程往zk指定节点下创建一个相同名称的节点,只有一个能成功,创建失败的通过zk的watcher机制监听子节点变化,一个监听到子节点删除事件,会再次触发所有进程的写锁,但这里会有惊群效应,会影响到性能

利用有序节点实现分布式锁:每个客户端都往一个指定节点(locks)注册一个临时有序节点,越早创建的节点编号越小,最小编号的节点获得锁,通过监听比自己小的节点,当比自己小的节点删除后,客户端会收到watcher,再次判断自己的节点是不是所有节点最小的,是则获得锁;这种方式也会解决惊群问题

接下来我们来看实现:curator 分布式锁的使用
curator对锁封装了一层,提供了InterProcessMutex;还提供了leader 选举、分布式队列 InterProcessMutex 分布式可重入排他锁
InterProcessSemaphoreMutex 分布式排他锁
InterProcessReadWriteLock 分布式读写锁

 public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {private  String name;  //表示当前的进程private  LeaderSelector leaderSelector;  //leader选举的APIprivate CountDownLatch countDownLatch=new CountDownLatch(1);public LeaderSelectorClient(){}public LeaderSelectorClient(String name) {this.name = name;}public LeaderSelector getLeaderSelector() {return leaderSelector;}public void setLeaderSelector(LeaderSelector leaderSelector) {this.leaderSelector = leaderSelector;}public void start(){leaderSelector.start(); //开始竞争leader}@Overridepublic void takeLeadership(CuratorFramework client) throws Exception {//如果进入当前的方法,意味着当前的进程获得了锁。获得锁以后,这个方法会被回调//这个方法执行结束之后,表示释放leader权限System.out.println(name+"->现在是leader了");
//        countDownLatch.await(); //阻塞当前的进程防止leader丢失}@Overridepublic void close() throws IOException {leaderSelector.close();}private static String CONNECTION_STR="zk集群地址,至少三台机器";public static void main(String[] args) throws IOException {CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECTION_STR).sessionTimeoutMs(50000000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();curatorFramework.start();LeaderSelectorClient leaderSelectorClient=new LeaderSelectorClient("ClientA");LeaderSelector leaderSelector=new LeaderSelector(curatorFramework,"/leader",leaderSelectorClient);leaderSelectorClient.setLeaderSelector(leaderSelector);leaderSelectorClient.start(); //开始选举System.in.read();}
}
我们来看下curator 实现分布式锁的原理,这里我把注释写在了代码中,所以把代码贴到一块public InterProcessMutex(CuratorFramework client, String path) {// 实现公平锁的核心:zookeeper利用path 创建临时有序节点this(client, path, new StandardLockInternalsDriver());}public StandardLockInternalsDriver() {}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {this(client, path, "lock-", 1, driver);}//maxLeases:互斥锁InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {this.threadData = Maps.newConcurrentMap();this.basePath = PathUtils.validatePath(path);//InterProcessMutex 把分布式锁的申请和释放委托给了 LockInternals internalsthis.internals = new LockInternals(client, driver, path, lockName, maxLeases);}// 无限等待public void acquire() throws Exception {if (!this.internalLock(-1L, (TimeUnit)null)) {throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);}}// 限时等待public boolean acquire(long time, TimeUnit unit) throws Exception {return this.internalLock(time, unit);}private boolean internalLock(long time, TimeUnit unit) throws Exception {Thread currentThread = Thread.currentThread();//同一线程再次acquire,首先判断threaData 是否有这个线程锁信息,如果有则原子+1,然后返回InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);if (lockData != null) {// 实现可重入;lockData.lockCount.incrementAndGet();return true;// 映射表没有对应的锁信息,尝试通过LockInternals 获取锁} else {String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());if (lockPath != null) {// 成功获取锁,存储到映射表中InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);this.threadData.put(currentThread, newLockData);return true;} else {return false;}}}// 记录线程和锁信息的映射关系private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;//zk 中一个临时有序节点对应一个锁,但是让锁生效需要排队private static class LockData {final Thread owningThread;final String lockPath;final AtomicInteger lockCount;private LockData(Thread owningThread, String lockPath) {this.lockCount = new AtomicInteger(1);      //分布式锁重入次数this.owningThread = owningThread;this.lockPath = lockPath;}}//尝试获取锁,并返回锁对应的zk 临时有序节点路径String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {long startMillis = System.currentTimeMillis();// millisToWait 是个nullLong millisToWait = unit != null ? unit.toMillis(time) : null;byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;int retryCount = 0;String ourPath = null;//是否已经有分布式锁?boolean hasTheLock = false;//是否已经完成尝试获取分布式锁操作boolean isDone = false;while(!isDone) {isDone = true;try {//driver = StandardLockInternalsDriverourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);//循环等待激活分布式锁hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);} catch (NoNodeException var14) {if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {throw var14;}isDone = false;}}//成功获取分布式锁,返回有序节点的路径return hasTheLock ? ourPath : null;}//在zk 中创建临时顺序节点public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {String ourPath;//默认内容是ip地址if (lockNodeBytes != null) {// creatingParentContainersIfNeeded:创建父节点,如果不支持CreateMode.CONTAINER,就采用CreateMode.PERSISTENT// withProtection:临时节点添加GUIDourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes);} else {ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);}return ourPath;}//循环等待激活分布式锁private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {boolean haveTheLock = false;boolean doDelete = false;try {if (this.revocable.get() != null) {((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);}while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {List<String> children = this.getSortedChildren();String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);if (predicateResults.getsTheLock()) {haveTheLock = true;} else {String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();synchronized(this) {try {((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);if (millisToWait == null) {this.wait();} else {millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if (millisToWait > 0L) {this.wait(millisToWait);} else {doDelete = true;break;}}} catch (NoNodeException var19) {}}}}} catch (Exception var21) {ThreadUtils.checkInterrupted(var21);doDelete = true;throw var21;} finally {if (doDelete) {this.deleteOurPath(ourPath);}}return haveTheLock;}

分布式锁-zk临时节点相关推荐

  1. 阿里面试官让我用Zk(Zookeeper)实现分布式锁

    点赞再看,养成习惯,微信搜索[三太子敖丙]关注这个互联网苟且偷生的工具人. 本文 GitHub https://github.com/JavaFamily 已收录,有一线大厂面试完整考点.资料以及我的 ...

  2. 面试官:ZK分布式锁实现,你了解了吗?

    在平时我们对锁的使用,在针对单个服务,我们可以用 Java 自带的一些锁来实现,资源的顺序访问,但是随着业务的发展,现在基本上公司的服务都是多个,单纯的 Lock或者Synchronize 只能解决单 ...

  3. 面试官:ZK(ZooKeeper)分布式锁实现,你了解了吗?

    准备 本文会使用到 三台 独立服务器,可以自行提前搭建好. 不知道如何搭建的,可以看我之前 ZooKeeper集群 搭建:Zookeeper 集群部署的那些事儿 关于ZooKeeper 一些基础命令可 ...

  4. ZK(ZooKeeper)分布式锁实现

    点赞再看,养成习惯,微信搜索[牧小农]关注我获取更多资讯,风里雨里,小农等你. 本文中案例都会在上传到git上,请放心浏览 git地址:https://github.com/muxiaonong/Zo ...

  5. 分布式锁是啥?zk还是redis?

    关于分布式系统中应该考虑的几个问题包括诸如最基本的分布式锁和分布式事务等.该篇中我们将简单来聊聊分布式锁相关知识,比如常见的分布式锁的实现方式有哪些?redis如何设计分布式锁?zk如何设计分布式锁? ...

  6. 分布式锁、ZK分布式锁、Redis分布式锁

    常见的分布式锁实现方案:ZK分布式锁.Redis分布式锁 ZK分布式锁: 原理:使用ZK 的临时有序节点.节点的监听机制来实现的. 锁特点:悲观锁,公平锁 获取锁:客户端A在/mylock节点目录下创 ...

  7. CP模式(ZK)的分布式锁分析

    目录 JAVA的锁 死锁 分布式锁 CP模型的分布式锁 zk分布式锁的原理 zk节点操作 zk分布式锁代码实现 CP模式分布式锁总结 JAVA的锁 锁是用来控制多个线程访问共享资源的方式,一般来说,一 ...

  8. c# 操作redisclient 设置过期时间_C# Redis分布式锁单节点

    (给DotNet加星标,提升.Net技能) 转自:热敷哥cnblogs.com/refuge/p/13774008.html 为什么要用分布式锁? 先上一张截图,这是在浏览别人的博客时看到的. 在了解 ...

  9. c# 操作redisclient 设置过期时间_C# Redis分布式锁 单节点

    为什么要用分布式锁? 先上一张截图,这是在浏览别人的博客时看到的. 在了解为什么要用分布式锁之前,我们应该知道到底什么是分布式锁. 锁按照不同的维度,有多种分类.比如 1.悲观锁,乐观锁; 2.公平锁 ...

最新文章

  1. HDU-1789-Doing Homework again
  2. nginx静态资源反向代理
  3. 一条sql语句,要修改一个字段的俩个值,比如把字段sex中的男改为女,女改为男...
  4. MAC算法原理与常用实现
  5. Gradle Issue: OutOfMemoryError: PermGen space
  6. 【Linux】CentOS 7 安装Redis
  7. [CTO札记]研究:日本Rakuten(乐天)
  8. xjad使用中的几个问题
  9. 单片机仿真软件Proteus安装时遇到的问题
  10. 【挨踢人物传】向立天:从电视编导到技术总监,只要努力,你也能铸就传奇(第七期)...
  11. Windows下的MySQL实例没有mysql.user表#Olivia丶长歌#
  12. 从零开始学前端第十七讲--微信小程序开发入门
  13. Ps导航栏的简略讲解(一)
  14. office2010 word 关闭很慢
  15. IPv6-GRE 隧道技术
  16. 《Cisco防火墙》一6.5 虚拟防火墙的管理访问
  17. PCIe系列专题之二:2.3 TLP结构解析
  18. 常用汇编命令OD命令总结
  19. 关于go在函数退出后子协程的退出问题
  20. echarts画市县乡镇级地图

热门文章

  1. linux ttyusb读写_linux下非root用户获得devttyUSB0的读写权限
  2. Leetcode 112. 路径总和 (每日一题 20210910)
  3. python编程基础(四):编程习惯、代码规范、易混淆之处
  4. markdown 笔记
  5. Hadoop应用实战100讲(三)-Hadoop分布式文件系统
  6. 职业大揭秘,算法攻城狮在日常工作中都干了些啥?
  7. Matlab并行运算
  8. Matplotlib实例教程 | 句子长度累积分布函数图
  9. Python的__str__()方法
  10. hadoop和spark搭建记录