最近又在重读CyclicBarrier源码,并进行了深入分析,重点源码也自己跟过并做了一些注释,仅供大家参考。

CyclicBarrier:回环栅栏(有人也称之为循环屏障),通过他可以让一组线程等待至某个状态(屏障点)之后再全部同时执行,同时他还有一个特点,所有线程都被释放了以后,CyclicBarrier还可以被重用。

废话不多说,一切以实践为主,以下是我写的一个例子,供大家理解。

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, ()->{System.out.println("人满发车了");});System.out.println("一车3人,车满发车");
for (int i = 0; i < 10; i++) {int finalI = i;new Thread(()->{System.out.println("第"+ finalI +"个人上车了");try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}).start();
}

再来看一下执行结果:

从结果可以看出来,车上每上3个人,就会人满发车,最后第9个人上车了,但是由于不满3个人,所以线程一直没发车,直到等到3个人满时才会发车。

1. 先看下CyclicBarrier的结构有哪些方法和属性

//栅栏代
private static class Generation {//是否要打破boolean broken = false;
}//资源独占锁
private final ReentrantLock lock = new ReentrantLock();
//条件等待队列
private final Condition trip = lock.newCondition();
//拦截的线程数
private final int parties;
//到达屏障点(换代前),执行的方法
private final Runnable barrierCommand;
//栅栏代
private Generation generation = new Generation();
//线程计数器
private int count;

CyclicBarrier提供了2个构造方法。

public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();//标识屏障拦截的线程数量,相当于一个副本,作用是在达到屏障后进行重置count操作this.parties = parties;//线程计数器this.count = parties;//达到屏障点后执行的方法this.barrierCommand = barrierAction;
}public CyclicBarrier(int parties) {this(parties, null);
}

2. 接着我们分析一下CyclicBarrier的await()方法

public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}
}private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException//实际用的是一把独占锁           final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;if (g.broken)throw new BrokenBarrierException();//线程是否中断,如果中断的话,需要打破栅栏,并抛出中断异常    if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}int index = --count;if (index == 0) {  // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;//到达屏障点时,执行需要处理的业务if (command != null)command.run();ranAction = true;//执行下一代屏障,在这儿会将条件等待队列的元素转到同步队列,并将之前阻塞的线程唤醒,nextGeneration();return 0;} finally {//是否要打破栅栏代if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {try {if (!timed)//在线程计数器count不等于0时,会走到Condition.await()trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}
}/*** Implements interruptible condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,*      throwing IllegalMonitorStateException if it fails.* <li> Block until signalled or interrupted.* <li> Reacquire by invoking specialized version of*      {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//添加到条件等待队列中Node node = addConditionWaiter();//释放当前拿到的ReentrantLock锁int savedState = fullyRelease(node);int interruptMode = 0;//判断是否在同步队列上while (!isOnSyncQueue(node)) {//不在的话,就阻塞当前队列LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}/*** Adds a new waiter to wait queue.* @return its new wait node*/
private Node addConditionWaiter() {//第一次进来时,lastWaiter为NullNode t = lastWaiter;// If lastWaiter is cancelled, clean out.//如果最后一个节点不为空而且不是条件等待状态if (t != null && t.waitStatus != Node.CONDITION) {//剔除已取消的等待队列,并重新给t复制lastWaiterunlinkCancelledWaiters();t = lastWaiter;}//新创建一个thread当前线程的NodeNode node = new Node(Thread.currentThread(), Node.CONDITION);//lastWaiter为空的话,新增firstWaiter为当前节点;lastWaiter不为空的话,新增t.nextWaiter为当前节点;if (t == null)firstWaiter = node;elset.nextWaiter = node;//将lastWaiter指向当前节点lastWaiter = node;return node;
}/*** Invokes release with current state value; returns saved state.* Cancels node and throws exception on failure.* @param node the condition node for this wait* @return previous sync state*/
final int fullyRelease(Node node) {boolean failed = true;try {//拿到当前的锁状态标识stateint savedState = getState();//释放锁if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}
}/*** Unlinks cancelled waiter nodes from condition queue.* Called only while holding lock. This is called when* cancellation occurred during condition wait, and upon* insertion of a new waiter when lastWaiter is seen to have* been cancelled. This method is needed to avoid garbage* retention in the absence of signals. So even though it may* require a full traversal, it comes into play only when* timeouts or cancellations occur in the absence of* signals. It traverses all nodes rather than stopping at a* particular target to unlink all pointers to garbage nodes* without requiring many re-traversals during cancellation* storms.*/
private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) {//出队操作,由于条件等待队列是个单向链表,所以只需要将t.nextWaiter=null即可完成出队操作t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t;t = next;}
}/*** Updates state on barrier trip and wakes up everyone.* Called only while holding lock.*/
private void nextGeneration() {// signal completion of last generation//唤醒所有队列trip.signalAll();// set up next generation //重置计数器count = parties;//新创建一个屏障generation = new Generation();
}/*** Moves all threads from the wait queue for this condition to* the wait queue for the owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}*         returns {@code false}*/
public final void signalAll() {//判断是否为当前独占线程if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);
}/*** Removes and transfers all nodes.* @param first (non-null) the first node on condition queue*/
private void doSignalAll(Node first) {//直接清空条件等待队列,设计精髓所在lastWaiter = firstWaiter = null;do {//保存first的下一个节点Node next = first.nextWaiter;//first出单项链表first.nextWaiter = null;//转换到同步队列Sync并依次唤醒transferForSignal(first);//first指向下一个节点first = next;} while (first != null);
}/*** Transfers a node from a condition queue onto sync queue.* Returns true if successful.* @param node the node* @return true if successfully transferred (else the node was* cancelled before signal)*/
final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*///当前节点入队同步队列Sync,并返回node的前驱节点Node p = enq(node);int ws = p.waitStatus;//CAS将node的前驱节点p的waitStatus修改为待唤醒if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
}/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/
private Node enq(final Node node) {//用for(;;)必须保证入队成功for (;;) {//t指向队尾Node t = tail;//如果队尾为空,说明队列为空if (t == null) { // Must initialize//CAS操作将head指向一个空Node,并将tail也指向空Nodeif (compareAndSetHead(new Node()))tail = head;} else {//如果队尾不为空,则将当前节点的前驱指向队尾node.prev = t;//CAS操作将tail指向当前节点if (compareAndSetTail(t, node)) {//由于是个双向链表,将t的后继节点指向当前节点t.next = node;return t;}}}
}

