开篇

本文将从以下三个方面介绍线程池

  • 线程池的七个参数
  • 线程的使用
  • 自定义一个线程池
  • 一个任务提交会经历哪些步骤
  • JDK线程池是如何保证核心线程一直存活的

线程池的七个参数

java源码

/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize 核心线程数*     * @param maximumPoolSize 最大线程数* * @param keepAliveTime 超出核心线程数的线程的存活时间*      * @param unit 存活时间单位* * @param 工作队列,也就是存放任务的阻塞队列*       * @param threadFactory 创建线程的工厂* * @param handler 当核心线程没有空闲,祖舍队列已满,当前线程大于最大线程的拒绝策略* * @throws IllegalArgumentException if one of the following holds:<br>*         {@code corePoolSize < 0}<br>*         {@code keepAliveTime < 0}<br>*         {@code maximumPoolSize <= 0}<br>*         {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}*         or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

线程池的使用

线程池的使用非常简单

普通线程池的使用

public class TestMyBlockQueue {private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();public static void main(String[] args) {MyBlockQueue<String> stringMyBlockQueue = new MyBlockQueue<>(5);// 直接创建一个线程池 推荐用法ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(AVAILABLE_PROCESSORS*4,AVAILABLE_PROCESSORS*4,30,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(100));// 借助Executors创建线程池 不推荐使用ExecutorService threadPool = Executors.newFixedThreadPool(20);for (int i = 0;i<10;i++) {// 10个线程不put数据threadPool.execute(()-> {try {while (true) {stringMyBlockQueue.put("DSADSAD");TimeUnit.SECONDS.sleep(10);}} catch (InterruptedException e) {e.printStackTrace();}});}// 10个线程不take数据for (int i = 0;i<2;i++) {threadPool.execute(()-> {try {while (true) {stringMyBlockQueue.take();TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();}});}}
}

ScheduledThreadPool

除了普通线程池jdk还提供了一种带定时的线程池,很多框架的延迟加载就是使用这种线程实现的,这个线程池就是ScheduledThreadPool,比如Dubbo的延迟加载,此外这个线程池也可以用来实现定时任务

public class TestScheduledThreadPool {public static void main(String[] args) {ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);// 提交一个任务1s后执行scheduledThreadPool.schedule(new Runnable() {public void run() {System.out.println("你好1");}},1, TimeUnit.SECONDS);// 创建并执行并结束一个runnable在延迟指定initialDelay时间,然后,每隔initialDelay+period*n时间执行一次scheduledThreadPool.scheduleAtFixedRate(new Runnable() {public void run() {System.out.println("你好2");}}, 1, 1, TimeUnit.SECONDS);// 创建并执行并结束一个runnable在延迟指定initialDelay时间,然后第一次执行完成后,间隔delay时间继续执行一次,无限循环。scheduledThreadPool.scheduleWithFixedDelay(new Runnable() {public void run() {System.out.println("你好3");}},1000,100,TimeUnit.MILLISECONDS);}
}

线程池的使用我们要切记不要定义方法中,大家可以思考一下为什么,这是一个相当危险的动作,此外线程池的关闭要注意哪些,比如在单例模式这个线程池是否可以关闭(关闭后会有什么影响),不可的话如何让线程池中的任务执行完成后再让程序往下执行,有兴趣的可以看一下下面的这个CountDownLatch的用法

public class CountDownLatchDemo {private static CountDownLatch startSignal = new CountDownLatch(1);//用来表示裁判员需要维护的是6个运动员private static CountDownLatch endSignal = new CountDownLatch(6);public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(6);for (int i = 0; i < 6; i++) {executorService.execute(() -> {try {System.out.println(Thread.currentThread().getName() + " 运动员等待裁判员响哨!!!");// 所有的线程阻塞在这个地方startSignal.await();System.out.println(Thread.currentThread().getName() + "正在全力冲刺");// 到达终点数量-1endSignal.countDown();System.out.println(Thread.currentThread().getName() + "  到达终点");} catch (InterruptedException e) {e.printStackTrace();}});}TimeUnit.SECONDS.sleep(2);System.out.println("裁判员发号施令啦!!!");// 所有运动员准备完成startSignal.countDown();endSignal.await();System.out.println("所有运动员到达终点,比赛结束!");executorService.shutdown();}}

自定义线程池

这个自定义线程池基本就是参照jdk线程池来实现的,只是有些细节实现的比较粗糙,例如和拒绝,关闭线程池,关闭后还有没执行完的任务该怎么处理这些都是没有实现的。如果看jdk源码有点难度可以先看这个,然后再看jdk源码可能会轻松一下,代码如下

public class MyThreadPool {/*** 当前线程数*/int threadCount;/*** 核心线程数*/int coreSize;/*** 阻塞队可容纳的任务数*/int workerQueueCount;private ArrayBlockingQueue<Runnable> workQueue;public MyThreadPool(int coreSize,int workerQueueCount) {this.workerQueueCount = workerQueueCount;this.coreSize = coreSize;this.workQueue = new ArrayBlockingQueue<>(workerQueueCount);}public void submit(Runnable task){if (task == null){throw new NullPointerException();}if (threadCount < coreSize){System.out.println("创建核心线程执行任务");MyWorKer myWorKer = new MyWorKer(task);myWorKer.thread.start();threadCount++;}else if (workQueue.size() < workerQueueCount){try {System.out.println("加入到阻塞队列");workQueue.put(task);} catch (InterruptedException e) {e.printStackTrace();}}else{// jdk线程池不是这样处理的System.out.println("直接创建一个线程执行");new Thread(task).start();}}class MyWorKer implements Runnable {private Runnable firstTask;private Thread thread ;MyWorKer(Runnable firstTask) {this.firstTask = firstTask;this.thread = new Thread(this);}void runWorker(MyWorKer myWorKer) throws InterruptedException {Runnable task = myWorKer.firstTask;// 线程存活,其实就是利用阻塞队列让线程阻塞在这个地方while (task != null || (task = workQueue.take()) != null) {task.run();task = null;}}@Overridepublic void run() {try {runWorker(this);} catch (InterruptedException e) {e.printStackTrace();}}}
}

测试代码

public class TestMyThreadPool {public static void main(String[] args) {MyThreadPool myThreadPool = new MyThreadPool(3,5);for (int i = 0; i < 6; i++) {myThreadPool.submit(()->{System.out.println(Thread.currentThread().getName());try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}});}int i = Thread.activeCount();System.out.println(i);}
}

一个任务提交会经历哪些步骤

这个网上会有很多文章有兴趣的可以看一下,jdk源码的注释写的也很明白,源码也比较简单

总结就是先判断是否核心线程数已满,不满创建线程执行任务,满了就判断阻塞队列是否已满,不满就放入到阻塞队列中,满了就判断当前线程数是否大于最大线程数,大于直接根据拒绝策略拒绝,不大于就创建线程执行

源码如下,这个源码还是比较简单的

public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}

JDK线程池的工作原理(重点)

认真看过自定义部分代码的应该会发现,自定义的线程池使用的阻塞的方式是线程阻塞,来保证核心线程数存活(线程的生命周期这种老生常谈的问题就不用介绍了,随便都可以百度的到)

我们看一下jdk的线程池是如何保证核心线程存活的

关键代码

private boolean addWorker(Runnable firstTask, boolean core)

boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建一个任务,注意任务的参数w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}

t.start()执行的是什么这个很重要,点进去可以看到真执行的是runWorker方法,runWorker会不停的从阻塞队列中获取任务执行

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// getTask是线程阻塞while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

看一下take方法

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?// 重点allowCoreThreadTimeOut 是可以设置默认false,只有当线程数大于核心线程数的时候才会是trueboolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?// 如果是true就是用可以中断的方法获取workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 如果是false就是不是中断的方法获取任务workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

总的来说,就是allowCoreThreadTimeOut 这个参数来控制线程池是否允许存活,默认是false表示可以存活

彩蛋

核心线程会不会被替换,换言之就是核心线程会一直是之前创建的吗?后面创建的最大线程有没有可能成为核心线程?

写在后面的话

一个缓解内心迷茫最好的方式就是强迫自己静下心来学习,只要有收获就不会感到迷茫

线程池的使用和工作原理相关推荐

