使用场景

线程池ThreadPoolExecutor中经常使用SynchronousQueue作为阻塞队列,比如dubbo的provider的线程池默认会使用该队列,这里要先介绍下线程池ThreadPoolExecutor的逻辑,ThreadPoolExecutor的构造方法如下:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {...}
  • corePoolSize 核心线程数,即使线程池处于空闲状态也要保留在池中的线程数
  • maximumPoolSize 线程池中所允许的最大线程数
  • keepAliveTime 非核心线程存活时间
  • unit 非核心线程存活时间的单位
  • workQueue 阻塞队列
  • threadFactory 线程工厂类
  • handler 拒绝策略,当无法添加任务到阻塞队列 且 线程数达到maximumPoolSize时,会执行这个拒绝策略,默认是抛出RejectedExecutionException异常

当我们通过execute(Runnable r)提交任务时,会执行下面的逻辑:

  • 先检查线程数是否达到corePoolSize,如果没有则用threadFactory创建新线程来执行该任务
  • 如果已经达到corePoolSize,则调用阻塞队列workQueueoffer(E e)方法插入任务,如果插入成功则返回
  • 插入失败,则检查线程数是否达到maximumPoolSize,如果没有达到,则使用threadFactory生成新线程来执行该任务
  • 如果已经达到maximumPoolSize,则执行handler 拒绝策略

可以看出能否被成功通过阻塞队列的offer(E e)方法插入数据直接决定了是否创建新的非核心线程,这里与同样经常被线程池使用的阻塞队列LinkedBlockingQueue来对比下,这两个阻塞队列都实现了BlockingQueue接口,主要区别就在offer(E e)方法上:

  • 前者不存储数据,当没有空闲的消费线程,offer方法会直接返回失败并创建新的非核心线程;而后者会优先插入到队列中存储数据,只有在队列满了的情况下才会返回失败并创建新的非核心线程。
  • 基于上一点可以看出,使用前者的线程池是吞吐量优先,会优先开启尽可能多的线程来保证数据处理效率,适合高并发场景;而使用后者的线程池则是优先存储数据来缓冲下线程资源的消耗,尽量使用最少的线程资源,适合对机器资源敏感、吞吐量要求不高的场景。

下面来看下SynchronousQueue源码

offer(E e)take()源码

public boolean offer(E e) {if (e == null) throw new NullPointerException();return transferer.transfer(e, true, 0) != null;
}
public E take() throws InterruptedException {E e = transferer.transfer(null, false, 0);if (e != null)return e;Thread.interrupted();throw new InterruptedException();
}

可以看出这两个方法都会调用transferer.transfer方法,区别是传参不同,这里先看下方法签名:

/*** Performs a put or take.** @param e if non-null, the item to be handed to a consumer;*          if null, requests that transfer return an item*          offered by producer.* @param timed if this operation should timeout* @param nanos the timeout, in nanoseconds* @return if non-null, the item provided or received; if null,*         the operation failed due to timeout or interrupt --*         the caller can distinguish which of these occurred*         by checking Thread.interrupted.*/abstract E transfer(E e, boolean timed, long nanos);

第一个参数e:不为空的时候说明是个producer线程往队列中插数据,为null的时候,说明是个消费线程来取数据
第二个参数timed:表示是否允许超时,如果允许超时,则阻塞时长不会超过第三个参数设置的超时时间,否则不允许超时,会一直阻塞到有相对应的消费线程或者相对应的producer线程
第三个参数nanos:超时时间,单位纳秒
根据上面的参数定义可以看出offer(E e)方法设置了超时时间,并且超时时间是0,所以无能成功与否都会立即返回;而take()没有设置超时时间,所以会一直阻塞等待有producer提供任务
Transferer接口有两个实现类,TransferQueue是FIFO原则,而TransferStack则是通过类似栈结构的后进先出的LIFO原则,默认是后者,类图如下:


这里看下后者transfer方法的源码:

