ThreadPoolExecutor线程池原理

  • 线程池原理
    • 1. 线程池的简单介绍
      • 1.1 线程池是什么
      • 1.2 线程池解决的核心问题是什么
    • 2. 线程池的实现原理
      • 2.1 线程池的执行流程
      • 2.2 源码分析
    • 3. 线程池的使用
      • 3.1 线程池的创建
      • 3.2 向线程池提交任务
      • 3.3 生命周期管理
      • 3.4 关闭线程池
      • 3.5 合理地配置线程池
        • 如何判断是 CPU 密集任务还是 IO 密集任务?
    • 4. 线程池的监控

线程池原理

随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流。使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器。J.U.C提供的线程池:ThreadPoolExecutor类,帮助开发人员管理线程并方便地执行并行任务。了解并合理使用线程池,是一个开发人员必修的基本功。

1. 线程池的简单介绍

1.1 线程池是什么

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。

线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

本文描述线程池是JDK中提供的ThreadPoolExecutor类。

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来一系列好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗
  • 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行
  • 提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, 还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行

1.2 线程池解决的核心问题是什么

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

  • 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
  • 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
  • 系统无法合理管理内部的资源分布,会降低系统的稳定性。

为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。

Pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes of maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management.——wikipedia

2. 线程池的实现原理

Java中的线程池核心实现类是ThreadPoolExecutor,首先来看一下ThreadPoolExecutor的UML类图,了解下ThreadPoolExecutor的继承关系。

Executor框架的使用示意图

  • ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。
  • ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。
  • AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
  • 最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

2.1 线程池的执行流程

介绍完线程池的作用以及解决的核心问题后,下面我们来看看当向线程池提交一个任务后,线程池是如何处理这个任务的?

ThreadPoolExecutor执行execute()方法的示意图,如下图所示。

如上图所示,线程池的处理流程如下:

  • 线程池会首先判断核心线程池里的线程是否都在执行任务。

    • 如果不是,创建一个新的核心线程来执行任务
    • 如果核心线程池里的线程都在执行任务,则进入下一个流程
  • 线程池判断工作队列是否已满
    • 如果工作队列未满,则将提交的任务放在工作队列里,等待核心线程去获取执行
    • 如果工作队列满,进入下一个流程
  • 判断线程池的线程是否都处于工作状态
    • 如果没有,则创建一个新的工作线程来执行任务
    • 如果已经满了,则交给饱和策略来处理这个任务

介绍完大致的流程,再来看看ThreadPoolExecutor执行示意图

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。

如上图所示,ThreadPoolExecutor执行execute方法分下面4种情况:

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤 需要获取全局锁)
  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)
  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后 (当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

2.2 源码分析

上面的流程分析让我们很直观地了解了线程池的工作原理,让我们再通过源代码来看看是如何实现的,线程池执行任务的方法如下。

public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 如果线程数小于基本线程数,则创建线程并执行当前任务if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {// 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。if (runState == RUNNING && workQueue.offer(command)) {if (runState != RUNNING || poolSize == 0)ensureQueuedTaskHandled(command);}// 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,// 则创建一个线程执行任务。else if (!addIfUnderMaximumPoolSize(command))// 抛出RejectedExecutionException异常reject(command); // is shutdown or saturated}
}

工作线程:线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点。

public void run() {try {Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this);}
}

Worker执行任务的模型如下图所示:

线程不断从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现,其执行流程如下图所示:

ThreadPoolExecutor中线程执行任务的示意图如下所示:

线程池中的线程执行任务分两种情况,如下:

  • 直接由新创建的线程执行:在execute()方法中创建一个线程时,会让这个线程执行当前任务。

  • 线程从任务队列中获取任务然后执行:这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行

  • 第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。

3. 线程池的使用

3.1 线程池的创建

通过ThreadPoolExecutor来创建一个线程池

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,milliseconds,runnableTaskQueue, handler);

