


CyclicBarrier cyclicBarrier = new CyclicBarrier(3, ()->{System.out.println("人满发车了");});System.out.println("一车3人,车满发车");
for (int i = 0; i < 10; i++) {int finalI = i;new Thread(()->{System.out.println("第"+ finalI +"个人上车了");try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}).start();



1. 先看下CyclicBarrier的结构有哪些方法和属性

private static class Generation {//是否要打破boolean broken = false;
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;


public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();//标识屏障拦截的线程数量,相当于一个副本,作用是在达到屏障后进行重置count操作this.parties = parties;//线程计数器this.count = parties;//达到屏障点后执行的方法this.barrierCommand = barrierAction;
}public CyclicBarrier(int parties) {this(parties, null);

2. 接着我们分析一下CyclicBarrier的await()方法

public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}
}private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException//实际用的是一把独占锁           final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;if (g.broken)throw new BrokenBarrierException();//线程是否中断,如果中断的话,需要打破栅栏,并抛出中断异常    if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}int index = --count;if (index == 0) {  // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;//到达屏障点时,执行需要处理的业务if (command != null)command.run();ranAction = true;//执行下一代屏障,在这儿会将条件等待队列的元素转到同步队列,并将之前阻塞的线程唤醒,nextGeneration();return 0;} finally {//是否要打破栅栏代if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {try {if (!timed)//在线程计数器count不等于0时,会走到Condition.await()trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}
}/*** Implements interruptible condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,*      throwing IllegalMonitorStateException if it fails.* <li> Block until signalled or interrupted.* <li> Reacquire by invoking specialized version of*      {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//添加到条件等待队列中Node node = addConditionWaiter();//释放当前拿到的ReentrantLock锁int savedState = fullyRelease(node);int interruptMode = 0;//判断是否在同步队列上while (!isOnSyncQueue(node)) {//不在的话,就阻塞当前队列LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}/*** Adds a new waiter to wait queue.* @return its new wait node*/
private Node addConditionWaiter() {//第一次进来时,lastWaiter为NullNode t = lastWaiter;// If lastWaiter is cancelled, clean out.//如果最后一个节点不为空而且不是条件等待状态if (t != null && t.waitStatus != Node.CONDITION) {//剔除已取消的等待队列,并重新给t复制lastWaiterunlinkCancelledWaiters();t = lastWaiter;}//新创建一个thread当前线程的NodeNode node = new Node(Thread.currentThread(), Node.CONDITION);//lastWaiter为空的话,新增firstWaiter为当前节点;lastWaiter不为空的话,新增t.nextWaiter为当前节点;if (t == null)firstWaiter = node;elset.nextWaiter = node;//将lastWaiter指向当前节点lastWaiter = node;return node;
}/*** Invokes release with current state value; returns saved state.* Cancels node and throws exception on failure.* @param node the condition node for this wait* @return previous sync state*/
final int fullyRelease(Node node) {boolean failed = true;try {//拿到当前的锁状态标识stateint savedState = getState();//释放锁if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}
}/*** Unlinks cancelled waiter nodes from condition queue.* Called only while holding lock. This is called when* cancellation occurred during condition wait, and upon* insertion of a new waiter when lastWaiter is seen to have* been cancelled. This method is needed to avoid garbage* retention in the absence of signals. So even though it may* require a full traversal, it comes into play only when* timeouts or cancellations occur in the absence of* signals. It traverses all nodes rather than stopping at a* particular target to unlink all pointers to garbage nodes* without requiring many re-traversals during cancellation* storms.*/
private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) {//出队操作,由于条件等待队列是个单向链表,所以只需要将t.nextWaiter=null即可完成出队操作t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t;t = next;}
}/*** Updates state on barrier trip and wakes up everyone.* Called only while holding lock.*/
private void nextGeneration() {// signal completion of last generation//唤醒所有队列trip.signalAll();// set up next generation //重置计数器count = parties;//新创建一个屏障generation = new Generation();
}/*** Moves all threads from the wait queue for this condition to* the wait queue for the owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}*         returns {@code false}*/
public final void signalAll() {//判断是否为当前独占线程if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);
}/*** Removes and transfers all nodes.* @param first (non-null) the first node on condition queue*/
private void doSignalAll(Node first) {//直接清空条件等待队列,设计精髓所在lastWaiter = firstWaiter = null;do {//保存first的下一个节点Node next = first.nextWaiter;//first出单项链表first.nextWaiter = null;//转换到同步队列Sync并依次唤醒transferForSignal(first);//first指向下一个节点first = next;} while (first != null);
}/*** Transfers a node from a condition queue onto sync queue.* Returns true if successful.* @param node the node* @return true if successfully transferred (else the node was* cancelled before signal)*/
final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*///当前节点入队同步队列Sync,并返回node的前驱节点Node p = enq(node);int ws = p.waitStatus;//CAS将node的前驱节点p的waitStatus修改为待唤醒if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
}/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/
private Node enq(final Node node) {//用for(;;)必须保证入队成功for (;;) {//t指向队尾Node t = tail;//如果队尾为空,说明队列为空if (t == null) { // Must initialize//CAS操作将head指向一个空Node,并将tail也指向空Nodeif (compareAndSetHead(new Node()))tail = head;} else {//如果队尾不为空,则将当前节点的前驱指向队尾node.prev = t;//CAS操作将tail指向当前节点if (compareAndSetTail(t, node)) {//由于是个双向链表,将t的后继节点指向当前节点t.next = node;return t;}}}


3. 再来看一下我们的reset()方法,可以重置我们的计数器,并释放所有线程

/*** Resets the barrier to its initial state.  If any parties are* currently waiting at the barrier, they will return with a* {@link BrokenBarrierException}. Note that resets <em>after</em>* a breakage has occurred for other reasons can be complicated to* carry out; threads need to re-synchronize in some other way,* and choose one to perform the reset.  It may be preferable to* instead create a new barrier for subsequent use.*/
public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {//打破栅栏breakBarrier();   // break the current generationnextGeneration(); // start a new generation} finally {lock.unlock();}
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
private void breakBarrier() {//将是否打破标识置为truegeneration.broken = true;//重置计数器count = parties;//重置后,会唤醒之前阻塞的所有的线程trip.signalAll();




  1. CountDownLatch的计数器只能使用一次,而CyclicBarrier可以通过reset()使用多次.
  2. CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
  3. CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不 同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。 CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
  4. CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
  5. CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤 醒的,而CountDownLatch则是通过AQS的“共享锁”实现。



