Semaphore和countDownLatch差不多,都涉及到共享锁,也涉及到PROPAGATE,所以一起讲,至于ReentrantReadWriteLock以及CyclicBarrier,这个之后再说,如果不了解aqs请查看之前的博文

Semaphore

Semaphore semaphore = new Semaphore(2); //设置state值
semaphore.acquire();
semaphore.release();

初始化的时候设置一个值,如果线程acquire数量超过了这个值,再acquire的线程会阻塞,直到有线程调用release释放,注意,不一定是先前acquire的线程才release,和锁有点不同。

semaphore.acquire()

public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
//其中,sync是
public Semaphore(int permits) {sync = new NonfairSync(permits);
}

sync.acquireSharedInterruptibly(1)中

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//还是老方法,先try尝试一下,如果没有申请上再说,返回的是remaining,也就是减去现在申请1个还剩下多少,小于0的时候,也就是没有申请上if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
}
//具体的很简单,这个还看不懂直接自杀
final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}

如果尝试失败,那么进入doAcquireSharedInterruptibly方法,开始第二次申请或者阻塞

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//放入队列,注意如果队列不存在的话就新建一个,注意如果是新建的话,这个队列的头是个空的,然后指向这个nodefinal Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {//再次尝试能不能搞到,不过这一次必须安装链表的顺序来,也就是clh队列那个机制final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);//r>=0说明搞到了资源,此时将此节点设为新头,注意注意  //doAcquireSharedInterruptibly方法和独占模式的acquireQueued方法类似,但区别是共享模式在一个节点获取锁后,会通知后续的节点也来一起尝试获取//这里还要注意,如果在这里申请成功了,那么必说明有线程调用了semaphore.release()释放了线程if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//这个之前有,在阻塞之前要把前驱节点waitStatus设为-1if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}

setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** Try to signal next queued node if:*   Propagation was indicated by caller,*     or was recorded (as h.waitStatus either before*     or after setHead) by a previous operation*     (note: this uses sign-check of waitStatus because*      PROPAGATE status may transition to SIGNAL.)* and*   The next node is waiting in shared mode,*     or we don't know, because it appears null** The conservatism in both of these checks may cause* unnecessary wake-ups, but only when there are multiple* racing acquires/releases, so most need signals now or soon* anyway.*///注意这里,propagate是在tryAcquireShared(arg)那一瞬间的剩余值,如果大于0,那么有理由相信还有剩余的资源可以通知后面的线程。//h为null说明根本没有,这个只是为了防止空指针异常,因为接下来要判断h.waitStatus < 0,//第一个h.waitStatus这里涉及到了PROPAGATE的使用(因为之所以会调用setHeadAndPropagate方法,是第一次尝试申请资源的时候没有申请好,在第二次或者第n次申请的时候才会进入这个,那么必有其他线程调用了semaphore.release()修改了state,而release()涉及到对链表后续节点的释放,会修改头节点的waitStatus(也不一定),所以这里第一个h.waitStatus有可能等于0,而如果小于0,则涉及到PROPAGATE,之后再讲)。//现在的头如果为空也是防止空指针异常,第二个h.waitStatus < 0说明后续有节点if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;//注意,这里要判断下一个节点为共享锁的节点才会进行释放(在Semaphore中因为都是共享锁的节点所以可以忽略),判断s是否为null一方面也是为了后面判断s是否是共享节点时不会抛出空指针异常;但更重要的原因是因为如果node是CLH队列中的最后一个节点的话,这个时候虽然拿到的s是null,但如果此时有其他的线程在CLH队列中新添加了一个节点后,此处并不能及时感知到这个变化。于是此时也会走进doReleaseShared方法中去处理这种情况(当然,如果没有发生多线程插入节点的时候,多调用一次doReleaseShared方法也是无妨的,在该方法里面会过滤掉这种情况)。同时这里会特殊判断共享节点是因为CLH队列中可能会存在独占节点和共享节点共存的场景出现,也就是ReentrantReadWriteLock读写锁的场景。这里会一直传播唤醒共享节点直到遇到一个独占节点为止,后面的节点不管是独占或共享状态都不会再被唤醒了if (s == null || s.isShared())doReleaseShared();}
}

