Java 7的并发包中推出了Phaser,其功能跟CyclicBarrier和CountDownLatch有些重叠,但是提供了更灵活的用法,例如支持动态调整注册任务的数量等。本文在Phaser自带的示例代码基础上进行一下简单的分析。

注册(Registration)

Phaser支持通过register()和bulkRegister(int parties)方法来动态调整注册任务的数量,此外也支持通过其构造函数进行指定初始数量。在适当的时机,Phaser支持减少注册任务的数量,例如 arriveAndDeregister()。单个Phaser实例允许的注册任务数的上限是65535。

到达(Arrival)

正如Phaser类的名字所暗示,每个Phaser实例都会维护一个phase number,初始值为0。每当所有注册的任务都到达Phaser时,phase number累加,并在超过Integer.MAX_VALUE后清零。arrive()和arriveAndDeregister()方法用于记录到 达,arriveAndAwaitAdvance()方法用于记录到达,并且等待其它未到达的任务。

终止(Termination)

Phaser支持终止。Phaser终止之后,调用register()和bulkRegister(int parties)方法没有任何效果,arriveAndAwaitAdvance()方法也会立即返回。触发终止的时机是在protected boolean onAdvance(int phase, int registeredParties)方法返回时,如果该方法返回true,那么Phaser会被终止。默认实现是在注册任务数为0时返回true(即 return registeredParties == 0;)。此外,forceTermination()方法用于强制终止,isTerminated()方法用于判断是否已经终止。

层次结构(Tiering)

Phaser支持层次结构,即通过构造函数Phaser(Phaser parent)和Phaser(Phaser parent, int parties)构造一个树形结构。这有助于减轻因在单个的Phaser上注册过多的任务而导致的竞争,从而提升吞吐量,代价是增加单个操作的开销。

使用案例

样例1

在有些场景下,我们希望控制多个线程的启动时机:例如在并发相关的单元测试中,有时需要控制线程的启动时机,以期获得最大程度的并发,通常我们会使用CountDownLatch,以下是使用Phaser的版本。

Java代码  

import java.util.concurrent.Phaser;

public class PhaserTest1 {

public static void main(String args[]) {

//

final int count = 5;

final Phaser phaser = new Phaser(count);

for(int i = 0; i

System.out.println("starting thread, id: " + i);

final Thread thread = new Thread(new Task(i, phaser));

thread.start();

}

}

public static class Task implements Runnable {

//

private final int id;

private final Phaser phaser;

public Task(int id, Phaser phaser) {

this.id = id;

this.phaser = phaser;

}

@Override

public void run() {

phaser.arriveAndAwaitAdvance();

System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);

}

}

}

以上例子中,由于线程是在一个循环中start,因此start的时机有一定的间隔。本例中这些线程实际开始工作的时机是在所有的线程都调用了phaser.arriveAndAwaitAdvance()之后。

此外,如果留心arriveAndAwaitAdvance()方法的签名,会发现它并没有抛出InterruptedException,实际上,即使 当前线程被中断,arriveAndAwaitAdvance()方法也不会返回,而是继续等待。如果在等待时希望可中断,或者可超时,那么需要使用以下 方法:

Java代码  

awaitAdvance(arrive())  // 等效于arriveAndAwaitAdvance()

awaitAdvanceInterruptibly(int phase)

awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)

样例2

有些时候我们希望只有在某些外部条件满足时,才真正开始任务的执行,例如:

Java代码  

import java.io.BufferedReader;

import java.io.InputStreamReader;

import java.util.concurrent.Phaser;

public class PhaserTest2 {

public static void main(String args[]) throws Exception {

//

final Phaser phaser = new Phaser(1);

for(int i = 0; i

phaser.register();

System.out.println("starting thread, id: " + i);

final Thread thread = new Thread(new Task(i, phaser));

thread.start();

}

//

System.out.println("Press ENTER to continue");

BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

reader.readLine();

phaser.arriveAndDeregister();

}

public static class Task implements Runnable {

//

private final int id;

private final Phaser phaser;

public Task(int id, Phaser phaser) {

this.id = id;

this.phaser = phaser;

}

@Override

public void run() {

phaser.arriveAndAwaitAdvance();

System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);

}

}

}

以上例子中,只有当用户按下回车之后,任务才真正开始执行。需要注意的是,arriveAndDeregister()方法不会被阻塞,并且返回到达时的phase number(arrive方法也是如此)。

样例3

CyclicBarrier支持barrier action, Phaser同样也支持。不同之处是Phaser的barrier action需要改写onAdvance方法来进行定制。

Java代码  

import java.util.concurrent.Phaser;

