目录

  • 简介
  • 简单的使用
  • 重复使用的例子
  • 动态减少
  • API
    • 重要API
      • 注册
      • 到达
      • 不再等待机制
      • 监控子线程任务
      • 强制关闭
    • 调试API
      • 获取阶段数
      • 获取注册的数
      • 获得到达和未到达的数目
  • 总结

简介

java7中引入了一种新的可重复使用的同步屏障,称为移相器Phaser。Phaser拥有与CyclicBarrierCountDownLatch类似的功劳.

但是这个类提供了更加灵活的应用。CountDownLatch和CyclicBarrier都是只适用于固定数量的参与者。移相器适用于可变数目的屏障,在这个意义上,可以在任何时间注册新的参与者。并且在抵达屏障是可以注销已经注册的参与者。因此,注册到同步移相器的参与者的数目可能会随着时间的推移而变化。

如CyclicBarrier一样,移相器可以重复使用,这意味着当前参与者到达移相器后,可以再一次注册自己并等待另一次到达.

移相器的另一个重要特征是:移相器可能是分层的,这允许你以树形结构来安排移相器以减少竞争

简单的使用

public class PhaserTest {private final static Random RANDOM = new Random();public static void main(String[] args) {final Phaser phaser = new Phaser();IntStream.rangeClosed(1,5).boxed().map(i->phaser).forEach(Task::new);phaser.register();phaser.arriveAndAwaitAdvance();//相当于CountDownSystem.out.println("All of work are finished.");}static class Task extends Thread{private final Phaser phaser;Task(Phaser phaser) {this.phaser = phaser;phaser.register();//把自己加入计数器中start();}@Overridepublic void run() {System.out.println("The worker[ "+getName()+ " ]" +" is working.");try {TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}phaser.arriveAndAwaitAdvance();//自己完成,等待其他线程完成,相当于CyclicBarrier}}
}

结果:

The worker[ Thread-1 ] is working.
The worker[ Thread-2 ] is working.
The worker[ Thread-0 ] is working.
The worker[ Thread-4 ] is working.
The worker[ Thread-3 ] is working.
All of work are finished.

重复使用的例子

public class PhaserTest {private final static Random RANDOM = new Random();public static void main(String[] args) {final Phaser phaser = new Phaser(5);for (int i = 0; i < 6; i++) {new Athletes(i,phaser).start();}}static class Athletes extends Thread {private final int no;private final Phaser phaser;Athletes(int no, Phaser phaser) {this.no = no;this.phaser = phaser;}@Overridepublic void run() {try {System.out.println(no + " start running.");TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));System.out.println(no + " end running.");phaser.arriveAndAwaitAdvance();System.out.println(no + " start bicycle.");TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));System.out.println(no + " end bicycle.");phaser.arriveAndAwaitAdvance();System.out.println(no + " start long jump.");TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));System.out.println(no + " end long jump.");phaser.arriveAndAwaitAdvance();} catch (InterruptedException e) {e.printStackTrace();}}}
}

结果

0 start running.
1 start running.
4 start running.
2 start running.
3 start running.
5 start running.
5 end running.
4 end running.
2 end running.
1 end running.
0 end running.
4 start bicycle.
......

动态减少

static class InjuredAthletes extends Thread {private final int no;private final Phaser phaser;InjuredAthletes(int no, Phaser phaser) {this.no = no;this.phaser = phaser;}@Overridepublic void run() {try {sport(no, phaser, " start running.", " end running.");sport(no, phaser, " start bicycle.", " end bicycle.");System.out.println("I am injured.");phaser.arriveAndDeregister();//动态减少} catch (InterruptedException e) {e.printStackTrace();}}
}static class Athletes extends Thread {private final int no;private final Phaser phaser;Athletes(int no, Phaser phaser) {this.no = no;this.phaser = phaser;}@Overridepublic void run() {try {sport(no, phaser, " start running.", " end running.");sport(no, phaser, " start bicycle.", " end bicycle.");sport(no, phaser, " start long jump.", " end long jump.");} catch (InterruptedException e) {e.printStackTrace();}}}private static void sport(int no, Phaser phaser, String s, String s2) throws InterruptedException {System.out.println(no + s);TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));System.out.println(no + s2);phaser.arriveAndAwaitAdvance();
}

API

重要API

注册

public int register()
public int bulkRegister(int parties)

register

  • 是注册一个线程数,比较常用

bulkRegister

  • 可以批量注册

到达

public int arrive()
public int arriveAndDeregister()
public int arriveAndAwaitAdvance()

arrive

  • 这个到达后,不会阻塞,相当于countdown机制

arriveAndAwaitAdvance

  • 到达后会阻塞,相当于CyclicBarrier机制