shouldParkAfterFailedAcquire中会将头前驱节点的waitStatus设置成-1.

未申请上,则挂起。

semaphore.release()

public void release() {sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {//注意,这个虽然是try,但是必返回的是true(或者抛出异常)if (tryReleaseShared(arg)) {//尝试唤醒挂起的线程doReleaseShared();return true;}return false;
}
//没什么好说的
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}
}

doReleaseShared

private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases.  This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {//将头节点设置为0,不然的话同一时间以后很多线程都执行release方法,都会执行到这,执行unparkSuccessor(h),效率不高。if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases//和前面相同unparkSuccessor(h);}//同一时间以后很多线程都执行release方法,只有一个线程成功的执行了unparkSuccessor(h),之后第二快的线程将head节点的WaitStatus改成了Node.PROPAGATE,是为了消除高并发下乐观锁有节点被不唤醒的问题else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}if (h == head)                   // loop if head changedbreak;}
}

PROPAGATE状态

那么,为什么要改呢?举一个例子:

假设现在使用Semaphore semaphore = new Semaphore(2)

https://blog.csdn.net/tomakemyself/article/details/109499230

但是这个也只是对比修改之前和修改之后,没有解释

private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}
}

(h = head) == null || h.waitStatus < 0这一行中拿到现在的head,并且判断 h.waitStatus < 0,这样的话即使不设置Node.PROPAGATE,也会最终刷新。

CountDownLatch

基本用法,注意可以多个线程同时调用latch.await()一同阻塞。

public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(10);ExecutorService exec = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {exec.execute(() -> {try {int millis = new Random().nextInt(10000);System.out.println("等待游客上船,耗时:" + millis + "(millis)");Thread.sleep(millis);} catch (Exception ignore) {} finally {latch.countDown(); // 完事一个扣减一个名额 }};});// 等待游客latch.await();System.out.println("船长急躁了,开船!"); // 关闭线程池exec.shutdown();}
}

先看countDownLatch.await()吧

public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//先尝试一下是否可以获得资源(也就是到0的时候)if (tryAcquireShared(arg) < 0)//尝试不行,开始之后的再尝试或者阻塞doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {//如果等于0就返回1,不然返回-1return (getState() == 0) ? 1 : -1;
}

doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//放入队列中,注意可以多个线程同时调用latch.await()一同挂起,所以要放入队列排队final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {//如果发现是在头节点后面,再次尝试int r = tryAcquireShared(arg);if (r >= 0) {//申请到了资源,把自己申请成为头,如果是个链表,通知后面排队的线程节点setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//这一部分不用解释了吧,判断并阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}

再看countDownLatch.countDown()

public void countDown() {sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {//将state减1if (tryReleaseShared(arg)) {//看看能不能释放锁doReleaseShared();return true;}return false;
}
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}
}
//释放链表后面的线程,让他们再去尝试申请资源
private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases.  This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}if (h == head)                   // loop if head changedbreak;}
}

