简介

CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

注意比较CountDownLatch和CyclicBarrier:
(01) CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
(02) CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。

CyclicBarrier类结构

CyclicBarrier类源码

package java.util.concurrent;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class CyclicBarrier {private static class Generation {boolean broken = false;}private final ReentrantLock lock = new ReentrantLock();private final Condition trip = lock.newCondition();private final int parties;private final Runnable barrierCommand;private Generation generation = new Generation();private int count;private void nextGeneration() {trip.signalAll();count = parties;generation = new Generation();}private void breakBarrier() {generation.broken = true;count = parties;trip.signalAll();}private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;if (g.broken)throw new BrokenBarrierException();if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}int index = --count;if (index == 0) {boolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}for (;;) {try {if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}public CyclicBarrier(int parties) {this(parties, null);}public int getParties() {return parties;}public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe);}}public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));}public boolean isBroken() {final ReentrantLock lock = this.lock;lock.lock();try {return generation.broken;} finally {lock.unlock();}}public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {breakBarrier();nextGeneration();} finally {lock.unlock();}}public int getNumberWaiting() {final ReentrantLock lock = this.lock;lock.lock();try {return parties - count;} finally {lock.unlock();}}
}

CyclicBarrier是通过ReentrantLock(独占锁)和Condition来实现的。下面,我们分析CyclicBarrier中的核心函数: 构造函数, await()作出分析。

1. 构造函数

CyclicBarrier的构造函数共2个:CyclicBarrier(int parties)CyclicBarrier(int parties, Runnable barrierAction)。第1个构造函数是调用第2个构造函数来实现的,直接看第2个构造函数的源码。

public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();// parties表示“必须同时到达barrier的线程个数”。this.parties = parties;// count表示“处在等待状态的线程个数”。this.count = parties;// barrierCommand表示“parties个线程到达barrier时,会执行的动作”。this.barrierCommand = barrierAction;}

2. 等待函数await()

public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen;}
}

await()方法调用的dowait方法:

private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;// 获取“独占锁(lock)”lock.lock();try {// 保存“当前的generation”final Generation g = generation;// 若“当前generation已损坏”,则抛出异常。if (g.broken)throw new BrokenBarrierException();// 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// 将“count计数器”-1int index = --count;// 如果index=0,则意味着“有parties个线程到达barrier”。if (index == 0) {  // trippedboolean ranAction = false;try {// 如果barrierCommand不为null,则执行该动作。final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;// 唤醒所有等待线程,并更新generation。nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,// 当前线程才继续执行。for (;;) {try {// 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待。if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {// 如果等待过程中,线程被中断,则执行下面的函数。if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}// 如果“当前generation已经损坏”,则抛出异常。if (g.broken)throw new BrokenBarrierException();// 如果“generation已经换代”,则返回index。if (g != generation)return index;// 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {// 释放“独占锁(lock)”lock.unlock();}
}

说明:dowait()的作用就是让当前线程阻塞直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行。
(01) generation是CyclicBarrier的一个成员遍历,它的定义如下:

private Generation generation = new Generation();private static class Generation {boolean broken = false;
}

在CyclicBarrier中,同一批的线程属于同一代,即同一个Generation;CyclicBarrier中通过generation对象,记录属于哪一代。
当有parties个线程到达barrier,generation就会被更新换代

(02) 如果当前线程被中断,即Thread.interrupted()为true;则通过breakBarrier()终止CyclicBarrier。breakBarrier()的源码如下:

private void breakBarrier() {generation.broken = true;count = parties;trip.signalAll();
}

breakBarrier()会设置当前中断标记broken为true,意味着“将该Generation中断”;同时,设置count=parties,即重新初始化count;最后,通过signalAll()唤醒CyclicBarrier上所有的等待线程。

(03) 将“count计数器”-1,即--count;然后判断是不是“有parties个线程到达barrier”,即index是不是为0。
当index=0时,如果barrierCommand不为null,则执行该barrierCommand,barrierCommand就是我们创建CyclicBarrier时,传入的Runnable对象。然后,调用nextGeneration()进行换代工作,nextGeneration()的源码如下:

private void nextGeneration() {trip.signalAll();count = parties;generation = new Generation();
}

首先,它会调用signalAll()唤醒CyclicBarrier上所有的等待线程;接着,重新初始化count;最后,更新generation的值。

(04) 在for(;;)循环中。timed是用来表示当前是不是“超时等待”线程。如果不是,则通过trip.await()进行等待;否则,调用awaitNanos()进行超时等待。

示例

1、新建5个线程,这5个线程达到一定的条件时,它们才继续往后运行。