arriveAndDeregister

  • 当线程出现异常,不能正常到达时,可以调用该方法,动态减少注册数

举例

public class PhaserTest {private static final Random RANDOM = new Random();public static void main(String[] args) throws InterruptedException {final Phaser phaser = new Phaser(5);for (int i = 0; i < 4; i++) {new ArriveTask(i,phaser).start();}//等待全部任务进行完成phaser.arriveAndAwaitAdvance();System.out.println("The phase 1 work finish done.");}private static class ArriveTask extends Thread{private final Phaser phaser;private ArriveTask(int no,Phaser phaser) {super(String.valueOf(no));this.phaser = phaser;}@Overridepublic void run() {System.out.println(getName() +  " start working. ");threadSleep();System.out.println(getName() + " The phase one is running.");phaser.arrive();threadSleep();System.out.println(getName() +  " keep to other thing. ");}}private static void threadSleep()  {try {TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}}}

不再等待机制

protected boolean onAdvance(int phase, int registeredParties)

举例

public class PhaserTest {public static void main(String[] args) throws InterruptedException {final Phaser phaser = new Phaser(2){@Overrideprotected boolean onAdvance(int phase, int registeredParties) {return true;}};new OnAdvanceTask("Alex",phaser).start();new OnAdvanceTask("Jack",phaser).start();TimeUnit.SECONDS.sleep(3);System.out.println(phaser.getArrivedParties());System.out.println(phaser.getUnarrivedParties());}static class OnAdvanceTask extends Thread{private final Phaser phaser;OnAdvanceTask(String name, Phaser phaser) {super(name);this.phaser = phaser;}@Overridepublic void run() {try {sout();TimeUnit.SECONDS.sleep(1);if (getName().equals("Alex")){System.out.println(phaser.isTerminated());sout();}} catch (InterruptedException e) {e.printStackTrace();}}private void sout() {System.out.println(getName() + " I am start and the phase " + phaser.getPhase());phaser.arriveAndAwaitAdvance();System.out.println(getName() + " I am end !");}}
}

结果

Jack I am start and the phase 0
Alex I am start and the phase 0
Alex I am end !
Jack I am end !
true
Alex I am start and the phase -2147483647
Alex I am end !
2
0
  • 默认情况,当别人调用arriveAndDeregister时,使注册的数量减到0时,直接不会陷入阻塞,返回true,相当于销毁掉

监控子线程任务

public int awaitAdvance(int phase)
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
  • 相当于起到监控的作用
  • 如果子线程还没有执行完成,主线程就会阻塞
  • 相较而言,可以不用增加注册量

举例

public static void main(String[] args) throws InterruptedException {final Phaser phaser = new Phaser(4);for (int i = 0; i < 4; i++) {new AwaitAdvance(i,phaser).start();}//等待全部任务进行完成phaser.awaitAdvance(phaser.getPhase());System.out.println("The phase 1 work finish done.");
}

强制关闭

public void forceTermination()
public boolean isTerminated()
  • 强制关闭phaser,但是如果线程陷入阻塞,不会唤醒

调试API

获取阶段数

public final int getPhase()
  • 返回当前相位数。 最大相位数为Integer.MAX_VALUE
  • 每增加一轮就会加一

举例

public class PhaserTest {public static void main(String[] args) {final Phaser phaser = new Phaser(1);System.out.println(phaser.getPhase());phaser.arriveAndAwaitAdvance();System.out.println(phaser.getPhase());phaser.arriveAndAwaitAdvance();System.out.println(phaser.getPhase());phaser.arriveAndAwaitAdvance();System.out.println(phaser.getPhase());}
}

结果

0
1
2
3

获取注册的数

public int getRegisteredParties()
  • 获得注册的线程数,相当于Countdown初始的的计数器
  • 可以动态更改

获得到达和未到达的数目

public int getArrivedParties()
public int getUnarrivedParties()

getArrivedParties

  • 获得已经到达的线程数,和没有到达的线程数

getUnarrivedParties

  • 获得没有到达的线程数,和没有到达的线程数

总结

  • Phaser 可以通过register() 方法和arriveAndDeregister() 方法,动态的增加或者减少注册量
  • 使用arriveAndAwaitAdvance,相当于CyclicBarrier机制
  • 使用arrive,相当于countdown机制
  • 可以利用awaitAdvance,让主线程等待子线程全部完成任务

【详解】JUC之Phaser(移相器)相关推荐

  1. JUC详解 | JUC Lock

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 JUC详解 | JUC Lock锁的详解 前言 一.Lock锁的入门 1. ReentrantLock可重入锁 1.1 测试可重入性 ...

  2. 详解JUC高并发编程