// 栈中存放的是SNode对象,所以先看下SNode类的属性
static final class SNode {// 下一个节点volatile SNode next;        // next node in stack// 匹配成功的节点,比如当前节点是一个consumer,那么当有producer节点与这个节点匹配成功后,就将这个match指向producer节点,同样的如果当前节点是一个producer节点,当匹配到consumer节点时,就将这个match指向匹配的consumer节点volatile SNode match;       // the node matched to this// 阻塞的线程volatile Thread waiter;     // to control park/unpark// 如果是producer,那么就是具体的数据,如果是consumer,这个属性就是nullObject item;                // data; or null for REQUESTs// 标识当前节点的mode,枚举值分三种:// int REQUEST= 0:表示当前节点是一个consumer节点,正在等待producer提供数据// int DATA  = 1:表示当前节点是一个producer节点,正在等待consumer来消费其提供的数据// int FULFILLING = 2:表示当前节点正与一个匹配的节点在交易中,处于一个中间状态(一个完整的交易分三步:先将node节点的mode值修改为交易中状态,然后将node的match引用通过CAS指向匹配的节点,最后修改head节点执行出栈操作,因为都是通过CAS来操作,没有加锁的逻辑,所以会有中间态,详细可以看下面的源码分析)int mode;
}// 核心方法
E transfer(E e, boolean timed, long nanos) {/** Basic algorithm is to loop trying one of three actions:* 核心算法是循环执行三个动作:* 1. If apparently empty or already containing nodes of same*    mode, try to push node on stack and wait for a match,*    returning it, or null if cancelled.* 1. 如果是空队列或者队列中存在相同模式的节点(节点模式有三种,一:等待其他线程取数据 二:等待其他线程提供数据 三:交易中),则将节点push进栈,等待互补线程来提供或者消费数据,如果节点被取消则返回null* 2. If apparently containing node of complementary mode,*    try to push a fulfilling node on to stack, match*    with corresponding waiting node, pop both from*    stack, and return matched item. The matching or*    unlinking might not actually be necessary because of*    other threads performing action 3:* 2. 如果存在互补模式的节点(即:取数据的线程发现队列中有一个负责提供数据的线程 或者         提供数据的线程 发现队列中有一个consumer线程),与相应的等待节点匹配,从堆栈中弹出两      者,  然后返回匹配项。 由于其他线程执行了操作3,因此实际上可能不需要匹配或取消链接。* 3. If top of stack already holds another fulfilling node,*    help it out by doing its match and/or pop*    operations, and then continue. The code for helping*    is essentially the same as for fulfilling, except*    that it doesn't return the item.3.如果堆栈顶部的节点正在与其他节点处于交易中(交易处于中间状态),那么帮助其匹配和/或弹出操作,然后继续。 帮助匹配和弹出操作的代码与实现代码基本相同,不同之处在于它不返回结果,而是继续循环。*/SNode s = null; // constructed/reused as needed// e为空,则说明当前线程是取数据的,e不为空,说明当前线程是提供数据的int mode = (e == null) ? REQUEST : DATA;for (;;) {SNode h = head;// h == null说明是空栈,h.mode == mode说明有相同mode的节点,那么就需要根据是否超时这个配置来决定是阻塞还是直接返回if (h == null || h.mode == mode) {  // empty or same-mode// 如果设置了超时,但是超时时间小于等于0,那么就说明不需要等待,直接返回if (timed && nanos <= 0L) {     // can't wait// 如果发现head节点已经被取消了,那么将head节点通过cas指向next节点if (h != null && h.isCancelled())casHead(h, h.next);     // pop cancelled node// 否则直接返回null,如果是通过offer方法调用的,那么就会返回false(注意看上面的offer方法代码),从而触发线程池创建新的线程的逻辑,所以说SynchronousQueue不存储具体数据elsereturn null;} // 需要阻塞的话,就将当前线程包装成Snode对象,插入栈顶,所以说是LIFO原则,后插入的数据会放到head节点上else if (casHead(h, s = snode(s, e, h, mode))) {// cas成功后,线程会阻塞,而在这个方法里,线程不会马上阻塞,而是先自旋一定次数,在高并发场景下,自旋可以避免线程挂起导致的用户态与内核态切换及上下文切换带来的消耗,从而提高效率,详细见下面对这个方法的单独分析SNode m = awaitFulfill(s, timed, nanos);// 当线程被唤醒或者超时后,如果返回的是自己,则说明超时后没有匹配到节点,则取消节点if (m == s) {               // wait was cancelled// 将这个因超时被取消的节点从栈中清除clean(s);return null;}// 如果自己匹配到了数据,而自己又不是head,那么就将head节点指向自己的next节点,因为自己匹配到了数据,那么head节点肯定也匹配到了数据,帮助执行出栈操作if ((h = head) != null && h.next == s)casHead(h, s.next);     // help s's fulfillerreturn (E) ((mode == REQUEST) ? m.item : s.item);}} // 发现互补节点,且head节点没有处于交易中的中间状态,则开始匹配逻辑else if (!isFulfilling(h.mode)) { // try to fulfill// 发现head节点已取消,则将head引用指向nextif (h.isCancelled())            // already cancelledcasHead(h, h.next);         // pop and retry// 构建新Snode,并标注节点状态为交易中的中间态,通过cas将新的节点压入栈中,而将next指针指向原head节点   else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for (;;) { // loop until matched or waiters disappear// m节点就是s的互补节点SNode m = s.next;       // m is s's match// 如果没有互补节点,则说明栈已经空了,则重新开始循环,走第一个if里的逻辑(空栈的逻辑)if (m == null) {        // all waiters are gonecasHead(s, null);   // pop fulfill nodes = null;           // use new node next timebreak;              // restart main loop}SNode mn = m.next;// 互补节点不为空,则开始匹配,这个方法会将m的match引用通过CAS指向s节点,完成匹配,然后将m和s两个及节点出栈(就是将head节点跳过m和s,指向m的next节点),如果发现有Snode的waiter不为空,则说明有线程处于阻塞状态,则唤醒线程,详细见后面的单独分析if (m.tryMatch(s)) {casHead(s, mn);     // pop both s and mreturn (E) ((mode == REQUEST) ? m.item : s.item);} else// 没有匹配成功,则继续与m的next指向的节点进行匹配s.casNext(m, mn);   // help unlink}}} // 发现处于交易中的中间态节点,则帮助进行出栈操作,然后继续循环else {                            // help a fulfillerSNode m = h.next;               // m is h's matchif (m == null)                  // waiter is gonecasHead(h, null);           // pop fulfilling nodeelse {SNode mn = m.next;if (m.tryMatch(h))          // help matchcasHead(h, mn);         // pop both h and melse                        // lost matchh.casNext(m, mn);       // help unlink}}}
}

