Java ThreadConcurrency(9): 深入理解StampedLock及其实现原理
背景(注释):
一个基于容量并且带有三种模式的锁,用于控制读取/写入访问。StampedLock的状态由版本和模式组成。锁获取操作返回一个用于展示和访问锁状态的邮编(stamp)变量:这些方法的"try"版本通过返回0代表获取锁失败。锁释放以及其他相关方法需要使用邮编(stamps)变量作为参数,如果他们和当前锁状态不符则失败,这三种模式为:
- 写入:方法writeLock可能为了获取独占访问而阻塞当前线程,返回一个邮编变量,能够在unlockWrite方法中使用从而释放锁。限时和立即版本的tryWriteLock也提供了支持。当锁被写模式所占有,没有读或者乐观的读操作能够成功。
- 读取:方法readLock可能为了获取非独占访问而阻塞当前线程,返回一个邮编变量,能够在unlockRead方法中用于释放锁。限时和立即版本的tryReadLock也提供了支持。
- 乐观读取:是方法tryOptimisticRead返回一个非0邮编变量,仅在当前锁没有以写入模式被持有。方法validate返回true如果在获得邮编变量(stamp)之后没有被写模式持有。这种模式可以被看做一种弱版本的读锁,可以被一个写入者在任何时间打断。乐观读取模式仅用于短时间读取操作时经常能够降低竞争和提高吞吐量。当然,它的使用在本质上是脆弱的。乐观读取的区域应该只包括字段,并且在validation之后用局部变量持有它们从而在后续使用。乐观模式下读取的字段值很可能是非常不一致的,所以它应该只用于那些你熟悉如何展示数据,从而你可以不断检查一致性和调用方法validate。比如,当我们第一次读取对象或者数组引用,然后检查它的字段、元素以及方法时。
class Point {private double x, y;private final StampedLock sl = new StampedLock();void move(double deltaX, double deltaY) { // an exclusively locked methodlong stamp = sl.writeLock();try {x += deltaX;y += deltaY;} finally {sl.unlockWrite(stamp);}}double distanceFromOrigin() { // A read-only methodlong stamp = sl.tryOptimisticRead();double currentX = x, currentY = y;if (!sl.validate(stamp)) {stamp = sl.readLock();try {currentX = x;currentY = y;} finally {sl.unlockRead(stamp);}}return Math.sqrt(currentX * currentX + currentY * currentY);}void moveIfAtOrigin(double newX, double newY) { // upgrade// Could instead start with optimistic, not read modelong stamp = sl.readLock();try {while (x == 0.0 && y == 0.0) {long ws = sl.tryConvertToWriteLock(stamp);if (ws != 0L) {stamp = ws;x = newX;y = newY;break;}else {sl.unlockRead(stamp);stamp = sl.writeLock();}}} finally {sl.unlock(stamp);}}}
算法(注释):
public long readLock() {long s = state, next; // bypass acquireRead on common uncontended casereturn ((whead == wtail && (s & ABITS) < RFULL &&U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?next : acquireRead(false, 0L));}
private long acquireRead(boolean interruptible, long deadline) {WNode node = null, p;for (int spins = -1;;) {WNode h;if ((h = whead) == (p = wtail)) {for (long m, s, ns;;) {if ((m = (s = state) & ABITS) < RFULL ?U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))return ns;else if (m >= WBIT) {if (spins > 0) {if (LockSupport.nextSecondarySeed() >= 0)--spins;}else {if (spins == 0) {WNode nh = whead, np = wtail;if ((nh == h && np == p) || (h = nh) != (p = np))break;}spins = SPINS;}}}}if (p == null) { // initialize queueWNode hd = new WNode(WMODE, null);if (U.compareAndSwapObject(this, WHEAD, null, hd))wtail = hd;}else if (node == null)node = new WNode(RMODE, p);else if (h == p || p.mode != RMODE) {if (node.prev != p)node.prev = p;else if (U.compareAndSwapObject(this, WTAIL, p, node)) {p.next = node;break;}}else if (!U.compareAndSwapObject(p, WCOWAIT,node.cowait = p.cowait, node))node.cowait = null;else {for (;;) {WNode pp, c; Thread w;if ((h = whead) != null && (c = h.cowait) != null &&U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&(w = c.thread) != null) // help releaseU.unpark(w);if (h == (pp = p.prev) || h == p || pp == null) {long m, s, ns;do {if ((m = (s = state) & ABITS) < RFULL ?U.compareAndSwapLong(this, STATE, s,ns = s + RUNIT) :(m < WBIT &&(ns = tryIncReaderOverflow(s)) != 0L))return ns;} while (m < WBIT);}if (whead == h && p.prev == pp) {long time;if (pp == null || h == p || p.status > 0) {node = null; // throw awaybreak;}if (deadline == 0L)time = 0L;else if ((time = deadline - System.nanoTime()) <= 0L)return cancelWaiter(node, p, false);Thread wt = Thread.currentThread();U.putObject(wt, PARKBLOCKER, this);node.thread = wt;if ((h != pp || (state & ABITS) == WBIT) &&whead == h && p.prev == pp)U.park(false, time);node.thread = null;U.putObject(wt, PARKBLOCKER, null);if (interruptible && Thread.interrupted())return cancelWaiter(node, p, true);}}}}for (int spins = -1;;) {WNode h, np, pp; int ps;if ((h = whead) == p) {if (spins < 0)spins = HEAD_SPINS;else if (spins < MAX_HEAD_SPINS)spins <<= 1;for (int k = spins;;) { // spin at headlong m, s, ns;if ((m = (s = state) & ABITS) < RFULL ?U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {WNode c; Thread w;whead = node;node.prev = null;while ((c = node.cowait) != null) {if (U.compareAndSwapObject(node, WCOWAIT,c, c.cowait) &&(w = c.thread) != null)U.unpark(w);}return ns;}else if (m >= WBIT &&LockSupport.nextSecondarySeed() >= 0 && --k <= 0)break;}}else if (h != null) {WNode c; Thread w;while ((c = h.cowait) != null) {if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&(w = c.thread) != null)U.unpark(w);}}if (whead == h) {if ((np = node.prev) != p) {if (np != null)(p = np).next = node; // stale}else if ((ps = p.status) == 0)U.compareAndSwapInt(p, WSTATUS, 0, WAITING);else if (ps == CANCELLED) {if ((pp = p.prev) != null) {node.prev = pp;pp.next = node;}}else {long time;if (deadline == 0L)time = 0L;else if ((time = deadline - System.nanoTime()) <= 0L)return cancelWaiter(node, node, false);Thread wt = Thread.currentThread();U.putObject(wt, PARKBLOCKER, this);node.thread = wt;if (p.status < 0 &&(p != h || (state & ABITS) == WBIT) &&whead == h && node.prev == p)U.park(false, time);node.thread = null;U.putObject(wt, PARKBLOCKER, null);if (interruptible && Thread.interrupted())return cancelWaiter(node, node, true);}}}}
这里的readLock会首先尝试直接CAS改变state(在whead==wtail和(s & ABITS) < RFULL的情况下),成功的话直接返回stamp(next)。
- 传入的两参数:interruptible和deadline,前者标识是否允许中断,后者标识超时限时(0代表不限时),然后进入循环。
- 首先取得whead和wtail两个值,假如这两个值不等说明已经又入队者了,那么获取读锁没希望了。
- 否则再进入内层循环,会尽力尝试通过前7bit上递增state来获取锁 ,这里采取的方式极具特色,采用了另一个readerOverflow了计数另外增加的读者数量,state的前七位记录到126之后就会稳定在这个值,偶尔会到127,但是超出126的部分最终到了readerOverflow,加入获取了锁就返回stamp。
- 假如m >= WBIT,也就是说m(state前8位)值大于或等于128,那么说明当前锁已经被写者独占,那么我们尝试自旋+随机的方式来探测状态,并且在当前队列和进入循环前一样(说明还没有其他入队者)或者当前队列中已经有了入队者的情况下内层循环跳出,接着肯定会入队。
- 首先根据尾节点为null的情况探测是否初始化队列,使用一个WMODE模式的节点初始化whead和wtail。
- 然后假如当前节点为空则构建当前节点,模式为RMODE,前驱节点为p即尾节点。
- 接着查看,假如当前队列为空即只有一个节点(whead=wtail)或者当前尾节点的模式不是RMODE,那么我们会尝试在尾节点后面添加该节点作为尾节点,然后跳出外层循环;假如当前队列不为空并且当前尾节点模式就是RMODE,那么我们会尝试下一步:添加该节点到尾节点的cowait链(实际上构成一个stack)中。
- 通过CAS方法将该节点node添加至尾节点的cowait链中,node成为cowait中的顶元素,cowait构成了一个LIFO队列。
- 成功后进入另一个循环,首先尝试unpark头元素(whead)的cowait中的第一个元素,这只是一种辅助作用,因为头元素whead所伴随的那个线程(假如存在)必定是已经获得锁了,假如是读锁那么也必定会释放cowait链。
- 假如当前节点node所在的根节点p的前驱就是whead或者p已经是whead或者p的前驱为null,那么我们会根据state再次积极的尝试获取锁(当m < WBIT)。
- 否则我们探测当前队列是否稳定:whead == h && p.prev == pp,在稳定的情况下,假如发现p成为过head或者p已经被取消(status>0),我们尝试node=null,并且跳出当前循环,回到一开的循环里面去尝试获取锁(这样做的原因是被其他线程闯入夺取了锁,或者p已经被取消)。
- 接着我们判断是否为限时版本,以及限时版本所需时间。
- 然后park当前线程以及可能出现的中断情况下取消当前节点的cancelWaiter操作。
- 这样p作为当前节点的前驱节点,假如正好是whead的话,那么会尝试自旋+随机的方式在积极得探测state,从而能够取得锁。并且在获得锁,重置whead和node.prev=null之后释放当前cowait链中的节点。最后返回stamp。
- 否则只需h不为null时尝试释放当前头节点的cowait链,作为一种协作的积极行动。
- 然后在whead==h即队列稳定时,首先会CAS操作当前节点前驱的status,从0变为WAITING从而指示后面有等待的节点。假如发现p的状态已经为取消了,则重新选择node的前驱。
- 前面的这些都处理完成之后,使用类似的park以及cancelWaiter操作。区别在于这里的p.status<0必须保证(因为等待状态WAITING是-1)。
public long writeLock() {long s, next; // bypass acquireWrite in fully unlocked case onlyreturn ((((s = state) & ABITS) == 0L &&U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?next : acquireWrite(false, 0L));}
private long acquireWrite(boolean interruptible, long deadline) {WNode node = null, p;for (int spins = -1;;) { // spin while enqueuinglong m, s, ns;if ((m = (s = state) & ABITS) == 0L) {if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))return ns;}else if (spins < 0)spins = (m == WBIT && wtail == whead) ? SPINS : 0;else if (spins > 0) {if (LockSupport.nextSecondarySeed() >= 0)--spins;}else if ((p = wtail) == null) { // initialize queueWNode hd = new WNode(WMODE, null);if (U.compareAndSwapObject(this, WHEAD, null, hd))wtail = hd;}else if (node == null)node = new WNode(WMODE, p);else if (node.prev != p)node.prev = p;else if (U.compareAndSwapObject(this, WTAIL, p, node)) {p.next = node;break;}}for (int spins = -1;;) {WNode h, np, pp; int ps;if ((h = whead) == p) {if (spins < 0)spins = HEAD_SPINS;else if (spins < MAX_HEAD_SPINS)spins <<= 1;for (int k = spins;;) { // spin at headlong s, ns;if (((s = state) & ABITS) == 0L) {if (U.compareAndSwapLong(this, STATE, s,ns = s + WBIT)) {whead = node;node.prev = null;return ns;}}else if (LockSupport.nextSecondarySeed() >= 0 &&--k <= 0)break;}}else if (h != null) { // help release stale waitersWNode c; Thread w;while ((c = h.cowait) != null) {if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&(w = c.thread) != null)U.unpark(w);}}if (whead == h) {if ((np = node.prev) != p) {if (np != null)(p = np).next = node; // stale}else if ((ps = p.status) == 0)U.compareAndSwapInt(p, WSTATUS, 0, WAITING);else if (ps == CANCELLED) {if ((pp = p.prev) != null) {node.prev = pp;pp.next = node;}}else {long time; // 0 argument to park means no timeoutif (deadline == 0L)time = 0L;else if ((time = deadline - System.nanoTime()) <= 0L)return cancelWaiter(node, node, false);Thread wt = Thread.currentThread();U.putObject(wt, PARKBLOCKER, this);node.thread = wt;if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&whead == h && node.prev == p)U.park(false, time); // emulate LockSupport.parknode.thread = null;U.putObject(wt, PARKBLOCKER, null);if (interruptible && Thread.interrupted())return cancelWaiter(node, node, true);}}}}
- 这里同样是以自旋+随机的方式来探测锁,只不过探测值变为了m = (s = state) & ABITS) == 0L。
- 之后的过程同样是初始化队列,构造节点,设置前驱,拼接节点(对于写入锁只有在队列尾部拼接一种选择)。
- 这之后的操作事实上和读取锁中的根节点的操作类似,都是会先积极尝试自旋在whead节点上,然后清理头结点,以及前驱的被取消节点,最后才park住。
private long cancelWaiter(WNode node, WNode group, boolean interrupted) {if (node != null && group != null) {Thread w;node.status = CANCELLED;// unsplice cancelled nodes from groupfor (WNode p = group, q; (q = p.cowait) != null;) {if (q.status == CANCELLED) {U.compareAndSwapObject(p, WCOWAIT, q, q.cowait);p = group; // restart}elsep = q;}if (group == node) {for (WNode r = group.cowait; r != null; r = r.cowait) {if ((w = r.thread) != null)U.unpark(w); // wake up uncancelled co-waiters}for (WNode pred = node.prev; pred != null; ) { // unspliceWNode succ, pp; // find valid successorwhile ((succ = node.next) == null ||succ.status == CANCELLED) {WNode q = null; // find successor the slow wayfor (WNode t = wtail; t != null && t != node; t = t.prev)if (t.status != CANCELLED)q = t; // don't link if succ cancelledif (succ == q || // ensure accurate successorU.compareAndSwapObject(node, WNEXT,succ, succ = q)) {if (succ == null && node == wtail)U.compareAndSwapObject(this, WTAIL, node, pred);break;}}if (pred.next == node) // unsplice pred linkU.compareAndSwapObject(pred, WNEXT, node, succ);if (succ != null && (w = succ.thread) != null) {succ.thread = null;U.unpark(w); // wake up succ to observe new pred}if (pred.status != CANCELLED || (pp = pred.prev) == null)break;node.prev = pp; // repeat if new pred wrong/cancelledU.compareAndSwapObject(pp, WNEXT, pred, succ);pred = pp;}}}WNode h; // Possibly release first waiterwhile ((h = whead) != null) {long s; WNode q; // similar to release() but check eligibilityif ((q = h.next) == null || q.status == CANCELLED) {for (WNode t = wtail; t != null && t != h; t = t.prev)if (t.status <= 0)q = t;}if (h == whead) {if (q != null && h.status == 0 &&((s = state) & ABITS) != WBIT && // waiter is eligible(s == 0L || q.mode == RMODE))release(h);break;}}return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L;}
这里三个参数,node:需要删除的节点、group:可能的聚合节点、interrupted:是否被中断。
- 首先设置node的状态为CANCELLED,可以向其他线程传递这个节点是删除了的信息。
- 然后再聚合节点gruop上清理所有状态为CANCELLED的节点(即删除节点)
- 接下来假如当期node节点本身就是聚合节点,那么首先唤醒cowait链中的所有节点(读者),寻找到node后面的第一个非CANCELLED节点,直接拼接到pred上(从而删除当前节点),然后再检查前驱节点状态,假如为CANCELLED则需也需要重置前驱节点。
- 最后,在队列中不为空,并且头结点的状态为0即队列中的节点还未设置WAITING信号&当前没有持有写入锁模式&(当前没有锁或者只有乐观锁 | 队列中第一个等待者为读模式),那么就从队列头唤醒一次。
- 不可中断的锁获取操作中没有保存中断状态,造成CPU占有。
- 由于新型锁使用了新的数据结构,所以当读锁的第一个节点被中断取消时,它的cowait链接上的节点会全部重新拼接到队列上,这样会造成一定程序的"优先级反转"。具体可用如下方法再现:
public static void main(String[] args) throws InterruptedException{// TODO Auto-generated method stubStampedLock lock = new StampedLock();new Thread(new WriteThread(lock)).start();
// new Thread(new ReadThreadTry(lock)).start();Thread.sleep(200);new Thread(new ReadThreadTry(lock)).start();Thread.sleep(100);
// new Thread(new ReadThreadTry(lock)).start();for( int i = 0; i < 6; ++i)new Thread(new ReadThread(lock)).start();Thread.sleep(300);for( int i = 0; i < 6; ++i)new Thread(new WriteThreadLast(lock)).start();}
public static void main(String[] args) throws InterruptedException{// TODO Auto-generated method stubStampedLock lock = new StampedLock();new Thread(new WriteThread(lock)).start();
// new Thread(new ReadThreadTry(lock)).start();Thread.sleep(200);new Thread(new ReadThread(lock)).start();Thread.sleep(100);new Thread(new ReadThreadTry(lock)).start();for( int i = 0; i < 5; ++i)new Thread(new ReadThread(lock)).start();Thread.sleep(300);for( int i = 0; i < 6; ++i)new Thread(new WriteThreadLast(lock)).start();}
private static class WriteThread implements Runnable{private StampedLock lock;public WriteThread(StampedLock lock){this.lock = lock;}public void run(){long writeLong = lock.writeLock();System.out.println(Thread.currentThread().getName() + " WRITE");try {Thread.sleep(1000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}lock.unlockWrite(writeLong);}}private static class ReadThreadTry implements Runnable{private StampedLock lock;public ReadThreadTry(StampedLock lock){this.lock = lock;}public void run(){long readLong = 0;try {readLong = lock.tryReadLock(600, TimeUnit.MILLISECONDS);if(readLong > 0)System.out.println(Thread.currentThread().getName() + " READ LOCK");elseSystem.out.println(Thread.currentThread().getName() + " READ noLOCK");} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}finally{if(readLong>0)lock.unlockRead(readLong);}}}private static class ReadThread implements Runnable{private StampedLock lock;public ReadThread(StampedLock lock){this.lock = lock;}public void run(){long readLong = lock.readLock();System.out.println(Thread.currentThread().getName() + " READ");lock.unlockRead(readLong);}}private static class WriteThreadLast implements Runnable{private StampedLock lock;public WriteThreadLast(StampedLock lock){this.lock = lock;}public void run(){long writeLong = lock.writeLock();System.out.println(Thread.currentThread().getName() + " WRITE");lock.unlockWrite(writeLong);}}
观察输出信息,就可以看出问题。
Java ThreadConcurrency(9): 深入理解StampedLock及其实现原理相关推荐
- java 线程池的理解_JAVA线程池原理的理解
线程池原理基础理解: 线程池初始化规定个数的线程,然后这些线程一直运行,并且监控线程队列,只要线程队列被添加进线程,那么线程池不断从队列中取出线程运行.直到队列中的线程为空.实例代码如下: packa ...
- Java中 多态的理解
** Java中 多态的理解 ** 多态官方定义为: 所谓多态就是指程序中定义的引用变量所指向的具体类型和通过该引用变量发出的方法调用在编程时并不确定,而是在程序运行期间才确定,即一个引用变量倒底会指 ...
- 【转载】谈谈我对Java中CallBack的理解
谈谈我对Java中CallBack的理解 转载自: http://www.cnblogs.com/codingmyworld/archive/2011/07/22/2113514.html CallB ...
- Java基础-我所理解的泛型
Java基础-我所理解的泛型 引用 [java]泛型中,? extends T 与 ? super T 的区别.看法_winrh的博客-CSDN博客_泛型 extends 前言 Java基础系列,我所 ...
- Java 回调函数的理解
以下是我对java回调函数的理解,希望对各位有帮助. 简而言之,假设有两个类A与B,还有一个回调接口C(有一个c方法).其中A类持有一个B类对象作为属性和拥有一个a方法,并实现了接口C,所以A类中就有 ...
- Java常量字符串String理解
Java常量字符串String理解 以前关于String的理解仅限于三点: 1.String 是final类,不可继承 2.String 类比较字符串相等时时不能用" == ",只 ...
- 谈谈对java中分层的理解_让我们谈谈网页设计中的卡片设计
谈谈对java中分层的理解 "I want a card", this is the first demand point that the customer said in th ...
- 【转】java提高篇(二)-----理解java的三大特性之继承
[转]java提高篇(二)-----理解java的三大特性之继承 原文地址:http://www.cnblogs.com/chenssy/p/3354884.html 在<Think in ja ...
- Java类加载机制的理解
算上大学,尽管接触Java已经有4年时间并对基本的API算得上熟练应用,但是依旧觉得自己对于Java的特性依然是一知半解.要成为优秀的Java开发人员,需要深入了解Java平台的工作方式,其中类加载机 ...
最新文章
- Hinton最新演讲透露下一代神经网络模型的构想 | SIGIR 2020
- oracle 体系结构及内存管理 13_事务
- Glassfish3 asadmin 常用命令
- UPS电源报警器一直响是什么问题?
- 那些年踩过的Java异常,简直了!
- js关于字面量与构造函数创建对象的几点理解
- glew,glfw实现最新的opengl-学习笔记4实现纹理
- The partial sum problem
- 黑白风格android,颜色风格略不同 黑白华为Mate对比图赏
- C语言(从入门到精通)
- android签名命令行,Android系统签名位置及命令
- Android网络对讲机的实现
- CAN协议分析,120欧姆电阻原因
- Linux查看硬盘型号
- Vue组件化开发--脚手架的安装使用、目录结构说明
- 服务器winsxs文件夹怎么清理工具,win10系统winsxs文件夹该如何删除?win10删除winsxs文件夹的两种方法...
- 内存占用率过高怎么办 一分钟解决
- 我手机中舍不得删除的43条搞笑短信
- vscode CommandNotFoundError: Your shell has not been properly configured to use ‘conda activate‘.解决
- CUDA指定GPU的使用方法