    JUC并发编程 并发编程的本质:充分利用CPU的资源 问题:JAVA可以开启线程吗? ​ 不能:最底层是本地方法,底层是C++,Java无法直接操作硬件 1.线程里有几个状态 NEW, 新生 RUNN ...

  3. 40000+字超强总结?阿里P8把Java全栈知识体系详解整理成这份PDF

    40000 +字长文总结,已将此文整理成PDF文档了,需要的见文后下载获取方式. 全栈知识体系总览 Java入门与进阶面向对象与Java基础 Java 基础 - 面向对象 Java 基础 - 知识点 ...

  4. Java JUC学习 - ConcurrentLinkedDeque 详解

    Java JUC学习 - ConcurrentLinkedDeque 详解 0x00 前言 如何实现并发程序,对于Java以及其他高级语言来说都是一件并不容易的事情.在大一上学期的时候,我们学习了链表 ...

  5. JUC第六讲:ThreadLocal/InheritableThreadLocal详解/TTL-MDC日志上下文实践

    本文是JUC第六讲:ThreadLocal/InheritableThreadLocal详解.ThreadLocal无论在项目开发还是面试中都会经常碰到,本文就 ThreadLocal 的使用.主要方 ...

  6. JUC详解(各种乱七八糟的锁)

    文章目录 1JUC 2回顾多线程知识 3Lock锁(重点) 4Lock锁 5JUC版的生产者与消费者问题(虚假唤醒) 6有序线程 7八锁问题 8CopyOnWriteArrayList(读写复制) 9 ...

  7. Java JUC并发编程详解

    Java JUC并发编程详解 1. JUC概述 1.1 JUC简介 1.2 进程与线程 1.2 并发与并行 1.3 用户线程和守护线程 2. Lock接口 2.1 Synchronized 2.2 什 ...

  8. Java - JUC详解

    目录 一.了解和JUC相关的概念 二.Java线程 三.线程共享模型 一.了解和JUC相关的概念 1.1 什么是JUC? JUC是java.util.concurrent包的简称,在Java5.0添加 ...

  9. Java开发 - 不知道算不算详细的JUC详解

    前言 大概有快两周没有发文了,这段时间不断的充实自己,算算时间,也到了该收获的时候,今天带来一篇JUC详解,但说实话,我也不敢自信它详不详细.JUC说白了就是多线程,学Java不久的人知道多线程,恐怕 ...

  10. AQS基础——多图详解CLH锁的原理与实现

    1 什么是自旋锁和互斥锁? 由于CLH锁是一种自旋锁,那么我们先来看看自旋锁是什么? 自旋锁说白了也是一种互斥锁,只不过没有抢到锁的线程会一直自旋等待锁的释放,处于busy-waiting的状态,此时 ...

最新文章

  1. JSP学习笔记(五):日期处理、页面重定向、点击量统计、自动刷新和发送邮件...
  2. GNN教程:第六篇Spectral算法细节详解!
  3. C语言的预处理、编译、汇编、链接
  4. c# thread 编程
  5. Linux系统任务计划(at、crontab)的使用方法
  6. Java命令行界面(第4部分):命令行
  7. 敏捷开发绩效管理之四:为团队设立外部绩效目标(目标管理,外向型绩效)...
  8. 多路复用与设置阻塞、非阻塞模式
  9. 飞鸽传书开发者的圈子里面 有很多对飞鸽公司
  10. 信息学奥赛一本通C++语言——1025:保留12位小数的浮点数
  11. 在docker容器中安装ifconfig、ping等工具
  12. python操控chrome抓网页
  13. JavaScript比较是否在某时间段内
  14. 基于MATLAB的身份证号码的识别算法
  15. 利用ESP8266与米思齐从零制作模拟温室大棚--接线篇
  16. [渝粤教育] 南昌大学 生物化学 参考 资料
  17. 微型计算机主机的组成不包括______,微型计算机主机的主要组成部分是什么
  18. mysql 获取昨天日期、今天日期、明天日期以及前一个小时和后一个小时的时间
  19. 电脑打开网络没有WiFi列表
  20. win7python怎么设置环境变量_如何在win7下设置python的环境变量

热门文章

  1. c语言atoi函数定义,C语言函数 atoi()
  2. 贝贝网融资一亿美金,唱衰垂直电商是否言之过早?
  3. pip3 install pandas
  4. SQL数据库损坏,报错,原因及注意事项
  5. Puppet+foreman 本地yum安装手册
  6. pHP 眼病,眼科疾病phpv什么意思 深入了解永存原始玻璃体增生症
  7. 攻击和利用TCP的拥塞控制
  8. Java集合部分学习+源码解析
  9. 这是你想要的 Go 技术 leader 吗?
  10. 【小技巧】FPGA开发流程普通流程