  1. java线程池的工作原理_Java 线程池的介绍以及工作原理

    在什么情况下使用线程池? 1.单个任务处理的时间比较短 2.将需处理的任务的数量大 使用线程池的好处: 1. 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的消耗. 2. 提高响应速度 ...

  2. Java 线程池的介绍以及工作原理

    在什么情况下使用线程池? 1.单个任务处理的时间比较短 2.将需处理的任务的数量大 使用线程池的好处: 1. 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的消耗. 2. 提高响应速度 ...

  3. JUC多线程:线程池的创建及工作原理 和 Executor 框架

    一.什么是线程池: 线程池主要是为了解决 新任务执行时,应用程序为任务创建一个新线程 以及 任务执行完毕时,销毁线程所带来的开销.通过线程池,可以在项目初始化时就创建一个线程集合,然后在需要执行新任务 ...

  4. Java线程池,从使用到原理

    转载自  Java线程池,从使用到原理 线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象 ...

  5. JAVA线程池_并发队列工作笔记0004---Callable原理_多线程执行Callable任务

    技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 注意之前咱们说的线程池队列中都是用的实现了Runnbale接口的线程, 这里咱们用的是实现了Cal ...

  6. 线程池作用、用法以及原理

    线程池 作用 用法 建议设定大小 快捷构造线程池 submit与execute shutdown与shutdownNow Future与FutureTast 代码 状态 底层原理 继承关系 主要参数 ...

