引自:https://shift-alt-ctrl.iteye.com/blog/2302923

在JAVA 1.7引入了一个新的并发API:Phaser,一个可重用的同步barrier。在此前,JAVA已经有CyclicBarrier、CountDownLatch这两种同步barrier,但是Phaser更加灵活,而且侧重于“重用”。

一、简述

1、注册机制:与其他barrier不同的是,Phaser中的“注册的同步者(parties)”会随时间而变化,Phaser可以通过构造器初始化parties个数,也可以在Phaser运行期间随时加入(register)新的parties,以及在运行期间注销(deregister)parties。运行时可以随时加入、注销parties,只会影响Phaser内部的计数器,它建立任何内部的bookkeeping(账本),因此task不能查询自己是否已经注册了,当然你可以通过实现子类来达成这一设计要求。

Java代码  
  1. //伪代码
  2. Phaser phaser = new Phaser();
  3. phaser.register();//parties count: 1
  4. ....
  5. phaser.arriveAndDeregister()://count : 0;
  6. ....

此外,CyclicBarrier、CountDownLatch需要在初始化的构造函数中指定同步者的个数,且运行时无法再次调整。

Java代码  
  1. CountDownLatch countDownLatch = new CountDownLatch(12);
  2. //count deregister parties after all
  3. //parties count is 12 all the times
  4. //if you want change the number of parties, you should create a new instance.
  5. CyclicBarrier cyclicBarrier = new CyclicBarrier(12);

2、同步机制:类似于CyclicBarrier,Phaser也可以awaited多次,它的arrivedAndAwaitAdvance()方法的效果类似于CyclicBarrier的await()。Phaser的每个周期(generation)都有一个phase数字,phase 从0开始,当所有的已注册的parties都到达后(arrive)将会导致此phase数字自增(advance),当达到Integer.MAX_VALUE后继续从0开始。这个phase数字用于表示当前parties所处于的“阶段周期”,它既可以标记和控制parties的wait行为、唤醒等待的时机。

1)Arrival:Phaser中的arrive()、arriveAndDeregister()方法,这两个方法不会阻塞(block),但是会返回相应的phase数字,当此phase中最后一个party也arrive以后,phase数字将会增加,即phase进入下一个周期,同时触发(onAdvance)那些阻塞在上一phase的线程。这一点类似于CyclicBarrier的barrier到达机制;更灵活的是,我们可以通过重写onAdvance方法来实现更多的触发行为。

2)Waiting:Phaser中的awaitAdvance()方法,需要指定一个phase数字,表示此Thread阻塞直到phase推进到此周期,arriveAndAwaitAdvance()方法阻塞到下一周期开始(或者当前phase结束)。不像CyclicBarrier,即使等待Thread已经interrupted,awaitAdvance方法会继续等待。Phaser提供了Interruptible和Timout的阻塞机制,不过当线程Interrupted或者timout之后将会抛出异常,而不会修改Phaser的内部状态。如果必要的话,你可以在遇到此类异常时,进行相应的恢复操作,通常是在调用forceTermination()方法之后。

Phaser通常在ForJoinPool中执行tasks,它可以在有task阻塞等待advance时,确保其他tasks的充分并行能力。

3、中断(终止):Phaser可以进入Termination状态,可以通过isTermination()方法判断;当Phaser被终止后,所有的同步方法将会立即返回(解除阻塞),不需要等到advance(即advance也会解除阻塞),且这些阻塞方法将会返回一个负值的phase值(awaitAdvance方法、arriveAndAwaitAdvance方法)。当然,向一个termination状态的Phaser注册party将不会有效;此时onAdvance()方法也将会返回true(默认实现),即所有的parties都会被deregister,即register个数为0。

4、Tiering(分层):Phaser可以“分层”,以tree的方式构建Phaser来降低“竞争”。如果一个Phaser中有大量parties,这会导致严重的同步竞争,所以我们可以将它们分组并共享一个parent Phaser,这样可以提高吞吐能力;Phaser中注册和注销parties都会有Child 和parent Phaser自动管理。当Child Phaser中中注册的parties变为非0时(在构造函数Phaser(Phaser parent,int parties),或者register()方法),Child Phaser将会注册到其Parent上;当Child Phaser中的parties变为0时(比如由arrivedAndDegister()方法),那么此时Child Phaser也将从其parent中注销出去。

