【详解】JUC之Phaser(移相器)
目录
- 简介
- 简单的使用
- 重复使用的例子
- 动态减少
- API
- 重要API
- 注册
- 到达
- 不再等待机制
- 监控子线程任务
- 强制关闭
- 调试API
- 获取阶段数
- 获取注册的数
- 获得到达和未到达的数目
- 总结
简介
java7中引入了一种新的可重复使用的同步屏障,称为移相器Phaser。Phaser拥有与CyclicBarrier
和CountDownLatch
类似的功劳.
但是这个类提供了更加灵活的应用。CountDownLatch和CyclicBarrier都是只适用于固定数量的参与者。移相器适用于可变数目的屏障,在这个意义上,可以在任何时间注册新的参与者。并且在抵达屏障是可以注销已经注册的参与者。因此,注册到同步移相器的参与者的数目可能会随着时间的推移而变化。
如CyclicBarrier一样,移相器可以重复使用,这意味着当前参与者到达移相器后,可以再一次注册自己并等待另一次到达.
移相器的另一个重要特征是:移相器可能是分层的,这允许你以树形结构来安排移相器以减少竞争
简单的使用
public class PhaserTest {private final static Random RANDOM = new Random();public static void main(String[] args) {final Phaser phaser = new Phaser();IntStream.rangeClosed(1,5).boxed().map(i->phaser).forEach(Task::new);phaser.register();phaser.arriveAndAwaitAdvance();//相当于CountDownSystem.out.println("All of work are finished.");}static class Task extends Thread{private final Phaser phaser;Task(Phaser phaser) {this.phaser = phaser;phaser.register();//把自己加入计数器中start();}@Overridepublic void run() {System.out.println("The worker[ "+getName()+ " ]" +" is working.");try {TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}phaser.arriveAndAwaitAdvance();//自己完成,等待其他线程完成,相当于CyclicBarrier}}
}
结果:
The worker[ Thread-1 ] is working.
The worker[ Thread-2 ] is working.
The worker[ Thread-0 ] is working.
The worker[ Thread-4 ] is working.
The worker[ Thread-3 ] is working.
All of work are finished.
重复使用的例子
public class PhaserTest {private final static Random RANDOM = new Random();public static void main(String[] args) {final Phaser phaser = new Phaser(5);for (int i = 0; i < 6; i++) {new Athletes(i,phaser).start();}}static class Athletes extends Thread {private final int no;private final Phaser phaser;Athletes(int no, Phaser phaser) {this.no = no;this.phaser = phaser;}@Overridepublic void run() {try {System.out.println(no + " start running.");TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));System.out.println(no + " end running.");phaser.arriveAndAwaitAdvance();System.out.println(no + " start bicycle.");TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));System.out.println(no + " end bicycle.");phaser.arriveAndAwaitAdvance();System.out.println(no + " start long jump.");TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));System.out.println(no + " end long jump.");phaser.arriveAndAwaitAdvance();} catch (InterruptedException e) {e.printStackTrace();}}}
}
结果:
0 start running.
1 start running.
4 start running.
2 start running.
3 start running.
5 start running.
5 end running.
4 end running.
2 end running.
1 end running.
0 end running.
4 start bicycle.
......
动态减少
static class InjuredAthletes extends Thread {private final int no;private final Phaser phaser;InjuredAthletes(int no, Phaser phaser) {this.no = no;this.phaser = phaser;}@Overridepublic void run() {try {sport(no, phaser, " start running.", " end running.");sport(no, phaser, " start bicycle.", " end bicycle.");System.out.println("I am injured.");phaser.arriveAndDeregister();//动态减少} catch (InterruptedException e) {e.printStackTrace();}}
}static class Athletes extends Thread {private final int no;private final Phaser phaser;Athletes(int no, Phaser phaser) {this.no = no;this.phaser = phaser;}@Overridepublic void run() {try {sport(no, phaser, " start running.", " end running.");sport(no, phaser, " start bicycle.", " end bicycle.");sport(no, phaser, " start long jump.", " end long jump.");} catch (InterruptedException e) {e.printStackTrace();}}}private static void sport(int no, Phaser phaser, String s, String s2) throws InterruptedException {System.out.println(no + s);TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));System.out.println(no + s2);phaser.arriveAndAwaitAdvance();
}
API
重要API
注册
public int register()
public int bulkRegister(int parties)
register
- 是注册一个线程数,比较常用
bulkRegister
- 可以批量注册
到达
public int arrive()
public int arriveAndDeregister()
public int arriveAndAwaitAdvance()
arrive
- 这个到达后,不会阻塞,相当于
countdown
机制
arriveAndAwaitAdvance
- 到达后会阻塞,相当于
CyclicBarrier
机制
arriveAndDeregister
- 当线程出现异常,不能正常到达时,可以调用该方法,
动态减少注册数
举例
public class PhaserTest {private static final Random RANDOM = new Random();public static void main(String[] args) throws InterruptedException {final Phaser phaser = new Phaser(5);for (int i = 0; i < 4; i++) {new ArriveTask(i,phaser).start();}//等待全部任务进行完成phaser.arriveAndAwaitAdvance();System.out.println("The phase 1 work finish done.");}private static class ArriveTask extends Thread{private final Phaser phaser;private ArriveTask(int no,Phaser phaser) {super(String.valueOf(no));this.phaser = phaser;}@Overridepublic void run() {System.out.println(getName() + " start working. ");threadSleep();System.out.println(getName() + " The phase one is running.");phaser.arrive();threadSleep();System.out.println(getName() + " keep to other thing. ");}}private static void threadSleep() {try {TimeUnit.SECONDS.sleep(RANDOM.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}}}
不再等待机制
protected boolean onAdvance(int phase, int registeredParties)
举例
public class PhaserTest {public static void main(String[] args) throws InterruptedException {final Phaser phaser = new Phaser(2){@Overrideprotected boolean onAdvance(int phase, int registeredParties) {return true;}};new OnAdvanceTask("Alex",phaser).start();new OnAdvanceTask("Jack",phaser).start();TimeUnit.SECONDS.sleep(3);System.out.println(phaser.getArrivedParties());System.out.println(phaser.getUnarrivedParties());}static class OnAdvanceTask extends Thread{private final Phaser phaser;OnAdvanceTask(String name, Phaser phaser) {super(name);this.phaser = phaser;}@Overridepublic void run() {try {sout();TimeUnit.SECONDS.sleep(1);if (getName().equals("Alex")){System.out.println(phaser.isTerminated());sout();}} catch (InterruptedException e) {e.printStackTrace();}}private void sout() {System.out.println(getName() + " I am start and the phase " + phaser.getPhase());phaser.arriveAndAwaitAdvance();System.out.println(getName() + " I am end !");}}
}
结果:
Jack I am start and the phase 0
Alex I am start and the phase 0
Alex I am end !
Jack I am end !
true
Alex I am start and the phase -2147483647
Alex I am end !
2
0
- 默认情况,当别人调用arriveAndDeregister时,使注册的数量减到0时,直接不会陷入阻塞,返回true,相当于销毁掉
监控子线程任务
public int awaitAdvance(int phase)
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
- 相当于起到监控的作用
- 如果子线程还没有执行完成,主线程就会阻塞
- 相较而言,可以不用增加注册量
举例
public static void main(String[] args) throws InterruptedException {final Phaser phaser = new Phaser(4);for (int i = 0; i < 4; i++) {new AwaitAdvance(i,phaser).start();}//等待全部任务进行完成phaser.awaitAdvance(phaser.getPhase());System.out.println("The phase 1 work finish done.");
}
强制关闭
public void forceTermination()
public boolean isTerminated()
- 强制关闭phaser,但是
如果线程陷入阻塞,不会唤醒
调试API
获取阶段数
public final int getPhase()
- 返回当前相位数。 最大相位数为Integer.MAX_VALUE
- 每增加一轮就会加一
举例
public class PhaserTest {public static void main(String[] args) {final Phaser phaser = new Phaser(1);System.out.println(phaser.getPhase());phaser.arriveAndAwaitAdvance();System.out.println(phaser.getPhase());phaser.arriveAndAwaitAdvance();System.out.println(phaser.getPhase());phaser.arriveAndAwaitAdvance();System.out.println(phaser.getPhase());}
}
结果:
0
1
2
3
获取注册的数
public int getRegisteredParties()
- 获得注册的线程数,相当于Countdown初始的的计数器
- 可以动态更改
获得到达和未到达的数目
public int getArrivedParties()
public int getUnarrivedParties()
getArrivedParties
- 获得已经到达的线程数,和没有到达的线程数
getUnarrivedParties
- 获得没有到达的线程数,和没有到达的线程数
总结
- Phaser 可以通过
register
() 方法和arriveAndDeregister
() 方法,动态的增加或者减少注册量 - 使用
arriveAndAwaitAdvance
,相当于CyclicBarrier
机制 - 使用
arrive
,相当于countdown
机制 - 可以利用
awaitAdvance
,让主线程等待子线程全部完成任务
【详解】JUC之Phaser(移相器)相关推荐
- JUC详解 | JUC Lock
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 JUC详解 | JUC Lock锁的详解 前言 一.Lock锁的入门 1. ReentrantLock可重入锁 1.1 测试可重入性 ...
- 详解JUC高并发编程
JUC并发编程 并发编程的本质:充分利用CPU的资源 问题:JAVA可以开启线程吗? 不能:最底层是本地方法,底层是C++,Java无法直接操作硬件 1.线程里有几个状态 NEW, 新生 RUNN ...
- 40000+字超强总结?阿里P8把Java全栈知识体系详解整理成这份PDF
40000 +字长文总结,已将此文整理成PDF文档了,需要的见文后下载获取方式. 全栈知识体系总览 Java入门与进阶面向对象与Java基础 Java 基础 - 面向对象 Java 基础 - 知识点 ...
- Java JUC学习 - ConcurrentLinkedDeque 详解
Java JUC学习 - ConcurrentLinkedDeque 详解 0x00 前言 如何实现并发程序,对于Java以及其他高级语言来说都是一件并不容易的事情.在大一上学期的时候,我们学习了链表 ...
- JUC第六讲:ThreadLocal/InheritableThreadLocal详解/TTL-MDC日志上下文实践
本文是JUC第六讲:ThreadLocal/InheritableThreadLocal详解.ThreadLocal无论在项目开发还是面试中都会经常碰到,本文就 ThreadLocal 的使用.主要方 ...
- JUC详解(各种乱七八糟的锁)
文章目录 1JUC 2回顾多线程知识 3Lock锁(重点) 4Lock锁 5JUC版的生产者与消费者问题(虚假唤醒) 6有序线程 7八锁问题 8CopyOnWriteArrayList(读写复制) 9 ...
- Java JUC并发编程详解
Java JUC并发编程详解 1. JUC概述 1.1 JUC简介 1.2 进程与线程 1.2 并发与并行 1.3 用户线程和守护线程 2. Lock接口 2.1 Synchronized 2.2 什 ...
- Java - JUC详解
目录 一.了解和JUC相关的概念 二.Java线程 三.线程共享模型 一.了解和JUC相关的概念 1.1 什么是JUC? JUC是java.util.concurrent包的简称,在Java5.0添加 ...
- Java开发 - 不知道算不算详细的JUC详解
前言 大概有快两周没有发文了,这段时间不断的充实自己,算算时间,也到了该收获的时候,今天带来一篇JUC详解,但说实话,我也不敢自信它详不详细.JUC说白了就是多线程,学Java不久的人知道多线程,恐怕 ...
- AQS基础——多图详解CLH锁的原理与实现
1 什么是自旋锁和互斥锁? 由于CLH锁是一种自旋锁,那么我们先来看看自旋锁是什么? 自旋锁说白了也是一种互斥锁,只不过没有抢到锁的线程会一直自旋等待锁的释放,处于busy-waiting的状态,此时 ...
最新文章
- JSP学习笔记(五):日期处理、页面重定向、点击量统计、自动刷新和发送邮件...
- GNN教程:第六篇Spectral算法细节详解!
- C语言的预处理、编译、汇编、链接
- c# thread 编程
- Linux系统任务计划(at、crontab)的使用方法
- Java命令行界面(第4部分):命令行
- 敏捷开发绩效管理之四:为团队设立外部绩效目标(目标管理,外向型绩效)...
- 多路复用与设置阻塞、非阻塞模式
- 飞鸽传书开发者的圈子里面 有很多对飞鸽公司
- 信息学奥赛一本通C++语言——1025:保留12位小数的浮点数
- 在docker容器中安装ifconfig、ping等工具
- python操控chrome抓网页
- JavaScript比较是否在某时间段内
- 基于MATLAB的身份证号码的识别算法
- 利用ESP8266与米思齐从零制作模拟温室大棚--接线篇
- [渝粤教育] 南昌大学 生物化学 参考 资料
- 微型计算机主机的组成不包括______,微型计算机主机的主要组成部分是什么
- mysql 获取昨天日期、今天日期、明天日期以及前一个小时和后一个小时的时间
- 电脑打开网络没有WiFi列表
- win7python怎么设置环境变量_如何在win7下设置python的环境变量