await()源码也跟着走了一遍,以下是我根据源码画了一些关键节点的流程图,便于理解。

3. 再来看一下我们的reset()方法,可以重置我们的计数器,并释放所有线程

/*** Resets the barrier to its initial state.  If any parties are* currently waiting at the barrier, they will return with a* {@link BrokenBarrierException}. Note that resets <em>after</em>* a breakage has occurred for other reasons can be complicated to* carry out; threads need to re-synchronize in some other way,* and choose one to perform the reset.  It may be preferable to* instead create a new barrier for subsequent use.*/
public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {//打破栅栏breakBarrier();   // break the current generationnextGeneration(); // start a new generation} finally {lock.unlock();}
}/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {//将是否打破标识置为truegeneration.broken = true;//重置计数器count = parties;//重置后,会唤醒之前阻塞的所有的线程trip.signalAll();
}

到此为止,CyclicBarrier的两个重要的方法已分析完,其他的一些方法相对简单,就不在这儿分析了,大家有兴趣的可以去看下源码,也可以私信我进行探讨。

我们来总结一下:
说到底,底层为CyclicBarrier是基于ReentrantLock和Condition实现的。

CountDownLatch也可以实现一组线程等待至某个状态之后再全部同时执行的需求。
那到底CyclicBarrier与CountDownLatch有啥区别呢?

  1. CountDownLatch的计数器只能使用一次,而CyclicBarrier可以通过reset()使用多次.
  2. CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
  3. CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不 同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。 CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
  4. CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
  5. CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤 醒的,而CountDownLatch则是通过AQS的“共享锁”实现。

CountDownLatch链接:暂无,后续补上。