public class PhaserTest3 {

public static void main(String args[]) throws Exception {

//

final int count = 5;

final int phaseToTerminate = 3;

final Phaser phaser = new Phaser(count) {

@Override

protected boolean onAdvance(int phase, int registeredParties) {

System.out.println("====== " + phase + " ======");

return phase >= phaseToTerminate || registeredParties == 0;

}

};

//

for(int i = 0; i

System.out.println("starting thread, id: " + i);

final Thread thread = new Thread(new Task(i, phaser));

thread.start();

}

}

public static class Task implements Runnable {

//

private final int id;

private final Phaser phaser;

public Task(int id, Phaser phaser) {

this.id = id;

this.phaser = phaser;

}

@Override

public void run() {

do {

try {

Thread.sleep(500);

} catch(InterruptedException e) {

// NOP

}

System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);

phaser.arriveAndAwaitAdvance();

} while(!phaser.isTerminated());

}

}

}

本例中的barrier action只是简单地打印了一条信息,此外在超过指定的迭代次数后终止了Phaser。

样例4

在Smaple 3的例子中,主线程在其它工作线程结束之前已经终止。如果希望主线程等待这些工作线程结束,除了使用Thread.join()之外,也可以尝试以下的方式:

Java代码  

import java.util.concurrent.Phaser;

public class PhaserTest4 {

public static void main(String args[]) throws Exception {

//

final int count = 5;

final int phaseToTerminate = 3;

final Phaser phaser = new Phaser(count) {

@Override

protected boolean onAdvance(int phase, int registeredParties) {

System.out.println("====== " + phase + " ======");

return phase == phaseToTerminate || registeredParties == 0;

}

};

//

for(int i = 0; i

System.out.println("starting thread, id: " + i);

final Thread thread = new Thread(new Task(i, phaser));

thread.start();

}

//

phaser.register();

while (!phaser.isTerminated()) {

phaser.arriveAndAwaitAdvance();

}

System.out.println("done");

}

public static class Task implements Runnable {

//

private final int id;

private final Phaser phaser;

public Task(int id, Phaser phaser) {

this.id = id;

this.phaser = phaser;

}

@Override

public void run() {

while(!phaser.isTerminated()) {

try {

Thread.sleep(500);

} catch(InterruptedException e) {

// NOP

}

System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);

phaser.arriveAndAwaitAdvance();

}

}

}

}

如果希望主线程在特定的phase结束之后终止,那么可以在主线程中调用下述方法:

Java代码  

public static void awaitPhase(Phaser phaser, int phase) {

int p = phaser.register(); // assumes caller not already registered

while (p

if (phaser.isTerminated()) {

break; // ... deal with unexpected termination

} else {

p = phaser.arriveAndAwaitAdvance();

}

}

phaser.arriveAndDeregister();

}

需要注意的是,awaitPhase方法中的if (phaser.isTerminated()) 分支里需要能够正确处理Phaser终止的情况。否则由于在Phaser终止之后, phaser.register()和arriveAndAwaitAdvance()方法均返回负值,那么上述方法可能陷入死循环。

样例5

以下对Phaser进行分层的例子:

Java代码  

import java.util.concurrent.Phaser;

public class PhaserTest6 {

//

private static final int TASKS_PER_PHASER = 4;

public static void main(String args[]) throws Exception {

//

final int phaseToTerminate = 3;

final Phaser phaser = new Phaser() {

@Override

protected boolean onAdvance(int phase, int registeredParties) {

System.out.println("====== " + phase + " ======");

return phase == phaseToTerminate || registeredParties == 0;

}

};

//

final Task tasks[] = new Task[10];

build(tasks, 0, tasks.length, phaser);

for (int i = 0; i

System.out.println("starting thread, id: " + i);

final Thread thread = new Thread(tasks[i]);

thread.start();

}

}

public static void build(Task[] tasks, int lo, int hi, Phaser ph) {

if (hi - lo > TASKS_PER_PHASER) {

for (int i = lo; i

int j = Math.min(i + TASKS_PER_PHASER, hi);

build(tasks, i, j, new Phaser(ph));

}

} else {

for (int i = lo; i

tasks[i] = new Task(i, ph);

}

}

public static class Task implements Runnable {

//

private final int id;

private final Phaser phaser;

public Task(int id, Phaser phaser) {

this.id = id;

this.phaser = phaser;

this.phaser.register();

}

@Override

public void run() {

while (!phaser.isTerminated()) {

try {

Thread.sleep(200);

} catch (InterruptedException e) {

// NOP

}

System.out.println("in Task.run(), phase: " + phaser.getPhase()    + ", id: " + this.id);

phaser.arriveAndAwaitAdvance();

}

}

}

}

需要注意的是,TASKS_PER_PHASER的值取决于具体的Task实现。对于Task执行时间很短的场景(也就是竞争相对激烈),可以考虑使用较小的TASKS_PER_PHASER值,例如4。反之可以适当增大TASKS_PER_PHASER。

phaser java_Java 7的并发编程-Phaser相关推荐

