一、CountDownLatch

1、概述

1、CountDownLatch是一个同步辅助类,直译就是倒计数(CountDown)门闩(Latch)。倒计数不用说,门闩的意思就是阻止执行。在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数初始化CountDownLatch。由于调用了countDown()方法,所以在当前计数到达零之前,await()方法会一直受阻塞。之后会释放所有等待的线程,await()的所有后续调用都将立即返回。
2、主要方法:
3、应用场景
  • 在一些应用中,需要等待某个条件达到要求后才能做后面的事情;同时当所有线程都完成后,也会触发事件,以便进行后面的操作。

2、使用示例

1、不使用CountDownLatch的情况下,主线程会提前执行完,其他线程还在执行。
/*** @Date: 2022/5/26* 5个人同时约定去旅游,等人到齐之后,乘车出发*/
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {for (int i = 1; i <= 5; i++) {final int name = i;new Thread(() -> {System.out.println(Thread.currentThread().getName() + "到达目的地,等待出发");}, "兄弟" + name).start();}System.out.println("人到齐,乘车出发");}
}
/*** 运行结果:* 兄弟3到达目的地,等待出发* 人到齐,乘车出发* 兄弟5到达目的地,等待出发* 兄弟2到达目的地,等待出发* 兄弟4到达目的地,等待出发*/
2、使用CountDownLatch
/*** @Date: 2022/5/26* 5个人同时约定去旅游,等人到齐之后,乘车出发*/
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {//计数器CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 1; i <= 5; i++) {final int name = i;new Thread(() -> {System.out.println(Thread.currentThread().getName() + "到达目的地,等待出发");//计数减1countDownLatch.countDown();}, "兄弟" + name).start();}//等待所有线程执行完成。主线程才继续向下执行countDownLatch.await();System.out.println("人到齐,乘车出发");}
}
/*** 运行结果:* 兄弟3到达目的地,等待出发* 兄弟5到达目的地,等待出发* 兄弟2到达目的地,等待出发* 兄弟4到达目的地,等待出发* 人到齐,乘车出发*/

3、CountDownLatch原理(部分源码)

//CountDownLatch没有显示继承哪个父类或者实现哪个父接口, 它底层是AQS是通过内部类Sync来实现的
public class CountDownLatch {/*** CountDownLatch的核心实现机制利用AbstractQueuedSynchronizer,简称AQS的state状态来实现Count的阻塞机制*/private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;//构造器Sync(int count) {setState(count);}// 返回当前计数int getCount() {return getState();}/*** 试图在共享模式下获取对象状态* 该函数只是简单的判断AQS的state是否为0,为0则返回1,不为0则返回-1*/protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}/*** 试图设置状态来反映共享模式下的一个释放,重写AQS的释放状态方法,实现自己的逻辑,来削减count线程数*/protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {//获取状态int c = getState();//没有被线程占有if (c == 0)return false;//下一个状态int nextc = c-1;//比较并且设置成功if (compareAndSetState(c, nextc))return nextc == 0;}}}//同步队列private final Sync sync;/*** 构造一个用给定计数初始化的CountDownLatch,并且构造函数内完成了sync的初始化,并设置了状态数*/public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}/*** 此函数将会使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断*/public void await() throws InterruptedException {//转发到sync对象上//由源码可知,对CountDownLatch对象的await的调用会转发为对Sync的acquireSharedInterruptibly(从AQS继承的方法)方法的调用sync.acquireSharedInterruptibly(1);}/*** 此函数将递减锁存器的计数,如果计数到达零,则释放所有等待的线程*/public void countDown() {//对countDown的调用转换为对Sync对象的releaseShared(从AQS继承而来)方法的调用sync.releaseShared(1);}
}

二、CyclicBarrier

1、概述

1、CyclicBarrier是一个同步辅助类,翻译过来叫循环栅栏、循环(cyclic)屏障(barrier)。它允许一组线程到达某个公共屏障点(common barrier point)时被阻塞,直到最后一个线程到达屏障时,所有被屏障拦截的线程才会继续执行。
2、主要方法:

