目录

一、Semaphore

1、使用

2、定义

3、acquire / acquireUninterruptibly / acquireUninterruptibly / tryAcquire

4、release

5、availablePermits / drainPermits / reducePermits

二、Exchanger

1、使用

2、定义

3、exchange

4、slotExchange

5、arenaExchange


Semaphore是基于AbstractQueuedSynchronizer共享锁机制实现的,用于控制并行的执行某个任务的线程数,通常称之为信号量,注意可以一次获取或者释放多个信号量,如果使用不当会导致信号量的总数与初始时设置的总数不一致;Exchanger用于实现多个线程之间两两匹配交互数据,注意多线程下匹配是由调度决定的,基本是随机的。本篇博客就来详细探讨这两个类的实现细节。

一、Semaphore

1、使用

 @Testpublic void name() throws Exception {int num=10;//默认的非公平实现Semaphore semaphore=new Semaphore(2);CountDownLatch countDownLatch=new CountDownLatch(num);Runnable runnable=new Runnable() {@Overridepublic void run() {String name=Thread.currentThread().getName();System.out.println(name+" start,time->"+System.currentTimeMillis());try {semaphore.acquire();System.out.println(name+" acquire succ,time->"+System.currentTimeMillis());Thread.sleep(1000);}catch (Exception e){e.printStackTrace();} finally {semaphore.release();countDownLatch.countDown();}}};for(int i=0;i<num;i++) {new Thread(runnable).start();}countDownLatch.await();}

其输出如下,每次只有2个线程被唤醒:

Semaphore也是区分公平锁和非公平锁,其语义和之前的ReentrantLock中的公平锁和非公平锁是一样,两者都是基于AbstractQueuedSynchronizer实现的,测试用例如下:

@Testpublic void test() throws Exception {int num=4;//默认的非公平实现Semaphore semaphore=new Semaphore(2);//公平锁实现
//        Semaphore semaphore=new Semaphore(2,true);CountDownLatch countDownLatch=new CountDownLatch(num+4);CyclicBarrier cyclicBarrier=new CyclicBarrier(6);Runnable runnable=new Runnable() {@Overridepublic void run() {String name=Thread.currentThread().getName();System.out.println(name+" start,time->"+System.currentTimeMillis());try {semaphore.acquire();System.out.println(name+" acquire succ,time->"+System.currentTimeMillis());Thread.sleep(3000);}catch (Exception e){e.printStackTrace();} finally {semaphore.release();try {cyclicBarrier.await();} catch (Exception e) {e.printStackTrace();}}}};//先起两个线程,让他们跑起来,占有信号量for(int i=0;i<2;i++) {new Thread(runnable).start();}Thread.sleep(100);Runnable runnable2=new Runnable() {@Overridepublic void run() {String name=Thread.currentThread().getName();System.out.println(name+" start,time->"+System.currentTimeMillis());try {semaphore.acquire();System.out.println(name+" acquire succ,time->"+System.currentTimeMillis());}catch (Exception e){e.printStackTrace();} finally {semaphore.release();countDownLatch.countDown();}}};//再起4个线程排队for(int i=0;i<num;i++) {new Thread(runnable2).start();}Thread.sleep(100);Runnable runnable3=new Runnable() {@Overridepublic void run() {String name=Thread.currentThread().getName();System.out.println(name+" start,time->"+System.currentTimeMillis());try {cyclicBarrier.await();semaphore.acquire();System.out.println(name+" acquire succ,time->"+System.currentTimeMillis());}catch (Exception e){e.printStackTrace();} finally {semaphore.release();countDownLatch.countDown();}}};//起4个线程,跟上述的4个线程抢占信号量for(int i=0;i<4;i++) {new Thread(runnable3).start();}countDownLatch.await();}

非公平锁下的输出如下:

理论上,非公平锁下后面起来的4个线程6,7,8,9 会跟已经加入到等待队列准备被唤醒的2,3,4,5同时竞争信号量,上述用例中7跟5竞争时就抢到了信号量,而公平锁下,后面起来的4个线程6,7,8,9都会加入到等待队列中,按照等待队列的入队顺序被唤醒抢占信号量,6,7,8,9永远会在2,3,4,5的后面,其输出如下:

2、定义

Semaphore只有一个属性,private final Sync sync;  Sync是一个内部类,其类继承关系如下:

子类FairSync和NonfairSync就是公平锁和非公平锁的实现了,后面会详细讲解其实现细节。Semaphore默认是使用非公平锁,可以通过参数指定使用公平锁,参考其构造方法的实现,如下:

//permits表示可用的信号量的个数
public Semaphore(int permits) {sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}

AbstractQueuedSynchronizer相关方法的实现细节可以参考之前的《Java8 ReentrantLock 源码解析》和《Java8 ReentrantReadWriteLock 源码解析》,后面不再赘述。

3、acquire / acquireUninterruptibly / acquireUninterruptibly / tryAcquire

//等待的过程中被中断会抛出异常
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}//允许一次获取多个
public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}//会一直等待直到获取成功,被中断也不会抛出异常
public void acquireUninterruptibly() {sync.acquireShared(1);}public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.acquireShared(permits);}//尝试获取,如果获取失败则直接返回,不会等待
public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;}public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();return sync.nonfairTryAcquireShared(permits) >= 0;}//尝试获取,如果获取失败则等待指定的时间
public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));}