  1. phaser java_【Java并发编程实战】-----“J.U.C”:Phaser

    Phaser由java7中推出,是Java SE 7中新增的一个使用同步工具,在功能上面它与CyclicBarrier.CountDownLatch有些重叠,但是它提供了更加灵活.强大的用法. Cyc ...

  2. 《Java 7 并发编程指南》学习概要 (3)Semaphore, CountDownLatch, CyclicBarrier , Phaser, Exchanger...

    1.Semaphore  信号量 Semaphore(信号量)是一个控制访问多个共享资源的计数器. 当一个线程想要访问某个共享资源,首先,它必须获得semaphore.如果semaphore的内部计数 ...

  3. java phaser用法_第3章Phaser的使用-(java并发编程核心方法与框架)

    3.1 Phaser的使用 3.2 类Phaser的arriveAndAwaitAdvance()方法测试1 3.3 类Phaser的arriveAndAwaitAdvance()方法测试2 3.4 ...

  4. 理解java并发工具Phaser

    java为我们提供了很多并发工具,比如Semaphore.CountDownLatch.CyclicBarrier还有我们这里要讲到的phaser. 当某些任务是分成多个步骤来执行时,并且同一个步骤之 ...

  5. 如何防止线程死锁java_Java 并发编程:如何防止在线程阻塞与唤醒时死锁

    Java并发编程:多线程如何实现阻塞与唤醒 说到suspend与resume组合有死锁倾向,一不小心将导致很多问题,甚至导致整个系统崩溃.接着看另外一种解决方案,我们可以使用以对象为目标的阻塞,即利用 ...

  6. 闭锁 java_Java并发编程之闭锁

    1.什么是闭锁? 闭锁(latch)是一种Synchronizer(Synchronizer:是一个对象,它根据本身的状态调节线程的控制流.常见类型的Synchronizer包括信号量.关卡和闭锁). ...

  7. 剑问天下java_Java并发编程精讲

    第1章 开宗明义[不看错过一个亿] 1-1 课程综述--特点和内容介绍 (16:07) 第2章 跨越第一座山"线程八大核心"[适用于纵观全貌] 2-1 纵观全貌--线程八大核心 ( ...

  8. Java 7 并发编程指南

    原文是发表在并发编程网上翻译后的 <Java 7 并发编程指南>,这里对其中的目录做个更加详细的描述,并且写出了重点说明,方便日后快速查阅.建议仔细查看每节的代码实现,非常具有参考价值.可 ...

  9. Java 多线程 并发编程------超全面

    转载自:http://blog.csdn.net/escaflone/article/details/10418651 一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序 ...

最新文章

  1. BGR图像与HSV图像互相转换(opencv)
  2. 数据预处理-异常值识别
  3. 使用内联函数的一个问题
  4. 6、ShardingSphere 之 读写分离
  5. 对list中的元素按照元素的属性进行排序
  6. HDU 5176 The Experience of Love 带权并查集
  7. Java屏蔽输入法_技巧:如何禁止输入法切换到全角状态
  8. android 录像 源代码,android安卓视频录制摄像拍摄源码(测试可用)
  9. 游戏服务器当中的唯一名设计方法
  10. 高校三维地图校内导航系统解决方案
  11. 计算机视觉最新研究方向,计算机视觉的主要研究的内容是什么?,计算机视觉研究方向...
  12. 博客搬家系列(六)-爬取今日头条文章
  13. EPICS -- asynRecord记录使用示例
  14. Python入门习题大全——猫和狗
  15. Excel 自动画图表
  16. 美化终端必备:ubuntu 18.04 x64安装zsh并配置oh-my-zsh(Mac适用)
  17. table()函数用法(转载)
  18. 商用及企业级服务器搭建之二:linux系统,linux命令
  19. BUG的生命周期,bug从“出生到死亡”全部过程
  20. MeterSphere 数据库使用

热门文章

  1. Honeywords项目——检查密码是否被破解的一种简单方法
  2. Struts2 标签库讲解
  3. 如何把一个数据库的数据copy到另外一个数据库
  4. 学习笔记(1):activiti6.0从入门到精通-设置流程变量(概述)
  5. AtomicReference
  6. apache commons collections CollectionUtils工具类简单使用
  7. java 工厂化生产
  8. LeetCode 542. 01 Matrix
  9. 关于maven项install时报找不到符号的错误
  10. spark内核揭秘-06-TaskSceduler启动源码解析初体验