线程池的使用和工作原理
开篇
本文将从以下三个方面介绍线程池
- 线程池的七个参数
- 线程的使用
- 自定义一个线程池
- 一个任务提交会经历哪些步骤
- JDK线程池是如何保证核心线程一直存活的
线程池的七个参数
java源码
/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize 核心线程数* * @param maximumPoolSize 最大线程数* * @param keepAliveTime 超出核心线程数的线程的存活时间* * @param unit 存活时间单位* * @param 工作队列,也就是存放任务的阻塞队列* * @param threadFactory 创建线程的工厂* * @param handler 当核心线程没有空闲,祖舍队列已满,当前线程大于最大线程的拒绝策略* * @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
线程池的使用
线程池的使用非常简单
普通线程池的使用
public class TestMyBlockQueue {private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();public static void main(String[] args) {MyBlockQueue<String> stringMyBlockQueue = new MyBlockQueue<>(5);// 直接创建一个线程池 推荐用法ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(AVAILABLE_PROCESSORS*4,AVAILABLE_PROCESSORS*4,30,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(100));// 借助Executors创建线程池 不推荐使用ExecutorService threadPool = Executors.newFixedThreadPool(20);for (int i = 0;i<10;i++) {// 10个线程不put数据threadPool.execute(()-> {try {while (true) {stringMyBlockQueue.put("DSADSAD");TimeUnit.SECONDS.sleep(10);}} catch (InterruptedException e) {e.printStackTrace();}});}// 10个线程不take数据for (int i = 0;i<2;i++) {threadPool.execute(()-> {try {while (true) {stringMyBlockQueue.take();TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();}});}}
}
ScheduledThreadPool
除了普通线程池jdk还提供了一种带定时的线程池,很多框架的延迟加载就是使用这种线程实现的,这个线程池就是ScheduledThreadPool,比如Dubbo的延迟加载,此外这个线程池也可以用来实现定时任务
public class TestScheduledThreadPool {public static void main(String[] args) {ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);// 提交一个任务1s后执行scheduledThreadPool.schedule(new Runnable() {public void run() {System.out.println("你好1");}},1, TimeUnit.SECONDS);// 创建并执行并结束一个runnable在延迟指定initialDelay时间,然后,每隔initialDelay+period*n时间执行一次scheduledThreadPool.scheduleAtFixedRate(new Runnable() {public void run() {System.out.println("你好2");}}, 1, 1, TimeUnit.SECONDS);// 创建并执行并结束一个runnable在延迟指定initialDelay时间,然后第一次执行完成后,间隔delay时间继续执行一次,无限循环。scheduledThreadPool.scheduleWithFixedDelay(new Runnable() {public void run() {System.out.println("你好3");}},1000,100,TimeUnit.MILLISECONDS);}
}
线程池的使用我们要切记不要定义方法中,大家可以思考一下为什么,这是一个相当危险的动作,此外线程池的关闭要注意哪些,比如在单例模式这个线程池是否可以关闭(关闭后会有什么影响),不可的话如何让线程池中的任务执行完成后再让程序往下执行,有兴趣的可以看一下下面的这个CountDownLatch的用法
public class CountDownLatchDemo {private static CountDownLatch startSignal = new CountDownLatch(1);//用来表示裁判员需要维护的是6个运动员private static CountDownLatch endSignal = new CountDownLatch(6);public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(6);for (int i = 0; i < 6; i++) {executorService.execute(() -> {try {System.out.println(Thread.currentThread().getName() + " 运动员等待裁判员响哨!!!");// 所有的线程阻塞在这个地方startSignal.await();System.out.println(Thread.currentThread().getName() + "正在全力冲刺");// 到达终点数量-1endSignal.countDown();System.out.println(Thread.currentThread().getName() + " 到达终点");} catch (InterruptedException e) {e.printStackTrace();}});}TimeUnit.SECONDS.sleep(2);System.out.println("裁判员发号施令啦!!!");// 所有运动员准备完成startSignal.countDown();endSignal.await();System.out.println("所有运动员到达终点,比赛结束!");executorService.shutdown();}}
自定义线程池
这个自定义线程池基本就是参照jdk线程池来实现的,只是有些细节实现的比较粗糙,例如和拒绝,关闭线程池,关闭后还有没执行完的任务该怎么处理这些都是没有实现的。如果看jdk源码有点难度可以先看这个,然后再看jdk源码可能会轻松一下,代码如下
public class MyThreadPool {/*** 当前线程数*/int threadCount;/*** 核心线程数*/int coreSize;/*** 阻塞队可容纳的任务数*/int workerQueueCount;private ArrayBlockingQueue<Runnable> workQueue;public MyThreadPool(int coreSize,int workerQueueCount) {this.workerQueueCount = workerQueueCount;this.coreSize = coreSize;this.workQueue = new ArrayBlockingQueue<>(workerQueueCount);}public void submit(Runnable task){if (task == null){throw new NullPointerException();}if (threadCount < coreSize){System.out.println("创建核心线程执行任务");MyWorKer myWorKer = new MyWorKer(task);myWorKer.thread.start();threadCount++;}else if (workQueue.size() < workerQueueCount){try {System.out.println("加入到阻塞队列");workQueue.put(task);} catch (InterruptedException e) {e.printStackTrace();}}else{// jdk线程池不是这样处理的System.out.println("直接创建一个线程执行");new Thread(task).start();}}class MyWorKer implements Runnable {private Runnable firstTask;private Thread thread ;MyWorKer(Runnable firstTask) {this.firstTask = firstTask;this.thread = new Thread(this);}void runWorker(MyWorKer myWorKer) throws InterruptedException {Runnable task = myWorKer.firstTask;// 线程存活,其实就是利用阻塞队列让线程阻塞在这个地方while (task != null || (task = workQueue.take()) != null) {task.run();task = null;}}@Overridepublic void run() {try {runWorker(this);} catch (InterruptedException e) {e.printStackTrace();}}}
}
测试代码
public class TestMyThreadPool {public static void main(String[] args) {MyThreadPool myThreadPool = new MyThreadPool(3,5);for (int i = 0; i < 6; i++) {myThreadPool.submit(()->{System.out.println(Thread.currentThread().getName());try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}});}int i = Thread.activeCount();System.out.println(i);}
}
一个任务提交会经历哪些步骤
这个网上会有很多文章有兴趣的可以看一下,jdk源码的注释写的也很明白,源码也比较简单
总结就是先判断是否核心线程数已满,不满创建线程执行任务,满了就判断阻塞队列是否已满,不满就放入到阻塞队列中,满了就判断当前线程数是否大于最大线程数,大于直接根据拒绝策略拒绝,不大于就创建线程执行
源码如下,这个源码还是比较简单的
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}
JDK线程池的工作原理(重点)
认真看过自定义部分代码的应该会发现,自定义的线程池使用的阻塞的方式是线程阻塞,来保证核心线程数存活(线程的生命周期这种老生常谈的问题就不用介绍了,随便都可以百度的到)
我们看一下jdk的线程池是如何保证核心线程存活的
关键代码
private boolean addWorker(Runnable firstTask, boolean core)
boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建一个任务,注意任务的参数w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}
t.start()执行的是什么这个很重要,点进去可以看到真执行的是runWorker方法,runWorker会不停的从阻塞队列中获取任务执行
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// getTask是线程阻塞while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
看一下take方法
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?// 重点allowCoreThreadTimeOut 是可以设置默认false,只有当线程数大于核心线程数的时候才会是trueboolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?// 如果是true就是用可以中断的方法获取workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 如果是false就是不是中断的方法获取任务workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
总的来说,就是allowCoreThreadTimeOut 这个参数来控制线程池是否允许存活,默认是false表示可以存活
彩蛋
核心线程会不会被替换,换言之就是核心线程会一直是之前创建的吗?后面创建的最大线程有没有可能成为核心线程?
写在后面的话
一个缓解内心迷茫最好的方式就是强迫自己静下心来学习,只要有收获就不会感到迷茫
线程池的使用和工作原理相关推荐
- java线程池的工作原理_Java 线程池的介绍以及工作原理
在什么情况下使用线程池? 1.单个任务处理的时间比较短 2.将需处理的任务的数量大 使用线程池的好处: 1. 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的消耗. 2. 提高响应速度 ...
- Java 线程池的介绍以及工作原理
在什么情况下使用线程池? 1.单个任务处理的时间比较短 2.将需处理的任务的数量大 使用线程池的好处: 1. 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的消耗. 2. 提高响应速度 ...
- JUC多线程:线程池的创建及工作原理 和 Executor 框架
一.什么是线程池: 线程池主要是为了解决 新任务执行时,应用程序为任务创建一个新线程 以及 任务执行完毕时,销毁线程所带来的开销.通过线程池,可以在项目初始化时就创建一个线程集合,然后在需要执行新任务 ...
- Java线程池,从使用到原理
转载自 Java线程池,从使用到原理 线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象 ...
- JAVA线程池_并发队列工作笔记0004---Callable原理_多线程执行Callable任务
技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 注意之前咱们说的线程池队列中都是用的实现了Runnbale接口的线程, 这里咱们用的是实现了Cal ...
- 线程池作用、用法以及原理
线程池 作用 用法 建议设定大小 快捷构造线程池 submit与execute shutdown与shutdownNow Future与FutureTast 代码 状态 底层原理 继承关系 主要参数 ...
- 图解线程池——清新脱俗的讲原理
网上介绍线程池的文章很多,质量好坏不一.能讲的很透彻的,确实不多. 本人能力有限,本文先从原理入手,讲清楚线程池是怎么运行的. 至于源码的分析,将单独写一篇(<线程池源码详解>). 全文以 ...
- 操作系统:为什么IO操作不占用CPU却会导致进程阻塞?Web服务器每接收一个请求都会创建一个新的线程吗?Tomcat服务器工作原理?
为什么IO操作不占用CPU却会导致进程阻塞?Web服务器每接收一个请求都会创建一个新的线程吗?这两个问题在我学操作系统以前我都挺困惑的.现在我来尝试着解答一下. 1. 为什么IO操作不占用CPU却会导 ...
- JAVA线程池_并发队列工作笔记0002---认识线程池_在线程池中使用队列
技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 上面是线程的执行周期 这个线程的生命周期,可以看到时间都浪费在了创建和销毁的这里了. 实际上执行业 ...
- 数据库连接池使用场景,工作原理和实现步骤
一.使用场景: 大型高并发应用里 使用连接池的好处:就是可以限制应用的连接数,另外,不用再额外地去创建每个连接,MySQL创建连接的开销也是较大的,因为创建一个新连接相当于MySQL创 ...
最新文章
- 3 汪博士解读pmp_备考两月,我顺利通过PMP考试
- python最早引入json的版本_详解Python在使用JSON时需要注意的编码问题
- SQL Server Management Studio清除历史登陆记录
- Redis数据库 【总结笔记】
- 计算机mips是什么,在计算机术语中,什么叫MIPS
- 平塘天眼和大数据有什么关系_贵州平塘的中国天眼,值得去吗?除了天眼,平塘还有什么好玩?...
- 2022春节档新片预售总票房达1.08亿
- 在线JSON转sarcastic工具
- android跳转app store,从微信跳转到appstore下载App
- stagefright与opencore对比
- u盘装服务器系统还原c盘失败,usb启动盘安装系统还原失败怎么办?
- android 手机 拍 全景 java_android如何进行全景拍照怎么实现
- 为什么那些美事没有实现---生活中小事有感
- 易语言服务器端口总被占用,易语言检测端口是否被占用的代码
- 闲鱼怎么发布宝贝引流?推广方法就是不断的在闲鱼上发布转让
- python中readlines是什么意思_Python中read,readline,readlines三种方式的区别
- MSE(均方误差)函数和RMSE函数
- 苹果计算机单位说明书,手把手教你用苹果电脑玩转办公
- macos上的ios虚拟机_如何将中级帖子转换为可在iOS和macOS上运行的SwiftUI应用
- 【面试大全-Java】Spring核心问答