阻塞的逻辑在awaitFulfill(QNode s, E e, boolean timed, long nanos) 中,代码如下:

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {/* Same idea as TransferStack.awaitFulfill */// 计算出阻塞的截止时间final long deadline = timed ? System.nanoTime() + nanos : 0L;Thread w = Thread.currentThread();// 计算自旋的次数int spins = ((head.next == s) ?(timed ? maxTimedSpins : maxUntimedSpins) : 0);for (;;) {// 检查线程的中断标识,如果已经被中断,则取消节点,直接返回if (w.isInterrupted())// 该方法会将节点e的match属性指向自己,这样者检查到匹配到的节点是自己时,就知道节点被取消,这个判断逻辑在上面的transfer方法里的调用处s.tryCancel(e);SNode m = s.match;// 匹配到了节点,则直接返回,这个match引用可能是上面tryCancel方法设置的自己if (m != null)return m;// 检查是否允许超时,如果已经超时,则通过tryCancel取消节点,并将节点中match属性指向自己    if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {s.tryCancel();continue;}}// 检查剩余自旋次数,大于0则继续自旋if (spins > 0)spins = shouldSpin(s) ? (spins-1) : 0;// 自旋结束,但是还没有匹配到数据,则将waiter设置为当前线程,便于后续线程通过这个属性唤醒自己else if (s.waiter == null)s.waiter = w; // establish waiter so can park next iter// 不允许超时,则调用没有超时时间的api来阻塞自己    else if (!timed)LockSupport.park(this);// 允许超时,则调用超时api    else if (nanos > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanos);}
}

在上面的transfer(E e, boolean timed, long nanos)源码中可以看出线程在成功入栈后就会进入这个阻塞方法,直到超时、被中断、匹配到节点三种情况下才会阻塞结束,这个方法加入了自旋的逻辑来避免内核态与用户态以及线程上下文的切换带来的消耗,提高并发能力。


匹配及唤醒逻辑在tryMatch(SNode s) 方法里,源码分析如下:

 boolean tryMatch(SNode s) {// 通过cas将节点的match指向s节点,表示当前节点与s节点匹配成功if (match == null &&UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {// 如果当前节点的waiter不为空,则说明这个waiter线程处于阻塞状态,设置waiter的逻辑见上面的阻塞方法里Thread w = waiter;if (w != null) {    // waiters need at most one unparkwaiter = null;// 唤醒线程LockSupport.unpark(w);}return true;}return match == s;
}

可以看出整个过程都是通过CAS和volatile属性的配合来实现线程安全,同时在真正阻塞线程前会加入自旋的逻辑来避免内核态与用户态以及线程上下文的切换带来的消耗,适合高并发场景。吞吐量比LinkedBlockingQueue要高,这也是dubbo的provider默认使用这个阻塞队列的原因。