JUC系列之CyclicBarrier详解相关推荐

  1. Java并发编程系列之CyclicBarrier详解

    简介 jdk原文 A synchronization aid that allows a set of threads to all wait for each other to reach a co ...

  2. Docker系列07—Dockerfile 详解

    Docker系列07-Dockerfile 详解 1.认识Dockerfile 1.1 镜像的生成途径 基于容器制作  dockerfile,docker build 基于容器制作镜像,已经在上篇Do ...

  3. mongo 3.4分片集群系列之六:详解配置数据库

    这个系列大致想跟大家分享以下篇章: 1.mongo 3.4分片集群系列之一:浅谈分片集群 2.mongo 3.4分片集群系列之二:搭建分片集群--哈希分片 3.mongo 3.4分片集群系列之三:搭建 ...

  4. ftm模块linux驱动,飞思卡尔k系列_ftm模块详解.doc

    飞思卡尔k系列_ftm模块详解 1.5FTM模块1.5.1 FTM模块简介FTM模块是一个多功能定时器模块,主要功能有,PWM输出.输入捕捉.输出比较.定时中断.脉冲加减计数.脉冲周期脉宽测量.在K1 ...

  5. React Native按钮详解|Touchable系列组件使用详解

    转载自:http://www.devio.org/2017/01/10/React-Native按钮详解-Touchable系列组件使用详解/ 在做App开发过程中离不了的需要用户交互,说到交互,我们 ...

  6. Material Design系列之BottomNavigationView详解

    Material Design系列之BottomNavigationView详解 Material Design官方文档Bottom navigation的介绍 BottomNavigationVie ...

  7. React 源码系列 | React Context 详解

    目前来看 Context 是一个非常强大但是很多时候不会直接使用的 api.大多数项目不会直接使用 createContext 然后向下面传递数据,而是采用第三方库(react-redux). 想想项 ...

  8. Landsat系列数据级别详解

    Landsat系列数据级别详解 转载自此文:https://www.cnblogs.com/icydengyw/p/12056211.html 一.Landsat Collection 1 Lands ...

  9. 小猫爪:i.MX RT1050学习笔记26-RT1xxx系列的FlexCAN详解

    i.MX RT1050学习笔记26-RT1xxx系列的FlexCAN详解 1 前言 2 FlexCAN简介 2.1 MB(邮箱)系统 2.1.1 正常模式下 2.1.2 激活了CAN FD情况下 2. ...

最新文章

  1. LA 5717枚举+最小生成树回路性质
  2. Django模板之jinja2模板和CSRF
  3. openmv4闪灯说明_OpenMV Cam
  4. 2014目标!!!!
  5. lua学习笔记之日期时间
  6. Windows Pe 第三章 PE头文件(上)
  7. java继承与实现的_[Java学习] Java继承的概念与实现
  8. 全世界的狗都没有“生殖隔离” | 今日趣图
  9. linux+vim+动不了,linux的vim按了ctrl+s之后假死的解决办法
  10. 计算机进入休眠状态后,Win7电脑进入休眠状态后又自动重启该怎么处理
  11. java培训学费_太原java培训班价格表
  12. 《AutoCAD 2014中文版超级学习手册》——1.3 设置绘图环境
  13. 计算机将图像数字化的原理,图像数字化
  14. 剑指Offer对答如流系列 - 把数字翻译成字符串
  15. 类似于计算机的文件管理器,XYplorer 21比电脑自带的文件管理器还好用的工具
  16. win10亮度无法调节,怎么处理
  17. 【我的渲染技术进阶之旅】你知道数字图像处理的标准图上的女孩子是谁吗?背后的故事你了解吗?为啥这张名为Lenna的图会成为数字图像处理的标准图呢?
  18. 【商业源码】生日大放送-Newlife商业源码分享
  19. stm32正常运行流程图_STM32学习笔记(超详细整理144个问题)
  20. 15、2 使用vsftpd搭建ftp服务

热门文章

  1. 驱动保护:挂接SSDT内核钩子(1)
  2. Top Android App使用的组件(一)
  3. [日常刷题]leetcode D25
  4. git本地服务器搭建-windows环境
  5. 开关电源纹波产生分析
  6. 做一名威客,用你的知识赚钱
  7. 华为OD机试题,用 Java 解【员工出勤 or 出勤奖的判断】问题 | 含解题说明
  8. python pc端微信记录读取_微信 PC 版迎来了重磅更新,可以在电脑端使用小程序了 |内附下载链接...
  9. 基于蜻蜓优化算法的认知无线电网络的服务质量研究附Matlab代码
  10. brooks levitate_Brooks推出超强能量反馈性能跑鞋Levitate 3