2、使用示例

/*** @Date: 2022/6/8* 汇总用户数据*/
public class CyclicBarrierTest {//用于保存用户数据private static ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();private static AtomicInteger sum = new AtomicInteger(0);public static void main(String[] args) {CyclicBarrier barrier = new CyclicBarrier(3, new Thread(() -> {System.out.println("子任务全都执行完成,开始执行汇总结果...");sum.getAndAccumulate(map.values().stream().reduce(Integer::sum).get(), Integer::sum);System.out.println("总结果为:" + sum.get());}));WorkTask workTask1 = new WorkTask(barrier, 893);WorkTask workTask2 = new WorkTask(barrier, 1894);WorkTask workTask3 = new WorkTask(barrier, 3005);workTask1.start();workTask2.start();workTask3.start();}/*** 工作线程*/static class WorkTask extends Thread {//CyclicBarrierprivate CyclicBarrier barrier;//用户idprivate Integer userId;public WorkTask(CyclicBarrier barrier, Integer userId) {this.barrier = barrier;this.userId = userId;}@Overridepublic void run() {try {int num = userId + 1000;System.out.println("用户 " + userId + " 线程开始统计数据,结果为:" + num);//统计不同用户数据map.put(userId, num);//等待barrier.await();//结束等待之后继续执行System.out.println("用户 " + userId + " 线程等待结束之后继续执行其他操作!!!");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}
}/*** 运行结果* 用户 1894 线程开始统计数据,结果为:2894* 用户 893 线程开始统计数据,结果为:1893* 用户 3005 线程开始统计数据,结果为:4005* 子任务全都执行完成,开始执行汇总结果...* 子任务汇总结果为:8792* 用户 3005 线程等待结束之后继续执行其他操作!!!* 用户 1894 线程等待结束之后继续执行其他操作!!!* 用户 893 线程等待结束之后继续执行其他操作!!!*/

3、与CountDonwLatch对比

1、CountDownLatch减计数,CyclicBarrier加计数。
2、CountDownLatch是一次性的,CyclicBarrier可以重用。
3、对于CountDownLatch来说,重点是那个“一个线程”在等待,而另外那N个线程在执行完成之后可以继续等待,可以终止。
4、对于CyclicBarrier来说,重点是那N个线程,它们之间任何一个没有完成,所有线程必须等待。

4、CyclicBarrier原理(部分源码)

1、它的实现原理:利用ReentrantLock做线程安全锁,实现线程安全等待,部分源码如下:
public class CyclicBarrier {//可重入锁private final ReentrantLock lock = new ReentrantLock();//利用lock.newCondition()实现主线程的唤醒和等待private final Condition trip = lock.newCondition();//表示同时到达屏障的线程个数private final int parties;//parties个线程到达屏障时,要执行的线程private final Runnable barrierCommand;/*** 重点看下await方法,发现调用的是dowait方法*/public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}/*** 主要屏障代码*/private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {//保存当前锁final ReentrantLock lock = this.lock;//加锁lock.lock();try {//保存当前代final Generation g = generation;//屏障被破坏,抛出异常if (g.broken)throw new BrokenBarrierException();//线程被打断if (Thread.interrupted()) {//结束当前屏障,并且唤醒所有等待的线程,只有拥有锁的时候才会调用breakBarrier();throw new InterruptedException();}//减少正在等待进入屏障的线程数量int index = --count;//正在等待进入屏障的线程数量为0,所有线程都已经进入,表示parties个线程已经到达屏障if (index == 0) {//运行的动作标识boolean ranAction = false;try {//都到达线程阈之后要执行的线程final Runnable command = barrierCommand;//线程不为空,就运行if (command != null)command.run();//设置ranAction状态ranAction = true;//进入下一代nextGeneration();return 0;} finally {//没有运行动作,就结束当前屏障if (!ranAction)breakBarrier();}}//无限循环,直到所有线程都到达设置的线程数,或者中断,超时才结束for (;;) {try {//没有设置超时时间,就等待if (!timed)trip.await();//设置了等待时间,并且等待时间大于0,就等待指定时长else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {//等于当前代并且屏障没有被损坏if (g == generation && ! g.broken) {//结束当前屏障breakBarrier();throw ie;} else {//不等于当前带后者是屏障被损坏,就中断当前线程// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}//屏障被损坏,抛出异常if (g.broken)throw new BrokenBarrierException();//不等于当前代,返回索引if (g != generation)return index;//设置了等待时间,并且等待时间小于0if (timed && nanos <= 0L) {//结束屏障,并且唤醒所有等待的线程breakBarrier();throw new TimeoutException();}}} finally {//释放锁lock.unlock();}}//其他代码省略......
}