上述方法的实现都是基于AbstractQueuedSynchronizer的方法, 除了nonfairTryAcquireShared方法外,该方法是Sync定义的,如下:

//返回负值表示获取失败,否则返回剩余可用通行证的数量
final int nonfairTryAcquireShared(int acquires) {for (;;) {//获取可用的通行证数量 int available = getState();//计算剩余的int remaining = available - acquires;if (remaining < 0 ||//如果remaining大于等于0,则cas修改statecompareAndSetState(available, remaining))return remaining;}}

其他方法的实现依赖于子类实现的tryAcquireShared方法,该方法表示尝试获取共享锁,返回负值表示获取锁失败,会被加入到等待队列中,两个子类的实现如下:

 Sync(int permits) {setState(permits);}static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}//非公平锁下会,如果有可用的信号量,会直接cas抢占protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}//公平锁下,会先判断是否有等待获取信号量的线程,如果有则返回-1,将新线程加入到等待队列中protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;//没有等待的线程,尝试cas抢占    int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}

4、release

//释放占有的信号量
public void release() {sync.releaseShared(1);}//释放多个信号量
public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);}

releaseShared方法是父类AbstractQueuedSynchronizer的方法,依赖于子类提供的tryReleaseShared方法表示尝试释放共享锁,返回true表示释放成功,该方法是Sync定义的,如下:

protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) //超过int的最大值了throw new Error("Maximum permit count exceeded");if (compareAndSetState(current, next)) //cas修改state,如果失败则重试return true;}}

从上述实现可知,如果使用带参数的release和acquire方法,两者必须都一致,比如都是1,否则会导致信号量比初始化设置的值要大,参考如下测试用例:

 @Testpublic void name() throws Exception {int num=6;Semaphore semaphore=new Semaphore(1);CountDownLatch countDownLatch=new CountDownLatch(num);Runnable runnable=new Runnable() {@Overridepublic void run() {String name=Thread.currentThread().getName();System.out.println(name+" start,time->"+System.currentTimeMillis());try {semaphore.acquire(1);System.out.println(name+" acquire succ,time->"+System.currentTimeMillis());Thread.sleep(3000);}catch (Exception e){e.printStackTrace();} finally {semaphore.release(5);countDownLatch.countDown();}}};for(int i=0;i<num;i++) {new Thread(runnable).start();}countDownLatch.await();}

其输出如下:

第一个线程获取信号量后休眠3s,然后归还5个信号量,将剩余的5个等待的线程依次唤醒,几乎同时获取信号量。