  7. 图解线程池——清新脱俗的讲原理

    网上介绍线程池的文章很多,质量好坏不一.能讲的很透彻的,确实不多. 本人能力有限,本文先从原理入手,讲清楚线程池是怎么运行的. 至于源码的分析,将单独写一篇(<线程池源码详解>). 全文以 ...

  8. 操作系统:为什么IO操作不占用CPU却会导致进程阻塞?Web服务器每接收一个请求都会创建一个新的线程吗?Tomcat服务器工作原理?

    为什么IO操作不占用CPU却会导致进程阻塞?Web服务器每接收一个请求都会创建一个新的线程吗?这两个问题在我学操作系统以前我都挺困惑的.现在我来尝试着解答一下. 1. 为什么IO操作不占用CPU却会导 ...

  9. JAVA线程池_并发队列工作笔记0002---认识线程池_在线程池中使用队列

    技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 上面是线程的执行周期 这个线程的生命周期,可以看到时间都浪费在了创建和销毁的这里了. 实际上执行业 ...

  10. 数据库连接池使用场景,工作原理和实现步骤

    一.使用场景: 大型高并发应用里         使用连接池的好处:就是可以限制应用的连接数,另外,不用再额外地去创建每个连接,MySQL创建连接的开销也是较大的,因为创建一个新连接相当于MySQL创 ...

最新文章

  1. 3 汪博士解读pmp_备考两月,我顺利通过PMP考试
  2. python最早引入json的版本_详解Python在使用JSON时需要注意的编码问题
  3. SQL Server Management Studio清除历史登陆记录
  4. Redis数据库 【总结笔记】
  5. 计算机mips是什么,在计算机术语中,什么叫MIPS
  6. 平塘天眼和大数据有什么关系_贵州平塘的中国天眼,值得去吗?除了天眼,平塘还有什么好玩?...
  7. 2022春节档新片预售总票房达1.08亿
  8. 在线JSON转sarcastic工具
  9. android跳转app store,从微信跳转到appstore下载App
  10. stagefright与opencore对比
  11. u盘装服务器系统还原c盘失败,usb启动盘安装系统还原失败怎么办?
  12. android 手机 拍 全景 java_android如何进行全景拍照怎么实现
  13. 为什么那些美事没有实现---生活中小事有感
  14. 易语言服务器端口总被占用,易语言检测端口是否被占用的代码
  15. 闲鱼怎么发布宝贝引流?推广方法就是不断的在闲鱼上发布转让
  16. python中readlines是什么意思_Python中read,readline,readlines三种方式的区别
  17. MSE(均方误差)函数和RMSE函数
  18. 苹果计算机单位说明书,手把手教你用苹果电脑玩转办公
  19. macos上的ios虚拟机_如何将中级帖子转换为可在iOS和macOS上运行的SwiftUI应用
  20. 【面试大全-Java】Spring核心问答

热门文章

  1. TMEA:源于音乐,高于盛典
  2. [wechart] 微信小程序使用粘性定位position: sticky的注意事项(避坑)
  3. 基于JAVA大数据在线考试系统在线阅卷系统及大数据统计分析计算机毕业设计源码+数据库+lw文档+系统+部署
  4. 接口测试常见问题及答案
  5. Java高并发编程实战1,那些年学过的锁
  6. Axure简易计算器
  7. OpenCV python下载和安装
  8. 通过dSYM文件分析crash日志
  9. 谷歌高管地震:谷歌大脑联合创始人Samy Bengio离职了
  10. 数学建模之lingo使用