实现优先级队列的线程池
线程池更多知识点请阅读另一篇博客。
在实际的开发中,会将各种不同的异步任务提交到线程池执行,它们有轻重缓急。
如果任务量少,一来就有空闲线程处理,哦那没事了。
如果任务量多,我们希望队列根据任务的优先级有序存储,即优先级高的将会被优先消费。
实现的话有两个关键点:
- 线程池的任务队列具备排序功能。
- 提交的任务具备可比性。
第1点,ThreadPoolExecutor的构造函数有一个BlockingQueue<Runnable> workQueue参数,这个接口有一个实现类PriorityBlockingQueue<E>,它的构造函数可以传入一个比较器Comparator,能够满足要求。
第2点,ThreadPoolExecutor的submit、invokeXxx、execute方法入参都是Runnable、Callable,均不具备可排序的属性。我们可以弄一个实现类,加一些额外的属性,这样就可以让它们具备可比较性了。
除此之外,还有一些难点:
- PriorityBlockingQueue是无界的,它的offer方法永远返回true。有什么问题吗?第一,OOM风险;第二,设置的最大线程数和拒绝执行策略没有意义(没有机会触发,说shutdown的就不讲武德了)。
- 别人在使用ThreadPoolExecutor时,只要是提交Runnable、Callable及其子类都可以,我们就无法保证传进来的任务具备可比较性。有时候,把选择权交给用户不一定是好事。
怎么解决呢?
第1点,我们需要重写一下这个类的offer方法,如果元素超过指定数量直接返回false,否则调用原来逻辑。
第2点,扩展线程池的submit、invokeXxx、execute方法,可以选择继承或组合的方式,类似装饰者模式/静态代理模式。这里我们选择组合,原因:更灵活(开放自定义的方法、屏蔽原有的方法),模仿AQS。
呃,直接上代码吧!!!
package thread_pool;import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;/*** 优先级线程池执行器** @author Zhou huanghua*/
@SuppressWarnings("all")
public class PriorityThreadPoolExecutor {private static final Logger LOGGER = Logger.getAnonymousLogger();private final PriorityBlockingQueue<Runnable> priorityBlockingQueue;private final ThreadPoolExecutor threadPoolExecutor;// 自定义默认拒绝执行处理器:让调用线程执行最先提交到队列的任务,然后提交当前任务private static final RejectedExecutionHandler defaultHandler = new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (e.isShutdown()) {return;}Optional.ofNullable(e.getQueue().poll()).ifPresent(Runnable::run);e.execute(r);}};/*** 默认构造器*/public PriorityThreadPoolExecutor() {this(2, Runtime.getRuntime().availableProcessors() * 2 + 1, 60, TimeUnit.SECONDS, 1000, defaultHandler);}/*** 构造器** @param corePoolSize 核心线程数* @param maximumPoolSize 最大线程数* @param keepAliveTime 保留时间* @param unit 保留时间单位* @param workQueueSize 任务队列容量* @param handler 拒绝执行处理器*/public PriorityThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,int workQueueSize,RejectedExecutionHandler handler) {LOGGER.log(Level.INFO, "创建优先级线程池执行器:corePoolSize={0},maximumPoolSize={1},keepAliveTime={2},unit={3},workQueueSize={4},handler={5}",new Object[]{corePoolSize, maximumPoolSize, keepAliveTime, unit.name(), workQueueSize, handler.getClass().getSimpleName()});/** 优先级阻塞队列* 第一优先级priority,第二优先级taskNumber* 重写offer方法,改为有界*/priorityBlockingQueue = new PriorityBlockingQueue(10, (Comparator<PriorityTask>) (t1, t2) ->t1.priority != t2.priority ? Integer.compare(t1.priority, t2.priority) : Integer.compare(t1.taskNumber, t2.taskNumber)) {@Overridepublic boolean offer(Object o) {return super.size() < workQueueSize ? super.offer(o) : false;}};// 线程池执行器,使用默认线程工厂threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,priorityBlockingQueue,handler);}/*** 异步执行,默认最低优先级** @param command 任务*/public void asyncExecute(Runnable command) {asyncExecute(command, TaskPriority.LOW);}/*** 异步执行** @param command 任务* @param priority 优先级*/public void asyncExecute(Runnable command, TaskPriority priority) {threadPoolExecutor.execute(new PriorityTask(command, priority.value));LOGGER.log(Level.INFO, "活跃线程数量:{0},队列任务数量:{1}", new Object[]{threadPoolExecutor.getActiveCount(), priorityBlockingQueue.size()});}/*** 优先级任务*/private static class PriorityTask implements Runnable {private static final AtomicInteger NUMBER_GENERATOR = new AtomicInteger(1);private final Runnable command;private final int priority;private final int taskNumber;public PriorityTask(Runnable command, Integer priority) {this.command = command;this.priority = priority;this.taskNumber = NUMBER_GENERATOR.getAndIncrement();}@Overridepublic void run() {command.run();}}/*** 任务优先级枚举:HIGH > MEDIUM > LOW*/public enum TaskPriority {HIGH(1),MEDIUM(2),LOW(3);private Integer value;TaskPriority(Integer value) {this.value = value;}}
}
补充说明一下:
- 优先级任务类除了优先级属性外,额外增加一个递增的全局序号属性,提供优先级相等时FIFO。
- 开放两个构造函数和两个对外提交任务方法。默认的构造函数设置CPU核心*2+1的最大线程数和结合了CallerRunsPolicy、DiscardOldestPolicy的拒绝执行处理器。默认的提交任务方法设置最低优先级。任务优先级提供一个高中低级别的枚举。
- 每次提交任务后记个日志,那个LOG不要太在意,没有引入其它依赖,就暂时用JDK自带的顶一下。
又到了最令人兴奋的测试环节。
关注下几个参数:核心线程数2,最大线程数4,任务队列容量4,拒绝执行处理器采用调用者执行策略。
任务名称采用优先级+同等优先级的递增序号组成,如31表示第3优先级的第1个任务;任务主体在输出打印之前等待2秒模仿实际情况和测试效果。别问我为啥不用JUnit
实现优先级队列的线程池相关推荐
- 【JUC】第五章 JUC 阻塞队列、线程池
第五章 JUC 阻塞队列.线程池 文章目录 第五章 JUC 阻塞队列.线程池 一.阻塞队列 1.简介 2.BlockingQueue 的方法 3.常见的 BlockingQueue 二.线程池 1.简 ...
- Java队列、线程池及ThreadPoolExecutor自定义线程池实现
目录 1.阻塞队列 2.队列分类 3.API使用 4.线程池 4.1.线程池参数 4.2.线程池实现 4.3.任务执行流程 4.4.拒绝策略 4.5.参数合理值设置 5.自定义线程池流程 6.自定义线 ...
- Java多线程闲聊(四):阻塞队列与线程池原理
Java多线程闲聊(四)-阻塞队列与线程池原理 前言 复用永远是人们永恒的主题,这能让我们更好地避免重复制造轮子. 说到多线程,果然还是绕不开线程池,那就来聊聊吧. 人们往往相信,世界是存在一些规律的 ...
- java 阻塞锁_Java实现锁、公平锁、读写锁、信号量、阻塞队列、线程池等常用并发工具...
锁的实现 锁的实现其实很简单,主要使用Java中synchronized关键字. public class Lock { private volatile boolean isLocked = fal ...
- python锁机制_python基础(锁机制,守护线程,线程队列,线程池)
一. 互斥锁(Lock)与递归锁(RLock)机制 1.1 由互斥锁(Lock)产生的死锁现象: #互斥锁(死锁现象): #死锁现象: from threading importLock lock=L ...
- 并发队列、线程池、锁
1.CountDownLatch(计数器) CountDownLatch 类位于java.util.concurrent包下,利用它可以实现类似计数器的功能.比如有一个任务A,它要等待其他任 ...
- 面试官:使用无界队列的线程池会导致内存飙升吗?
Executors创建线程池方式有如下几种: Executors.newFixedThreadPool(10);//LinkedBlockingQueue 无限加入队列 Executors.newSc ...
- 【Linux入门】多线程(线程概念、生产者消费者模型、消息队列、线程池)万字解说
目录 1️⃣线程概念 什么是线程 线程的优点 线程的缺点 线程异常 线程异常 Linux进程VS线程 2️⃣线程控制 创建线程 获取线程的id 线程终止 等待线程 线程分离 3️⃣线程互斥 进程线程间 ...
- JAVA阻塞队列和线程池原理
阻塞队列 队列 队列是一种特殊的线性表,遵循先入先出.后入后出的基本原则,一般来说,它只允许在表的前端进行删除操作,而在表的后端进行插入操作. 什么是阻塞队列 支持阻塞的插入方法,当队列满时,队列会阻 ...
最新文章
- golomb哥伦布编码——本质上就是通过0来区分商和余数
- mq 接口 java_Rabbitmq Java Client Api详解
- POE交换机应用技术知识大全
- 直播这把“开鱼刀”能否救蘑菇街于“扑街”?
- java中集合的结构Set类型
- SAP MM 如何取到供应商付款条款描述信息?
- sql查询条件为空的另类写法o( ̄▽ ̄)d
- PIC仿真器接口定义及连接注意事项
- 【记录】win11安装ubuntu子系统教程
- 竹笛的分类有哪些?来认识竹笛的大家族。
- vb6集成ad登录共享文件_肇庆学院校园网WiFi认证自动登录指南
- Android实现FM收音机
- 字符串Hash函数对比
- CentOS 7 使用 Nginx 搭建视频点播服务器
- 21届秋招ATL宁德新能源一面面经[数据分析工程师]
- 沐风微信营销水库模型二:建设专属秘密武器库!
- 越狱苹果手机导出网易云音乐歌曲(以及缓存文件转换)
- 新式单片机视频教程下载
- STM32和ESP32- 主讲esp
- 单链表 尾插法 C语言
热门文章
- Apache绿色版 官网下载+安装(win7)
- 创业阶段如何找客户_创业初期,如何寻找最初的合伙人?
- linux--代码对比工具Meld Diff
- 轻码云双赢:成就创业梦同时造就自身
- HDU 3999 BST + 先序遍历
- Python 单位(亿、万)转数字
- CSS3属性之text-shadow和box-shadow(立体效果的实现)
- Your branch and ‘origin/master‘ have diverged,and have 1 and 353 different commits each, respective
- Your branch and ‘origin/develop‘ have diverged,
- 用PyTorch实现MNIST手写数字识别(非常详细)