三、Semaphore

1、概述

1、Semaphore是一个计数信号量,利用它可以控制一定数量的请求,从而实现资源访问限制的目的,实际应用中,可以用来限制访问某种资源的数量,比如在Hystrix中就有基于Semaphore的资源隔离策略。
2、最简单的理解信号量就是,一个计数器、一个等待队列、两个方法(在Java实现的Semaphore中就是acquire和release)。
3、主要方法:

2、使用示例

/*** @Date: 2022/6/11* 停车场占用车位,每走一辆才会停一辆*/
@Slf4j
public class SemaphoreTest {public static void main(String[] args) {//初始化一个信号量为3,默认是false非公平锁, 模拟3个停车位Semaphore semaphore = new Semaphore(3);//模拟多台车for (int i = 1; i <= 6; i++) {new Thread(() -> {try {//进入车位占用semaphore.acquire();log.info("车辆 " + Thread.currentThread().getName() + " 抢到车位");//停车3sTimeUnit.SECONDS.sleep(3);//释放semaphore.release();log.info("车辆 " + Thread.currentThread().getName() + " 离开车位");} catch (InterruptedException e) {e.printStackTrace();}}, i + "").start();}}
}
/*** 运行结果:* 00:50:24.311 [2] INFO com.itan.ut.SemaphoreTest - 车辆 2 抢到车位* 00:50:24.311 [1] INFO com.itan.ut.SemaphoreTest - 车辆 1 抢到车位* 00:50:24.311 [3] INFO com.itan.ut.SemaphoreTest - 车辆 3 抢到车位* 00:50:27.328 [1] INFO com.itan.ut.SemaphoreTest - 车辆 1 离开车位* 00:50:27.328 [3] INFO com.itan.ut.SemaphoreTest - 车辆 3 离开车位* 00:50:27.328 [2] INFO com.itan.ut.SemaphoreTest - 车辆 2 离开车位* 00:50:27.328 [4] INFO com.itan.ut.SemaphoreTest - 车辆 4 抢到车位* 00:50:27.328 [5] INFO com.itan.ut.SemaphoreTest - 车辆 5 抢到车位* 00:50:27.328 [6] INFO com.itan.ut.SemaphoreTest - 车辆 6 抢到车位* 00:50:30.339 [5] INFO com.itan.ut.SemaphoreTest - 车辆 5 离开车位* 00:50:30.339 [6] INFO com.itan.ut.SemaphoreTest - 车辆 6 离开车位* 00:50:30.339 [4] INFO com.itan.ut.SemaphoreTest - 车辆 4 离开车位*/

3、Semaphore实现互斥锁

1、如果Semaphore初始化时设置为1,则和synchronized效果类似
/*** @Date: 2022/6/11* Semaphore实现互斥锁,类似synchronized效果*/
@Slf4j
public class SemaphoreImplMutex {private static final Semaphore semaphore = new Semaphore(1);public static void main(String[] args) {SemaphoreImplMutex semaphoreMutex = new SemaphoreImplMutex();for (int i = 1; i < 4; i++) {new Thread(semaphoreMutex::method, "线程 " + i).start();}}public void method() {//同时只会有一个线程执行此方法!try {semaphore.acquire();log.info(Thread.currentThread().getName() + " 正在执行!");Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();log.info(Thread.currentThread().getName() + " 执行结束!");}}
}
/*** 23:05:07.154 [线程 1] INFO com.itan.syncutil.SemaphoreImplMutex - 线程 1 正在执行!* 23:05:08.180 [线程 1] INFO com.itan.syncutil.SemaphoreImplMutex - 线程 1 执行结束!* 23:05:08.186 [线程 3] INFO com.itan.syncutil.SemaphoreImplMutex - 线程 3 正在执行!* 23:05:09.200 [线程 3] INFO com.itan.syncutil.SemaphoreImplMutex - 线程 3 执行结束!* 23:05:09.200 [线程 2] INFO com.itan.syncutil.SemaphoreImplMutex - 线程 2 正在执行!* 23:05:10.201 [线程 2] INFO com.itan.syncutil.SemaphoreImplMutex - 线程 2 执行结束!*/

4、Semaphore原理(部分源码)

public class Semaphore implements java.io.Serializable {//实现自己的内部同步类private final Sync sync;//内部类,继承自AQSabstract static class Sync extends AbstractQueuedSynchronizer {//版本号private static final long serialVersionUID = 1192457210091910933L;//构造函数Sync(int permits) {//设置状态数,通过AbstractQueuedSynchronizer的状态机制实现信号量的设置setState(permits);}//获取许可final int getPermits() {return getState();}//共享模式下非公平策略获取final int nonfairTryAcquireShared(int acquires) {for (;;) {//获取许可数int available = getState();//剩余许可数int remaining = available - acquires;//许可小于0或者比较并且设置状态成功if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}//共享模式下进行释放protected final boolean tryReleaseShared(int releases) {for (;;) {//获取许可int current = getState();//可用的许可int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");//比较并进行设置成功if (compareAndSetState(current, next))return true;}}//根据指定的缩减量减小可用许可的数目final void reducePermits(int reductions) {for (;;) {//获取许可int current = getState();//可用的许可int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");//比较并进行设置成功if (compareAndSetState(current, next))return;}}//获取并返回立即可用的所有许可final int drainPermits() {for (;;) {//获取许可int current = getState();//许可为0或者比较并设置成功if (current == 0 || compareAndSetState(current, 0))return current;}}}/*** NonfairSync类继承了Sync类,表示采用非公平策略获取资源,其只有一个tryAcquireShared方法,重写了AQS的该方法* 说明: 从tryAcquireShared方法的源码可知,其会调用父类Sync的nonfairTryAcquireShared方法,表示按照非公平策略进行资源的获取*/static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {//共享模式下获取return nonfairTryAcquireShared(acquires);}}/*** FairSync类继承了Sync类,表示采用公平策略获取资源,其只有一个tryAcquireShared方法,重写了AQS的该方法* 说明: 从tryAcquireShared方法的源码可知,它使用公平策略来获取资源,它会判断同步队列中是否存在其他的等待节点*/static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {//同步队列中存在其他节点if (hasQueuedPredecessors())return -1;//获取许可int available = getState();//剩余的许可int remaining = available - acquires;//剩余的许可小于0或者比较设置成功if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}//从此信号量中获取一个许可,在提供一个许可前一直将线程阻塞public void acquire() throws InterruptedException {//调用原生的AbstractQueuedSynchronizersync.acquireSharedInterruptibly(1);}//释放一个许可,将其返回给信号量public void release() {sync.releaseShared(1);}
}

五、同步计数器及源码相关推荐