创建一个线程池时需要输入几个参数,如下:

  • corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法, 线程池会提前创建并启动所有基本线程。

  • runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用 移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工 厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  • maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。

  • ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字,代码如下:

    new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
    
  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略:

    • AbortPolicy:直接抛出异常。

    • CallerRunsPolicy:只用调用者所在线程来运行任务。

    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

    • DiscardPolicy:不处理,丢弃掉

      当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务。

  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以, 如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。

  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟 (MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒 (NANOSECONDS,千分之一微秒)

3.2 向线程池提交任务

可以使用两个方法向线程池提交任务,分别为execute()和submit()方法:

  • execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功
  • submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功

以下代码可知execute()方法输入的任务是一个Runnable类的实例

threadsPool.execute(new Runnable() {@Overridepublic void run() {// TODO Auto-generated method stub}
});

通过sumbit方法得到的返回值future对象,可以调用futureget()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完

Future<Object> future = executor.submit(harReturnValuetask);
try {Object s = future.get();
} catch (InterruptedException e) {// 处理中断异常
} catch (ExecutionException e) {// 处理无法执行任务异常
} finally {// 关闭线程池executor.shutdown();
}

3.3 生命周期管理

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:

private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c)  { return c & CAPACITY; }  //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }   //通过状态和线程数生成ctl

ThreadPoolExecutor的运行状态有5种,分别为:

其生命周期转换如下入所示:

3.4 关闭线程池

可以通过调用线程池的shutdownshutdownNow方法来关闭线程池

  • 原理:遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止

    • shutdownNow首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表
    • shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

3.5 合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析:

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
  • 任务的优先级:高、中和低。
  • 任务的执行时间:长、中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理:

  • CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池

  • IO密集型任务线程由于并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。混合型的任务

    如果可以拆分,可以将要执行的任务其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解

注意:我们初始化线程池的时候,建议使用有界队列

对于一些阻塞线程,如果我们使用无界队列会造成线程池的队列就会越来越多,任务挤压在线程池中, 有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。

有界队列能增加系统的稳定性和预警能力

如何判断是 CPU 密集任务还是 IO 密集任务?

CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

ThreadPoolExecutor线程池原理相关推荐

  1. 13.ThreadPoolExecutor线程池之submit方法

    jdk1.7.0_79  在上一篇<ThreadPoolExecutor线程池原理及其execute方法>中提到了线程池ThreadPoolExecutor的原理以及它的execute方法 ...

  2. Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

    相关文章目录: Java线程池ThreadPoolExecutor使用和分析(一) Java线程池ThreadPoolExecutor使用和分析(二) - execute()原理 Java线程池Thr ...

  3. 手写一个线程池,带你学习ThreadPoolExecutor线程池实现原理

    摘要:从手写线程池开始,逐步的分析这些代码在Java的线程池中是如何实现的. 本文分享自华为云社区<手写线程池,对照学习ThreadPoolExecutor线程池实现原理!>,作者:小傅哥 ...

  4. java 线程池原理分析

    一.为什么使用线程池 1.降低资源消耗,减少线程创建和销毁次数,每个工作线程可以重复利用,执行多个任务 2.可根据系统承受能力,调整工作线程的数目,防止消耗过多的内存 二.java 线程池使用 Exe ...

  5. Java 并发编程——Executor框架和线程池原理

    Java 并发编程系列文章 Java 并发基础--线程安全性 Java 并发编程--Callable+Future+FutureTask java 并发编程--Thread 源码重新学习 java并发 ...

  6. JAVA线程池原理以及几种线程池类型介绍

    在什么情况下使用线程池? 1.单个任务处理的时间比较短      2.将需处理的任务的数量大 使用线程池的好处: 1.减少在创建和销毁线程上所花的时间以及系统资源的开销      2.如不使用线程池, ...

  7. java线程池_Java多线程并发:线程基本方法+线程池原理+阻塞队列原理技术分享...

    线程基本方法有哪些? 线程相关的基本方法有 wait,notify,notifyAll,sleep,join,yield 等. 线程等待(wait) 调用该方法的线程进入 WAITING 状态,只有等 ...

  8. Java多线程系列--【JUC线程池 02】- 线程池原理(一)

    参考:http://www.cnblogs.com/skywang12345/p/java_threads_category.html 概要 在前面一章"Java多线程系列--"J ...

  9. 并发编程--线程池原理

    阻塞队列和非阻塞队列 ConcurrentLinkedQueue类 适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于Bloc ...

最新文章

  1. Java设计模式——原型模式
  2. java队列等待唤醒_Java深入学习29:线程等待和唤醒的两个方案
  3. 特洛伊木马脚本linux,手动查杀特洛伊木马
  4. ppt中的流程图怎么整体移动_PPT中图片太丑了?该怎么办?
  5. Q - Tour - hdu 3488(最小匹配值)
  6. 高效率实现web自动完成功能-三叉搜索树
  7. DIgSILENT出图到Matlab画图到Visio画图全过程
  8. Android Adb 连接海马玩模拟器
  9. SCORM课程对接线上课程学习平台
  10. 解决Mac 80端口被占用
  11. 广东指导晚造水稻工作 国稻种芯·中国水稻节:惠州加强防治
  12. xgboost和随机森林特征重要性计算方法
  13. 油猴Tampermonkey简介
  14. Android12.0 默认开启WLAN热点设置默认热点名称和密码
  15. SAP SD 销售合同或者销售订单审批流搭建(状态管理)
  16. 使用软路由实现智能Qos(海蜘蛛)
  17. 什么是“敏捷教练”?
  18. Linux 桌面版太“惨”了。。。
  19. 第二周预习:异常类,常用类,容器
  20. 合金弹头An unknowed exception has occurred 多人联机报错解决办法

热门文章

  1. sparkmllib 推荐系统实现(学习)
  2. 二手手机交易存个人信息安全隐患?旧手机到底该怎么样处理?
  3. WIN10系统电脑休眠后唤醒自动退出夜间发光模式故障解决
  4. 中海达数据怎么转rinex_GPS-OEM原始数据向Rinex格式转换的方法
  5. 大学课程 | 基于WINDLX的系统结构实验
  6. 计算机体系结构流水线相关实验报告,计算机系统结构winDLX流水线实验报告汇编.doc...
  7. Java基础知识多线程,同步锁
  8. ros下启动robotiq-2f85电爪
  9. 中国电子学会2022年12月份青少年软件编程Scratch图形化等级考试试卷三级真题(含答案)
  10. Uncertainty Loss不确定损失