Semaphore和countDownLatch完全解析相关推荐

  1. Redisson 分布式锁源码 11:Semaphore 和 CountDownLatch

    前言 Redisson 除了提供了分布式锁之外,还额外提供了同步组件,Semaphore 和 CountDownLatch. Semaphore 意思就是在分布式场景下,只有 3 个凭证,也就意味着同 ...

  2. 抽象同步器AQS应用之-- Semaphore、CountDownLatch、CyclicBarrier的介绍

    文章目录 1. Semaphore 2. CountDownLatch 3. CyclicBarrier 1. Semaphore Semaphore字面意思是信号量,作用是控制访问特定资源的线程数目 ...

  3. AQS、Semaphore、CountDownLatch与CyclicBarrier原理及使用方法

    AQS AQS 的全称为 AbstractQueuedSynchronizer ,翻译过来的意思就是抽象队列同步器.这个类在 java.util.concurrent.locks 包下面,AQS 就是 ...

  4. Java多线程(八)之Semaphore、CountDownLatch、CyclicBarrier、Exchanger

    一.引言 Semaphore               :一个计数信号量 CountDownLatch          :一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线 ...

  5. 并发-6-wait、notify、Semaphore、CountDownLatch、CyclicBarrier

    wait().notify()和notifyAll()是Object类中的方法:   为什么wait()等方法是在Object中而不是Thread中呢?   同理,wait(),notify()是对等 ...

  6. Semaphore、CountDownLatch和CyclicBarrier

    这三者都是java并发包的工具类,提供了比synchronized更加高级的各种同步结构,可以实现更加丰富的多线程操作. Semaphore 信号量,我们应该都在操作系统课程里学过,它是解决进程间通信 ...

  7. java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri

    文章目录 前言 1. stampedLock 1. 概述 2. 代码 1. 读读 2. 读写 3. 注意 2. Semaphore 1. 基本使用 2. 应用场景 3. 原理 3. Countdown ...

  8. 面经手册 · 第18篇《AQS 共享锁,Semaphore、CountDownLatch,听说数据库连接池可以用到!》

    作者:小傅哥 博客:https://bugstack.cn Github:https://github.com/fuzhengwei/CodeGuide/wiki 沉淀.分享.成长,让自己和他人都能有 ...

  9. Java8 Semaphore与Exchanger 源码解析

    目录 一.Semaphore 1.使用 2.定义 3.acquire / acquireUninterruptibly / acquireUninterruptibly / tryAcquire 4. ...

最新文章

  1. itchat 动态注册
  2. 星座图与IQ调制总结+BPSK、QPSK、8PSK、16QAM等的区别与总结
  3. pytorch中的CrossEntropyLoss
  4. 积跬步,聚小流------Bootstrap学习记录(3)
  5. php 8 jit,PHP JIT 是什么?PHP8 新特征之 JIT 图文详解_后端开发
  6. linux简单邮件系统,怎样简单搭建一个Linux操作系统邮件服务器
  7. 机器视觉——双目视觉的基础知识(视差深度、标定、立体匹配)
  8. 1.Shell 编程从入门到精通 --- 第一个 Shell 程序
  9. GitLab 8.9增加了文件锁和硬件U2F支持
  10. catia 工厂设计_关于基于CATIA的三维工厂的设计最终版实用模版
  11. 无法打开文件ws32_2.lib ws2_32.lib
  12. Verilog中的按键消抖
  13. Spring学习(五):动态代理的两种实现方式(全网最容易懂)
  14. Git——详解操作码云
  15. qp_查看表的数据是否更新了指定的某一天数据
  16. 插画手绘培训,“安利25周年”插画主题创作:畅游RichJay的创业之旅!【信念篇】
  17. 从零开始的基于百度大脑EasyData的多人协同数据标注
  18. 张博涵清华大学_特别专访 | 张博涵 上
  19. 情人节主题的公众号图文排版怎样设计最走心?
  20. 安卓基础学习 Day 6|常用控件---列表视图+古诗查看

热门文章

  1. 如何成为一个积极主动的项目经理
  2. linux open_namei,刚学Linux 想问问这个程序写的什么 有大佬帮帮吗
  3. 云环境中的测试即服务(TAAS)体系结构
  4. VS2010~VS2015~VS2019颜色字体设置策略
  5. 沧正压力传感器/压力变送器的分类与选型
  6. 3M EDI 855 采购订单确认报文详解
  7. 2021年弥勒一中高考成绩查询,快来查!2020年云南省高考成绩查询入口-云南招考频道...
  8. python爬取豆瓣小组_Python 爬虫实例+爬取豆瓣小组 + wordcloud 制作词云图
  9. jetty架构及工作原理
  10. Android之视频播放器<JiaoZiVideoPlayer>的使用及解决的问题