5、监控:同步的方法只会被register操作调用,对于当前state的监控方法可以在任何时候调用,比如getRegisteredParties()获取已经注册的parties个数,getPhase()获取当前phase周期数等;因为这些方法并非同步,所以只能反映当时的瞬间状态。

二、常用的Barrier比较

1、CountDownLatch

Java代码  
//创建时,就需要指定参与的parties个数
int parties = 12;
CountDownLatch latch = new CountDownLatch(parties);
//线程池中同步task
ExecutorService executor = Executors.newFixedThreadPool(parties);
for(int i = 0; i < parties; i++) {  executor.execute(new Runnable() {  @Override  public void run() {  try {  //可以在任务执行开始时执行,表示所有的任务都启动后,主线程的await即可解除  //latch.countDown();  //run  //..  Thread.sleep(3000);  } catch (Exception e) {  }  finally {  //任务执行完毕后:到达  //表示所有的任务都结束,主线程才能继续
                latch.countDown();  }  }  });
}
latch.await();//主线程阻塞,直到所有的parties到达
//latch上所有的parties都达到后,再次执行await将不会有效,
//即barrier是不可重用的
executor.shutdown();  

2、CyclicBarrier

Java代码  
//创建时,就需要指定参与的parties个数
int parties = 12;
CyclicBarrier barrier = new CyclicBarrier(parties);
//线程池中同步task
ExecutorService executor = Executors.newFixedThreadPool(parties);
for(int i = 0; i < parties; i++) {  executor.execute(new Runnable() {  @Override  public void run() {  try {  int i = 0;  while (i < 3 && !barrier.isBroken()) {  System.out.println("generation begin:" + i + ",tid:" + Thread.currentThread().getId());  Thread.sleep(3000);  //如果所有的parties都到达,则开启新的一次周期(generation)  //barrier可以被重用
                    barrier.await();  i++;  }  } catch (Exception e) {  e.printStackTrace();  }  finally {  }  }  });
}
Thread.sleep(100000);  

3、Phaser

Java代码  
package com.thread;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;/*** 同步器:Phaser,可重用的同步屏障* @author Administrator**/
public class PhaserTest {public static void main(String[] args) {// 创建时,就需要指定参与的parties个数// 可以在创建时不指定parties// 而是在运行时,随时注册和注销新的partiesint parties = 12;Phaser phaser = new Phaser();// 主线程先注册一个// 对应下文中,主线程可以等待所有的parties到达后再解除阻塞(类似与CountDownLatch)
        phaser.register();ExecutorService executor = Executors.newFixedThreadPool(parties);for (int i = 0; i < parties; i++) {phaser.register();// 每创建一个task,我们就注册一个partyexecutor.execute(new Runnable() {@Overridepublic void run() {try {int i = 0;while (i < 3 && !phaser.isTerminated()) {//getPhase()获取当前phase周期数System.out.println("Generation:" + phaser.getPhase());Thread.sleep(3000);// 等待同一周期内,其他Task到达// 然后进入新的周期,并继续同步进行
                            phaser.arriveAndAwaitAdvance();i++;// 我们假定,运行三个周期即可
                        }} catch (Exception e) {} finally {phaser.arriveAndDeregister();}}});}// 主线程到达,且注销自己// 此后线程池中的线程即可开始按照周期,同步执行。
        phaser.arriveAndDeregister();executor.shutdown();}
}

三、API简述

1、Phaser():构造函数,创建一个Phaser;默认parties个数为0。此后我们可以通过register()、bulkRegister()方法来注册新的parties。每个Phaser实例内部,都持有几个状态数据:termination状态、已经注册的parties个数(registeredParties)、当前phase下已到达的parties个数(arrivedParties)、当前phase周期数,还有2个同步阻塞队列Queue。Queue中保存了所有的waiter,即因为advance而等待的线程信息;这两个Queue分别为evenQ和oddQ,这两个Queue在实现上没有任何区别,Queue的元素为QNode,每个QNode保存一个waiter的信息,比如Thread引用、阻塞的phase、超时的deadline、是否支持interrupted响应等。两个Queue,其中一个保存当前phase中正在使用的waiter,另一个备用,当phase为奇数时使用evenQ、oddQ备用,偶数时相反,即两个Queue轮换使用。当advance事件触发期间,新register的parties将会被放在备用的Queue中,advance只需要响应另一个Queue中的waiters即可,避免出现混乱。