线程池中使用的SynchronousQueue的offer和take原理相关推荐

  1. Java 线程池中的线程复用是如何实现的?

    前几天,技术群里有个群友问了一个关于线程池的问题,内容如图所示: 关于线程池相关知识可以先看下这篇:为什么阿里巴巴Java开发手册中强制要求线程池不允许使用Executors创建? 那么就来和大家探讨 ...

  2. java线程池newfi_Java 线程池中的线程复用是如何实现的?

    前几天,技术群里有个群友问了一个关于线程池的问题,内容如图所示: 那么就来和大家探讨下这个问题,在线程池中,线程会从 workQueue 中读取任务来执行,最小的执行单位就是 Worker,Worke ...

  3. 线程池中各个参数如何合理设置

    欢迎大家关注我的公众号[老周聊架构],Java后端主流技术栈的原理.源码分析.架构以及各种互联网高并发.高性能.高可用的解决方案. 一.前言 在开发过程中,好多场景要用到线程池.每次都是自己根据业务场 ...

  4. C#如何判断线程池中所有的线程是否已经完成(转)

    其 实很简单用ThreadPool.RegisterWaitForSingleObject方法注册一个定时检查线程池的方法,在检查线程的方法内调用 ThreadPool.GetAvailableThr ...

  5. [.Net线程处理系列]专题二:线程池中的工作者线程

    目录: 一.上节补充 二.CLR线程池基础 三.通过线程池的工作者线程实现异步 四.使用委托实现异步 五.任务 六.小结 一.上节补充 对于Thread类还有几个常用方法需要说明的. 1.1 Susp ...

  6. 一个线程池中的线程异常了,那么线程池会怎么处理这个线程?

    一个线程池中的线程异常了,那么线程池会怎么处理这个线程? 参考文章: (1)一个线程池中的线程异常了,那么线程池会怎么处理这个线程? (2)https://www.cnblogs.com/fangua ...

  7. 【Android 异步操作】线程池 ( Worker 简介 | 线程池中的工作流程 runWorker | 从线程池任务队列中获取任务 getTask )

    文章目录 一.线程池中的 Worker ( 工作者 ) 二.线程池中的工作流程 runWorker 三.线程池任务队列中获取任务 getTask 在博客 [Android 异步操作]线程池 ( 线程池 ...

  8. 线程池中阻塞队列的作用?为什么是先添加列队而不是先创建最大线程?线程池中线程复用原理

    1.一般的队列只能保证作为一个有限长度的缓冲区,如果超出了缓冲长度,就无法保留当前的任务了,阻塞队列通过阻塞可以保留住当前想要继续入队的任务.阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使 ...

  9. Java Review - 线程池中使用ThreadLocal不当导致的内存泄漏案例源码分析

    文章目录 概述 Why 内存泄露 ? 在线程池中使用ThreadLocal导致的内存泄漏 概述 ThreadLocal的基本使用我们就不赘述了,可以参考 每日一博 - ThreadLocal VS I ...

最新文章

  1. matlab编程实现基于密度的聚类(DBSCAN)
  2. Python 初学者进阶的九大技能(附代码)
  3. svn 部署问题总结
  4. JS实现限制input上传文件的大小和格式
  5. linux 内存 段,Linux内存储器管理之分段机制
  6. springMVC 前台向后台传数组
  7. bzoj2751[HAOI2012]容易题(easy)
  8. 通信调制体制设计之64QAM性能分析MATLAB仿真及代码
  9. Redis学习手册(事务)
  10. 做折线图_python的visvis库做折线图(line.py)代码详解
  11. JavaScript原生实现《贪吃蛇》
  12. 工业相机:传感器尺寸与像元尺寸的关系
  13. devil may cry 4 android apk,Devil May Cry
  14. Excel/WPS做数据透视表,即对变量做交叉汇总(列联表)
  15. 综合latch 规避
  16. 汽车 Automotive > 汽车安全芯片调研
  17. TDD三定律和5条规则
  18. config语言和config.in文件
  19. 如何在linux编写perl脚本,关于linux:如何在perl脚本中插入awk命令?
  20. 《M8围棋谱》自定义皮肤设计指南

热门文章

  1. Elasticsearch RestHighLevelClient操作
  2. python获取股票数据接口
  3. Policy gradient(策略梯度详解)
  4. ip代理软件的原理到底是什么?适用场景有哪些?
  5. Google更新AIY套件包:门槛更低,适合初级玩家!
  6. matlab怎么画碎石图,成分分析中biplot函数不理解_主成分分析
  7. FCK上传图片问题解决
  8. java+lame实现wav到mp3的转换
  9. shu mei pai
  10. Nature综述:手把手教你分析菌群数据