package cn.com.boco.HermesService;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Test {private static int SIZE = 5;private static CyclicBarrier cb;public static void main(String[] args) {cb = new CyclicBarrier(SIZE);// 新建5个任务for (int i = 0; i < SIZE; i++)new InnerThread().start();}static class InnerThread extends Thread {public void run() {try {System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");// 将cb的参与者数量加1cb.await();// cb的参与者数量等于5时,才继续往后执行System.out.println(Thread.currentThread().getName() + " continued.");} catch (BrokenBarrierException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}
}

执行结果:

结果说明:主线程中新建了5个线程,所有的这些线程都调用cb.await()等待。所有这些线程一直等待,直到cb中所有线程都达到barrier时,这些线程才继续运行!

2、新建5个线程,当这5个线程达到一定的条件时,执行某项任务。

package cn.com.boco.HermesService;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class Test {private static int SIZE = 5;private static CyclicBarrier cb;public static void main(String[] args) {cb = new CyclicBarrier(SIZE, new Runnable() {public void run() {System.out.println("CyclicBarrier's parties is: " + cb.getParties());}});// 新建5个任务for (int i = 0; i < SIZE; i++)new InnerThread().start();}static class InnerThread extends Thread {public void run() {try {System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");// 将cb的参与者数量加1cb.await();// cb的参与者数量等于5时,才继续往后执行System.out.println(Thread.currentThread().getName() + " continued.");} catch (BrokenBarrierException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}
}

执行结果:

结果说明:主线程中新建了5个线程,所有的这些线程都调用cb.await()等待。所有这些线程一直等待,直到cb中所有线程都达到barrier时,执行新建cb时注册的Runnable任务。

CyclicBarrier相关推荐

  1. Java并发编程之CountDownLatch、CyclicBarrier和Semaphore

    前言 本文为对CountDownLatch.CyclicBarrier.Semaphore的整理使用 CountDownLatch CountDownLatch类位于java.util.concurr ...

  2. java并发之同步辅助类CyclicBarrier和CountDownLatch

    CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier).它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门, ...

  3. cyclicbarrier java_Java并发编程之CyclicBarrier和线程池的使用

    原标题:Java并发编程之CyclicBarrier和线程池的使用 下面我们来讲述一下线程池和CyclicBarrier的使用和对比. 一.场景描述 有四个游戏玩爱好者玩游戏,游戏中有三个关卡,每一个 ...

  4. JAVA中的并发工具 -- CountDownLatch、CyclicBarrier、Semaphore

    2019独角兽企业重金招聘Python工程师标准>>> CountDownLatch CountDownLatch允许一个或多个线程等待其他线程完成操作. CountDownLatc ...

  5. LeetCode 1195. Fizz Buzz Multithreaded--并发系列题目--Java 解法--AtomicInteger/CountDownLatch/CyclicBarrier

    题目地址:Fizz Buzz Multithreaded - LeetCode Write a program that outputs the string representation of nu ...

  6. LeetCode 1115. Print FooBar Alternately--多线程并发问题--Java解法--CyclicBarrier, synchronized, Semaphore 信号量

    此文首发于我的个人博客:zhang0peter的个人博客 LeetCode题解专栏:LeetCode题解 LeetCode 所有题目总结:LeetCode 所有题目总结 题目地址:Print FooB ...

  7. Java的CountDownLatch和CyclicBarrier的理解和区别

    CountDownLatch和CyclicBarrier的功能看起来很相似,不易区分,有一种谜之的神秘.本文将通过通俗的例子并结合代码讲解两者的使用方法和区别. CountDownLatch和Cycl ...

  8. java并发编程同步器 Semaphore、CyclicBarrier、Exchanger、CountDownLatch

    为什么80%的码农都做不了架构师?>>>    一.Semaphore(信号量) 注解:信号量,其实就是定义一定的数量,只有释放一个才能进去下一个,其余都得进入等待状态.比如有2个洗 ...

  9. 使用Java辅助类(CountDownLatch、CyclicBarrier、Semaphore)并发编程

    在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法 一.C ...

  10. CyclicBarrier使用与原理

    2019独角兽企业重金招聘Python工程师标准>>> CyclicBarrier当计数减少到0时,会唤醒所有阻塞在同一个Condition上的线程,与CountDownLatch不 ...

最新文章

  1. 个人代码库の设置窗体效果AnimateWindow
  2. 彻底搞懂JVM类加载器:基本概念
  3. c+还是python好-既然C+不如Java、Python,为什么还要学C+?
  4. python对话机器人软件_如何用Python为聊天机器人创建对话?
  5. 启动java服务时刷新缓存_Spring java项目对外提供服务和java进程启动时bean,内部缓存加载的先后关系?...
  6. Iirf安装配置(图文)
  7. mysql explain insert_mysql explain详解
  8. 安全狗服云手机端上架各大手机应有市场
  9. NoSQL数据库介绍
  10. flash电脑安装包_一百余款电脑软件及安装方式,忍不住收藏起来
  11. 在校开发的装柜辅助系统
  12. knockoutjs三 text和apperance的绑定
  13. 3DMAX卸载与安装教程和常见问题 适用于3DMAX2013-2020
  14. Unity判断机型iPad/iPhone
  15. 虚拟服务器virt,virt-install error,主机不支持任何虚拟化选项
  16. “不一样的六一儿童节”——暨线上公益跳绳颁奖活动圆满结束
  17. android 设置布局宽度,Android布局宽度为50%
  18. jQuery炫酷3d背景视觉差特效
  19. 计算机网络-自顶向下笔记-应用层-P2P应用
  20. Ultimaker Cura 4.9.X 开发教程 (零)

热门文章

  1. MapReduce中FileInputFormat解析
  2. 计算机功能是什么意思啊,电脑home键是什么意思有什么功能(电脑上home键的7个功能)...
  3. 七、Java 14 新特性
  4. 本体技术视点 | 聊聊Ontology上三种合约“交相辉映”的故事
  5. 木马免杀(绕过杀软)
  6. java使用aspose实现文件预览工功能
  7. 10个炫酷的网站产品卡的实现案例
  8. 拿鸡蛋python_扔鸡蛋问题python编写
  9. 未来展望:Starday供应链火力全开,为跨境电商再添动力!
  10. C#【控件篇】TextBox中只能输入16进制数的格式(长度:2,字母只能是大写)