  1. [五]RabbitMQ-客户端源码之AMQChannel

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  2. 工具类五合一小程序源码星座配对+星座运势+恶搞放屁音效+引流工具+流量主

    今天给大家带来一套5合一小程序 星座配对+星座运势+恶搞放屁音效+引流工具+流量主 怎么弄?需要用到分包功能, 问:为什么是三套小程序,跟你演示的不一样啊? 答:为了防止有些人拿了东西就去闲鱼卖,只要 ...

  3. Python输入音频wav同步嘴型源码方案

    这是由ACM MM2020发表了的一篇论文,提出一个AI模型,只需要一段人物视频和一段目标语音,就能够让音频和视频合二为一,人物嘴型与音频完全匹配. [订阅栏目 获取全部的源码方案] 选一张蒙娜丽莎的 ...

  4. Java并发包源码学习系列:同步组件CountDownLatch源码解析

    文章目录 CountDownLatch概述 使用案例与基本思路 类图与基本结构 void await() boolean await(long timeout, TimeUnit unit) void ...

  5. 平方变换载波同步 matlab,matlab源码-costas载波同步环.docx

    matlab源码-costas载波同步环.docx 在利用相干解调的数字通信系统中,载波同步是正确解调的前提,也是实际通信中的一项关键技术,没有载波同步就不可能正确的恢复出数字信号.常用的载波同步方法 ...