2、Phaser(int parties):构造函数,初始一定数量的parties;相当于直接regsiter此数量的parties。

3、arrive():到达,阻塞,等到当前phase下其他parties到达。如果没有register(即已register数量为0),调用此方法将会抛出异常,此方法返回当前phase周期数,如果Phaser已经终止,则返回负数。

4、arriveAndDeregister():到达,并注销一个parties数量,非阻塞方法。注销,将会导致Phaser内部的parties个数减一(只影响当前phase),即下一个phase需要等待arrive的parties数量将减一。异常机制和返回值,与arrive方法一致。

5、arriveAndAwaitAdvance():到达,且阻塞直到其他parties都到达,且advance。此方法等同于awaitAdvance(arrive())。如果你希望阻塞机制支持timeout、interrupted响应,可以使用类似的其他方法(参见下文)。如果你希望到达后且注销,而且阻塞等到当前phase下其他的parties到达,可以使用awaitAdvance(arriveAndDeregister())方法组合。此方法的异常机制和返回值同arrive()。

6、awaitAdvance(int phase):阻塞方法,等待phase周期数下其他所有的parties都到达。如果指定的phase与Phaser当前的phase不一致,则立即返回。

7、awaitAdvanceInterruptibly(int phase):阻塞方法,同awaitAdvance,只是支持interrupted响应,即waiter线程如果被外部中断,则此方法立即返回,并抛出InterrutedException。

8、awaitAdvanceInterruptibly(int phase,long timeout,TimeUnit unit):阻塞方法,同awaitAdvance,支持timeout类型的interrupted响应,即当前线程阻塞等待约定的时长,超时后以TimeoutException异常方式返回。

9、forceTermination():强制终止,此后Phaser对象将不可用,即register等将不再有效。此方法将会导致Queue中所有的waiter线程被唤醒。

10、register():新注册一个party,导致Phaser内部registerPaties数量加1;如果此时onAdvance方法正在执行,此方法将会等待它执行完毕后才会返回。此方法返回当前的phase周期数,如果Phaser已经中断,将会返回负数。

11、bulkRegister(int parties):批量注册多个parties数组,规则同10、。

12、getArrivedParties():获取已经到达的parties个数。

13、getPhase():获取当前phase周期数。如果Phaser已经中断,则返回负值。

14、getRegisteredParties():获取已经注册的parties个数。

15、getUnarrivedParties():获取尚未到达的parties个数。

16、onAdvance(int phase,int registeredParties):这个方法比较特殊,表示当进入下一个phase时可以进行的事件处理,如果返回true表示此Phaser应该终止(此后将会把Phaser的状态为termination,即isTermination()将返回true。),否则可以继续进行。phase参数表示当前周期数,registeredParties表示当前已经注册的parties个数。

默认实现为:return registeredParties == 0;在很多情况下,开发者可以通过重写此方法,来实现自定义的advance时间处理机制。

内部原理,比较简单(简述):

1)两个计数器:分别表示parties个数和当前phase。register和deregister会触发parties变更(CAS),全部parties到达(arrive)会触发phase变更。

2)一个主要的阻塞队列:非AQS实现,对于arriveAndWait的线程,会被添加到队列中并被park阻塞,知道当前phase中最后一个party到达后触发唤醒。

转载于:https://www.cnblogs.com/x-jingxin/p/10655164.html