5、availablePermits / drainPermits / reducePermits

//获取可用的信号量数量
public int availablePermits() {return sync.getPermits();}//将当前可用信号量的数量置为0
public int drainPermits() {return sync.drainPermits();}//将信号量的数量减少
protected void reducePermits(int reduction) {if (reduction < 0) throw new IllegalArgumentException();sync.reducePermits(reduction);}

这几个方法的实现都是Sync定义的,如下:

final int getPermits() {return getState();}final void reducePermits(int reductions) {for (;;) {int current = getState();//此处没有校验reductions和current的大小,有可能减出一个负值//如果是负值,则归还信号量的时候可以抵消一部分int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next)) //cas修改state,修改失败重试return;}}final int drainPermits() {for (;;) {int current = getState();//将state置为0if (current == 0 || compareAndSetState(current, 0))return current;}}

6、使用synchronized实现

可使用synchronized实现类似的效果,如下:

public class ObjectSemaphore {private Object lock = new Object();private int count;public ObjectSemaphore(int count) {if (count <= 0) throw new IllegalArgumentException();this.count = count;}public void acquire() throws InterruptedException {synchronized (lock) {if (count == 0) {lock.wait();}else{count--;}}}public void release() {synchronized (lock) {count++;lock.notify();}}}

可以使用第一个测试用例测试,输出是一样的。

二、Exchanger

1、使用

Exchanger用于两个线程之间交换数据,当然实际参与的线程可以不止两个,测试用例如下:

@Testpublic void test3() throws Exception {Exchanger<String> exchanger=new Exchanger<>();CountDownLatch countDownLatch=new CountDownLatch(5);Runnable runnable=new Runnable() {@Overridepublic void run() {try {String origMsg= RandomStringUtils.randomNumeric(6);//先到达的线程会在此等待,直到有一个线程跟他交互数据或者等待超时String exchangeMsg=exchanger.exchange(origMsg,5,TimeUnit.SECONDS);System.out.println(Thread.currentThread().getName()+",origMsg->"+origMsg+",exchangeMsg->"+exchangeMsg);} catch (Exception e) {e.printStackTrace();}finally {countDownLatch.countDown();}}};for(int i=0;i<5;i++){new Thread(runnable).start();}countDownLatch.await();}

第5个线程因为没有匹配的线程而等待超时,输出如下:

上述测试用例是比较简单的,可以模拟消息消费的场景来观察Exchanger的行为,测试用例如下:

@Testpublic void test4() throws Exception {Exchanger<String> exchanger=new Exchanger<>();CountDownLatch countDownLatch=new CountDownLatch(4);CyclicBarrier cyclicBarrier=new CyclicBarrier(4);//生产者Runnable producer=new Runnable() {@Overridepublic void run() {try {cyclicBarrier.await();for(int i=0;i<5;i++) {String msg=RandomStringUtils.randomNumeric(6);exchanger.exchange(msg, 5, TimeUnit.SECONDS);System.out.println(Thread.currentThread().getName() + " product msg->" + msg+",i->"+i);}} catch (Exception e) {e.printStackTrace();}finally {countDownLatch.countDown();}}};//消费者Runnable consumer=new Runnable() {@Overridepublic void run() {try {cyclicBarrier.await();for(int i=0;i<5;i++) {String msg=exchanger.exchange(null, 5, TimeUnit.SECONDS);System.out.println(Thread.currentThread().getName() + " consumer msg->" + msg+",i->"+i);}} catch (Exception e) {e.printStackTrace();}finally {countDownLatch.countDown();}}};for(int i=0;i<2;i++){new Thread(producer).start();new Thread(consumer).start();}countDownLatch.await();}

输出如下,上面生产者和消费者线程数是一样的,循环的次数也是一样的,但是还是出现等待超时的情形:

这种等待超时是概率出现的,这是为啥呢? 因为系统调度的不均衡和Exchanger底层的大量自旋等待导致这4个线程并不是调用exchange成功的次数并不一致,比如上面的Thread 2只调用了3次,另外3个线程已经调用了5次退出了,没有跟Thread 2匹配的线程了导致等待超时。另外从输出可以看出,消费者线程并没有像我们想的那样跟生产者线程一一匹配,生产者线程有时也充当了消费线程,这是为啥呢?因为Exchanger匹配时完全不关注这个线程的角色,两个线程之间的匹配完全由调度决定的,即CPU同时执行了或者紧挨着执行了两个线程,这两个线程就匹配成功了。下面来深入Exchanger的内部实现一探究竟。

2、定义

Exchanger包含的属性如下:

    //ThreadLocal变量,每个线程都有自己的一个副本private final Participant participant;//高并发下使用的,保存待匹配的Node实例private volatile Node[] arena;//低并发下,arena未初始化时使用的保存待匹配的Node实例private volatile Node slot;//初始值为0,当创建arena后会被赋值成SEQ,用来记录arena数组的可用最大索引,会随着并发的增大而增大直到等于最大值FULL,会随着并行的线程逐一匹配成功而减少恢复成初始值private volatile int bound;

还有多个表示字段偏移量的静态属性,通过static代码块初始化,如下:

Exchanger 定义了多个静态常量,如下:

    // 初始化arena时使用,1 << ASHIFT是一个缓存行的大小,避免不同的Node落入到同一个高速缓存行// 这里实际是把数组容量扩大了8倍,原来索引相邻的两个元素,扩容后中间隔了7个元素,从元素的起始地址上看就隔了8个元素,中间的7个都是空的,为了避免原来相邻的两个元素落到同一个缓存行中//因为arena是对象数组,一个元素占8字节,8个就是64字节private static final int ASHIFT = 7;//arena数组元素的索引最大值即255private static final int MMASK = 0xff;//arena数组的最大长度即256private static final int SEQ = MMASK + 1;//获取CPU核数private static final int NCPU = Runtime.getRuntime().availableProcessors();//实际的数组长度,因为是线程两两配对的,所以最大长度是核数除以2static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;//自旋等待的次数private static final int SPINS = 1 << 10;//如果交换的对象是null,则返回此对象private static final Object NULL_ITEM = new Object();//如果等待超时导致交换失败,则返回此对象private static final Object TIMED_OUT = new Object();

上述属性中Node和Participant都是内部类,其定义如下:

其中Contended注解是为了避免高速缓存行导致的伪共享问题,index用来记录arena数组的索引,bound用于记录上一次的Exchanger bound属性,collides用于记录在bound不变的情况下CAS抢占失败的次数,hash是自旋等待时计算随机数使用的,item表示当前线程请求交换的对象,match是同其他线程交换的结果,match不为null表示交换成功,parked为跟该Node关联的处于休眠状态的线程。

3、exchange

低并发,arena未初始化的情形下通过slotExchange进行交换,如果arena已经初始化了或者slotExchange因为并发的问题导致交换失败了,则通过arenaExchange进行交换。我们调用exchange方法时传入的对象可以是null,为了跟交换失败做区分,exchange方法会将其转换成NULL_ITEM,一个常量值,同理,如果交换后的结果是NULL_ITEM,则返回null。

public Exchanger() {participant = new Participant();}public V exchange(V x) throws InterruptedException {Object v;Object item = (x == null) ? NULL_ITEM : x; // translate null argsif ((arena != null ||(v = slotExchange(item, false, 0L)) == null) && //如果arena为null就会执行slotExchange,如果交换失败进入下面的判断((Thread.interrupted() || // 如果线程被中断,则抛出异常(v = arenaExchange(item, false, 0L)) == null))) //如果arenaExchange失败,因为是无期限等待,所以肯定是线程被中断了,抛出异常throw new InterruptedException();//如果交换的结果是NULL_ITEM,则返回null    return (v == NULL_ITEM) ? null : (V)v;}//逻辑同上,不过可以指定等待的时间
public V exchange(V x, long timeout, TimeUnit unit)throws InterruptedException, TimeoutException {Object v;Object item = (x == null) ? NULL_ITEM : x;long ns = unit.toNanos(timeout);if ((arena != null ||(v = slotExchange(item, true, ns)) == null) &&((Thread.interrupted() ||(v = arenaExchange(item, true, ns)) == null)))throw new InterruptedException();if (v == TIMED_OUT) //如果等待超时抛出异常throw new TimeoutException();return (v == NULL_ITEM) ? null : (V)v;}

4、slotExchange

slotExchange是基于slot属性来完成交换的,调用slotExchange方法时,如果slot属性为null,当前线程会将slot属性由null修改成当前线程的Node,如果修改失败则下一次for循环走slot属性不为null的逻辑,如果修改成功则自旋等待,自旋一定次数后通过Unsafe的park方法让当前线程休眠,可以指定休眠的时间,如果没有指定则无期限休眠直到被唤醒;无论是因为线程中断被唤醒,等待超时被唤醒还是其他线程unpark唤醒的,都会检查当前线程的Node的match属性是否为null,如果不为null说明交互成功,返回该对象;否则返回null或者TIMED_OUT,在返回前会将item,match等属性置为null,保存之前自旋时计算的hash值,方便下一次调用slotExchange。

调用slotExchange方法时,如果slot属性不为null,则当前线程会尝试将其修改null,如果cas修改成功,表示当前线程与slot属性对应的线程匹配成功,会获取slot属性对应Node的item属性,将当前线程交换的对象保存到slot属性对应Node的match属性,然后唤醒获取slot属性对应Node的waiter属性,即处理休眠状态的线程,至此交换完成,同样的在返回前需要将item,match等属性置为null,保存之前自旋时计算的hash值,方便下一次调用slotExchange;如果cas修改slot属性失败,说明有其他线程也在抢占该slot,则初始化arena属性,下一次for循环时因为arena属性不为null,直接返回null,从而通过arenaExchange完成交换。

//arena为null时会调用此方法,返回null表示交换失败
//item是交换的对象,timed表示是否等待指定的时间,为false表示无期限等待
private final Object slotExchange(Object item, boolean timed, long ns) {//获取当前线程关联的participant NodeNode p = participant.get();Thread t = Thread.currentThread();if (t.isInterrupted()) //被中断,返回nullreturn null;for (Node q;;) {if ((q = slot) != null) { //slot不为nullif (U.compareAndSwapObject(this, SLOT, q, null)) { //将slot置为null,slot对应的线程与当前线程匹配成功Object v = q.item;q.match = item;  //保存item,即完成交互Thread w = q.parked; //唤醒q对应的处于休眠状态的线程if (w != null)U.unpark(w);return v;}//slot修改失败,其他某个线程抢占了该slot,多个线程同时调用exchange方法会触发此逻辑//bound等于0表示未初始化,此处校验避免重复初始化if (NCPU > 1 && bound == 0 &&U.compareAndSwapInt(this, BOUND, 0, SEQ)) //将bound修改成seq成功arena = new Node[(FULL + 2) << ASHIFT]; //创建数组,下一次for循环就因为arena不为null,返回null了}else if (arena != null)return null; //arena不为null了,通过arenaExchange交换else {//slot和arena都为nullp.item = item;if (U.compareAndSwapObject(this, SLOT, null, p)) //修改slot为p,修改成功则终止循环break;//修改失败则继续for循环,将item恢复成null    p.item = null;}}//将slot修改为p后会进入此分支int h = p.hash; //hash初始为0long end = timed ? System.nanoTime() + ns : 0L;int spins = (NCPU > 1) ? SPINS : 1;Object v;//match保存着同其他线程交换的对象,如果不为null,说明交换成功了while ((v = p.match) == null) {//执行自旋等待if (spins > 0) {h ^= h << 1; h ^= h >>> 3; h ^= h << 10;if (h == 0)h = SPINS | (int)t.getId(); //初始化h//只有生成的h小于0时才减少spins,else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)Thread.yield();}else if (slot != p) //slot被修改了,已经有匹配的线程了,重新自旋,读取属性,因为是先修改slot再修改属性的,两者因为CPU调度的问题可能有时间差spins = SPINS;else if (!t.isInterrupted() && arena == null && //线程没有被中断且arena为null(!timed || (ns = end - System.nanoTime()) > 0L)) { //需要等待U.putObject(t, BLOCKER, this);p.parked = t;if (slot == p)U.park(false, ns);//线程被唤醒,继续下一次for循环    //如果是因为等待超时而被唤醒,下次for循环进入下面的else if分支,返回TIMED_OUTp.parked = null;U.putObject(t, BLOCKER, null);}else if (U.compareAndSwapObject(this, SLOT, p, null)) { //将slot修改成p//timed为false,无期限等待,因为中断被唤醒返回null//timed为true,因为超时被被唤醒,返回TIMED_OUT,因为中断被唤醒返回nullv = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;break;}}//修改match为null,item为null,保存h,下一次exchange是h就不是初始值0了U.putOrderedObject(p, MATCH, null);p.item = null;p.hash = h;return v;}

5、arenaExchange

arenaExchange是基于arena属性完成交换的,整体逻辑比较复杂,有以下几个要点:

  1. m的初始值就是0,index的初始值也是0,两个都是大于等于0且i不大于m,当某个线程多次尝试抢占index对应数组元素的Node都失败的情形下则尝试将m加1,然后抢占m加1对应的新数组元素,将其由null修改成当前线程关联的Node,然后自旋等待匹配;如果自旋结束,没有匹配的线程,则将m加1对应的新数组元素重新置为null,将m减1,然后再次for循环抢占其他为null的数组元素。极端并发下m会一直增加直到达到最大值FULL为止,达到FULL后只能通过for循环不断尝试与其他线程匹配或者抢占为null的数组元素,然后随着并发减少,m会一直减少到0。通过这种动态调整m的方式可以避免过多的线程基于CAS修改同一个元素导致CAS失败,提高匹配的效率,这种思想跟LongAdder的实现是一致的。
  2. 只有当m等于0的时候才会通过Unsafe park方法让线程休眠,如果不等于0,即此时存在多个并行的等待匹配的线程,则主要通过自旋的方式等待其他线程到来,这是因为交换动作本身是很快的很短暂的,通过自旋等待就可以让多个等待的线程快速的完成匹配;只有当前只剩下一个线程的时候,此时m肯定等于0,短期内没有匹配的线程,才会考虑通过park方法阻塞。
//抢占slot失败后进入此方法,arena不为空private final Object arenaExchange(Object item, boolean timed, long ns) {Node[] a = arena;Node p = participant.get();for (int i = p.index;;) { //index初始为0int b, m, c; long j;//在创建arena时,将本来的数组容量 << ASHIFT,为了避免数组元素落到了同一个高速缓存行,这里获取真实的数组元素索引时也需要 << ASHIFT                      Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);if (q != null && U.compareAndSwapObject(a, j, q, null)) { //如果q不为null,则将对应的数组元素置为null,表示当前线程和该元素对应的线程匹配了Object v = q.item;                     // releaseq.match = item; //保存item,交互成功Thread w = q.parked;if (w != null)  //唤醒等待的线程U.unpark(w);return v;}//q为null 或者q不为null,cas抢占q失败了//bound初始化时是SEQ,SEQ & MMASK就是0,即m的初始值就是0,m为0时,i肯定为0else if (i <= (m = (b = bound) & MMASK) && q == null) { //q为null,该数组元素未被占用,注意上面的cas会将数组元素置为null,下一次for循环q变成nullp.item = item;                         // offerif (U.compareAndSwapObject(a, j, null, p)) { //对应的数组元素修改为p//如果timed为true且m等于0的情形下才会等待指定的时间,否则是无期限等待long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;Thread t = Thread.currentThread(); // waitfor (int h = p.hash, spins = SPINS;;) {Object v = p.match;if (v != null) { //已经跟某个线程交换成功U.putOrderedObject(p, MATCH, null); //match和item置为null,方便下次使用p.item = null;           p.hash = h; //保存hash,下次自旋使用return v;}else if (spins > 0) { //自旋等待h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshiftif (h == 0)                // initialize hashh = SPINS | (int)t.getId();else if (h < 0 &&          // approx 50% true(--spins & ((SPINS >>> 1) - 1)) == 0)Thread.yield();        // two yields per wait}else if (U.getObjectVolatile(a, j) != p) //索引j处的数组元素发生变更spins = SPINS;       // releaser hasn't set match yetelse if (!t.isInterrupted() && m == 0 &&  //只有m等于0才会进入此分支让线程阻塞(!timed ||(ns = end - System.nanoTime()) > 0L)) { //如果需要等待U.putObject(t, BLOCKER, this); // emulate LockSupportp.parked = t;              // minimize windowif (U.getObjectVolatile(a, j) == p)  //再校验一遍,j处的数组元素未发生变更U.park(false, ns); //线程休眠//线程被唤醒,因为中断或者等待超时    p.parked = null;U.putObject(t, BLOCKER, null);}//线程被中断了或者等待超时或者m不等于0,进入此分支else if (U.getObjectVolatile(a, j) == p && // j处的数组元素未发生变更,将其置为nullU.compareAndSwapObject(a, j, p, null)) {if (m != 0)                // try to shrinkU.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); //修改bound,下一次跟MMASK求且的结果会减1,此时可能导致i大于mp.item = null;p.hash = h;i = p.index >>>= 1;        // index右移一位if (Thread.interrupted()) //线程被中断,返回nullreturn null;if (timed && m == 0 && ns <= 0L) //等待超时return TIMED_OUT;//m不等于0的情形,下一次for循环继续抢占    break;                     //终止内层for循环,继续外层的for循环,会尝试抢占其他的数组元素,然后自旋等待}}//内层for循环结束}else //数组元素修改失败,下一次for循环重试,q不为null,与该元素对应的线程匹配成功p.item = null;                     // clear offer}else { //i>m 或者q不为null,cas抢占q失败了 会进入此分支 //bound的初始值也是0if (p.bound != b) {                    //重置bound和collidesp.bound = b;p.collides = 0;//如果i等于m且m不等于0,则i=m-1,否则i=mi = (i != m || m == 0) ? m : m - 1;}//bound等于belse if ((c = p.collides) < m   || m == FULL ||!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { //修改bound,下一次跟MMASK求且的结果即m会加1,下一次进入此else分支p.bound != b//如果m小于FULL,会尝试最多m次,即进入下面的逻辑最多m次,如果还失败则增加m,然后继续尝试直到m等于FUll为止     p.collides = c + 1; //计数加1i = (i == 0) ? m : i - 1;          //i不等于0的时候,i等于i减1,等于0则i等于m,即循环的从m往后遍历arena数组的元素了}//上一个else if三个条件都为false,即p.collides >m 且m不等于FULL且cas 修改bound成功else//修改bound成功会导致下一次计算m时,m加1,此处i等于m+1,下一次for循环i等于m,会抢占m+1对应的rena数组的元素i = m + 1;                         //修改ip.index = i; //重置i}}}

Java8 Semaphore与Exchanger 源码解析相关推荐

  1. Java8 ConcurrentLinkedQueue和LinkedTransferQueue 源码解析

    目录 一.ConcurrentLinkedQueue 1.定义 2.构造方法 3.add / offer / addAll 4. peek / poll / remove 5.iterator / I ...

  2. Java并发之Semaphore源码解析

    Semaphore 前情提要 在学习本章前,需要先了解ReentrantLock源码解析,ReentrantLock源码解析里介绍的方法有很多是本章的铺垫.下面,我们进入本章正题Semaphore. ...

  3. Hotspot 对象引用Reference和Finalizer 源码解析

    目录 一.Reference 1.SoftReference / WeakReference / PhantomReference 2.定义 3.ReferenceHandler 4.Cleaner ...

  4. Java 并发编程Semaphore的应用与源码解析

    What Semaphore标识信号量,允许指定数量的线程同时访问某个资源 How 通过以下两部实现信号量: acquire方法用于获得准入许可(如果没有获得许可,则进行等待,直到有线程释放许可而获得 ...

  5. Java8 ForkJoinPool(一) 源码解析

    目录 一.ForkJoinWorkerThread 1.定义 2.run / getPoolIndex 二.InnocuousForkJoinWorkerThread 三.ForkJoinWorker ...

  6. 并发编程与源码解析 (三)

    并发编程 (三) 1 Fork/Join分解合并框架 1.1 什么是fork/join ​ Fork/Join框架是JDK1.7提供的一个用于并行执行任务的框架,开发者可以在不去了解如Thread.R ...

  7. dubbo源码解析(二)

    大家好,我是烤鸭: dubbo 源码解析: 1.服务导出 介绍: Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑.整个逻辑大致可分为三 ...

  8. dubbo源码解析(十)远程通信——Exchange层

    远程通讯--Exchange层 目标:介绍Exchange层的相关设计和逻辑.介绍dubbo-remoting-api中的exchange包内的源码解析. 前言 上一篇文章我讲的是dubbo框架设计中 ...

  9. 面试官系统精讲Java源码及大厂真题 - 08 HashMap 源码解析

    08 HashMap 源码解析 自信和希望是青年的特权. --大仲马 引导语 HashMap 源码很长,面试的问题也非常多,但这些面试问题,基本都是从源码中衍生出来的,所以我们只需要弄清楚其底层实现原 ...

最新文章

  1. SAP ABAP常用正则表达式大全
  2. ubuntu下用vagrant搭建集群环境
  3. 延迟加载并渐现内容的jquery插件lazyFade
  4. DB2 SQLCODE: -407, SQLSTATE: 23502
  5. android真机模拟gps,使用命令模拟更改gps位置
  6. 中州韵输入法 linux 小鹤双拼,‎App Store 上的“iRime输入法-小鹤双拼五笔郑码输入法”...
  7. SI9000学习笔记
  8. 三菱PLC QCPU用户手册(功能解说/程序基础篇)
  9. stata15中文乱码_如何解决 Stata 14 的中文乱码问题
  10. 计算机专业建设会议纪要,智能控制教研室会议纪要6号
  11. python回归分析
  12. C# string 保留数字英文字母
  13. SpringBoot+MyBatisPlus+Vue+ElementUI实现前后端分离的物业管理系统
  14. mac怎么设置桌面提醒
  15. 在移动端单行文本在不同的屏幕下当文本过长时增加省略号效果
  16. UG汽车模具设计之汽车模斜顶机构的设计思路,建议收藏
  17. 遥感影像分类算法C++实现(一)
  18. 好客php在线客服源码搭建教程可对接网页/小程序/微信公众号等
  19. ERP是什么?能做什么?
  20. 树莓派教程二-网络配置

热门文章

  1. 【敏感词汇过滤算法】基于DFA-前缀树的敏感词汇过滤算法(项目实用)
  2. 速卖通开店新手卖家专题 ▏速卖通新店铺运营思路分享
  3. 西储大学轴承数据处理--附MATLAB代码
  4. Elasticsearch基础2——es配置文件、jvm配置文件、log4j2.properties配置文件详解
  5. Inno Setup 打包的文件以管理员权限运行
  6. pyecharts 标准线_数据可视化之pyecharts
  7. 《认知觉醒》:第一章 大脑——一切问题的起源
  8. 【微信小程序-原生开发】系列教程目录(已完结)
  9. JVM 破坏双亲委派模型
  10. 沟通和认同:自我的塑造与展现