  6. 遥望星空FTP文件同步工具(附源码)1.0 发布

    FTP文件同步工具1.0 发布 主要功能: 1.支持多任务多线程 2.支持自动启动 3.支持2种时间触发方式 4.支持任务进度显示 5.支持WinFtp Server.ftpserver.exe.Se ...

  7. datax 定时执行多个job_数据同步神器Datax源码重构

    每日一句永远不要认为我们可以逃避, 我们的每一步都决定着最后的结局, 我们的脚步正在走向我们自己选定的终点.Do not ever think about that we can escape , o ...

  8. 手把手教你五分钟扒个源码写个无敌外挂

    大家好,我是若川.源码共读<1个月,200+人,一起读了4周源码> 活动进行到第五期了,欢迎点链接加我微信 ruochuan12 报名参加. 前言 前段时间群里分享了一个小游戏,多次怀疑自 ...

  9. 五套企业网站源码下载!

    收集了一些,感觉还是不错,适合用于企业公司建站之用. 都是以ASP编写的,很实用,做了防盗链,所以只提供原下载地址啦! 一.科技工程公司网站源码 下载:http://www.codejia.com/a ...

最新文章

  1. 前端日报-20160527-underscore 源码解读
  2. Swift 与 Objective-C混合编程
  3. [3]工欲善其事必先利其器-------UML常用的图(三)
  4. 【动态规划】关于转移方程的简单理解
  5. 【深度学习】最先进的图像分类算法:FixEfficientNet-L2
  6. linux增量安装tomcat_linux与windows下tomcat的java内存设置
  7. 输入参数的数目不足_sklearn.decomposition.PCA 参数速查手册
  8. python-列表演练-根据学生id获取学生数据-获取学生数据中得分较高的前N条数据
  9. JAVA导出excel如何设置表头跨行或者跨列,跪求各位大神了
  10. 易宝典文章——怎样配置TMG能够使外部用户成功访问Outlook Anywhere?
  11. 苹果鼠标怎么充电_双十一苹果无线充电宝怎么选?充电兼容性强的品牌推荐_...
  12. 分区工具parted的详解及常用分区使用方法
  13. Markdown初使用
  14. 计算机打音乐醉赤壁,抖音确认过眼神我遇上对的人是什么歌,醉赤壁歌曲介绍...
  15. 数据分析--企业的贤内助 附下载地址
  16. latex写加上标题不显示页眉页脚
  17. 树莓派无法解析域名(即无法连网,更新软件失败)
  18. 华为鸿蒙2.0系统电脑安装步骤,华为鸿蒙系统2.0怎么安装,鸿蒙系统2.0安装教程...
  19. xnio-nio解决方法
  20. GB/T 20272-2006与GB/T 20008-2005

热门文章

  1. 2023年依照市场行情定制开发一个手机App软件需要多少钱?
  2. MindNode针对apple M1进行了优化更新
  3. 2021Java高级面试题,享学课堂java架构师课程
  4. 微信小程序开店这么火,怎么挑选第三方小程序服务商
  5. 安卓天气预报mysql_android天气预报app源码(包运行)
  6. js搭建网站 web服务器,AngularJS如何搭建web服务器?angularjs搭建web服务器的详细过程...
  7. 生鲜超市管理系统(JavaSSH)
  8. RK3399平台开发系列讲解(时间篇)RTC设备构建过程
  9. android 屏蔽 广播,Android中使用BroadcastReceiver打开和关闭WIFI
  10. JS判断是否是JSON数据