高级同步器:可重用的同步屏障Phaser相关推荐

  1. java多线程 门闩_Java线程与并发编程实践----同步器(倒计时门闩,同步屏障)...

    Java提供的synchronized关键字对临界区进行线程同步访问.由于基于synchronized很难 正确编写同步代码,并发工具类提供了高级的同步器.倒计时门闩(countdown latch) ...

  2. Java多线程同步屏障计算_Java多线程之CountDownLatch和CyclicBarrier同步屏障的使用

    一:CountDownLatch CountDownLatch是一个执行 完成任务线程数 的 倒数计数器.我们考虑这种情况:士兵晨练,必须全队士兵集合完毕才开始跑步.用程序描述就:在晨练线程中,逐个启 ...

  3. Java并发编程的艺术(八)——闭锁、同步屏障、信号量详解

    1. 闭锁:CountDownLatch 1.1 使用场景 若有多条线程,其中一条线程需要等到其他所有线程准备完所需的资源后才能运行,这样的情况可以使用闭锁. 1.2 代码实现 // 初始化闭锁,并设 ...

  4. android同步方法和对象的区别是什么,(4.1.10.8)Android Handler之同步屏障机制(sync barrier)...

    一.概述 简单理解为 异步消息插队并优先执行. 场景:排队买票 先来了一个普通用户来排队,买完票走了. 后面又来了一个VIP用户A来买票 就一直站在卖窗口这里 也不走(ps:添加屏障 ) 紧接者又来了 ...

  5. Handler sync barrier(同步屏障)

    Handler中的Message可以分为两类:同步消息.异步消息.消息类型可以通过以下函数得知 //Message.java public boolean isAsynchronous() {retu ...

  6. 同步屏障Barrier

    最近写6s081实验写到了lab7,发现了有一个barrier的概念很有意思,查了查资料总结了一下. 同步屏障可以用来管理一个应用中的不同线程,在一个应用中设立一个Barrier,当有指定数目的线程到 ...

  7. Handler机制——同步屏障

    一.消息种类 关于Handler机制的基本原理不了解可以看这里: Handler机制源码解析. Message分为3种:普通消息(同步消息).屏障消息(同步屏障)和异步消息.我们通常使用的都是普通消息 ...

  8. Android:同步屏障的简单理解和使用

    同步屏障的简单理解和使用 1.背景 2.何为同步屏障? 2.1. 发送屏障消息--postSyncBarrier 2.2.发送异步消息 2.3.处理消息 2.4.移除屏障消息--removeSyncB ...

  9. 什么是Handler的同步屏障机制?

    前言 对于Handler机制,想必大家都已经非常熟悉了吧,从迈进Android开发这扇大门的时候,就不停的研究和使用它,同样的这也是Android系统架构的精髓之一.然而在我们使用的时候,往往会忽略掉 ...

最新文章

  1. elasticsearch简单操作(二)
  2. 人群计数--Switching Convolutional Neural Network for Crowd Counting
  3. WinCE5.0中应用程序如何直接写屏
  4. vue中的倒计时跳转页面问题和axios网络请求this作用域问题
  5. python判断英文字母_Python判断两个单词的相似度
  6. python新手程序_推荐:一个适合于Python新手的入门练手项目
  7. Spring的配置文件详解
  8. C语言中的“三字母词”坑了工程师
  9. 国家开放大学2021春1050金融理论前沿课题题目
  10. Windows CE的学习路线
  11. 傅里叶变换的更多性质:相位展开、零相位窗等
  12. git使用总结(持续更新,个人总结记录使用)
  13. 科目一知识点分类梳理
  14. 浅谈利用强化学习A3C玩转超级玛丽奥
  15. linux命令— head
  16. 用java制作扑克牌_java实现简易扑克牌游戏
  17. k8s---adm构建
  18. 用 Python 实现词云可视化
  19. matlab fgetc,fgetc函数的作用是什么
  20. Android禁止EditText弹出输入法

热门文章

  1. SAP CRM Fiori Opportunity应用功能一览
  2. 慕课php进阶篇,PHP进阶篇-函数
  3. ubuntu mysql 中午_Ubuntu更改MySQL字符集UTF-8,正确显示中文
  4. 参数 中_Python中函数的参数传递
  5. 怎么做蒙特卡洛计算npv_计算机一级:这该死的“进制转换”,这种题到底怎么做?...
  6. oracle 列级外键,Oracle 中的外键与锁
  7. 关于pandas绘制图片不显示问题
  8. python中的ans是什么意思_python ans
  9. r与python自然语言处理_Python自然语言处理实践: 在NLTK中使用斯坦福中文分词器 | 我爱自然语言处理...
  10. php mysqli不识别,不识别数据库PHP MYSQLi中的密码