线程池更多知识点请阅读另一篇博客

在实际的开发中,会将各种不同的异步任务提交到线程池执行,它们有轻重缓急。

如果任务量少,一来就有空闲线程处理,哦那没事了。

如果任务量多,我们希望队列根据任务的优先级有序存储,即优先级高的将会被优先消费。

实现的话有两个关键点

  1. 线程池的任务队列具备排序功能。
  2. 提交的任务具备可比性。

第1点,ThreadPoolExecutor的构造函数有一个BlockingQueue<Runnable> workQueue参数,这个接口有一个实现类PriorityBlockingQueue<E>,它的构造函数可以传入一个比较器Comparator,能够满足要求。

第2点,ThreadPoolExecutor的submit、invokeXxx、execute方法入参都是Runnable、Callable,均不具备可排序的属性。我们可以弄一个实现类,加一些额外的属性,这样就可以让它们具备可比较性了。

除此之外,还有一些难点

  1. PriorityBlockingQueue是无界的,它的offer方法永远返回true。有什么问题吗?第一,OOM风险;第二,设置的最大线程数和拒绝执行策略没有意义(没有机会触发,说shutdown的就不讲武德了)。
  2. 别人在使用ThreadPoolExecutor时,只要是提交Runnable、Callable及其子类都可以,我们就无法保证传进来的任务具备可比较性。有时候,把选择权交给用户不一定是好事。

怎么解决呢?

第1点,我们需要重写一下这个类的offer方法,如果元素超过指定数量直接返回false,否则调用原来逻辑。

第2点,扩展线程池的submit、invokeXxx、execute方法,可以选择继承或组合的方式,类似装饰者模式/静态代理模式。这里我们选择组合,原因:更灵活(开放自定义的方法、屏蔽原有的方法),模仿AQS。

呃,直接上代码吧!!!

package thread_pool;import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;/*** 优先级线程池执行器** @author Zhou huanghua*/
@SuppressWarnings("all")
public class PriorityThreadPoolExecutor {private static final Logger LOGGER = Logger.getAnonymousLogger();private final PriorityBlockingQueue<Runnable> priorityBlockingQueue;private final ThreadPoolExecutor threadPoolExecutor;// 自定义默认拒绝执行处理器:让调用线程执行最先提交到队列的任务,然后提交当前任务private static final RejectedExecutionHandler defaultHandler = new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (e.isShutdown()) {return;}Optional.ofNullable(e.getQueue().poll()).ifPresent(Runnable::run);e.execute(r);}};/*** 默认构造器*/public PriorityThreadPoolExecutor() {this(2, Runtime.getRuntime().availableProcessors() * 2 + 1, 60, TimeUnit.SECONDS, 1000, defaultHandler);}/*** 构造器** @param corePoolSize    核心线程数* @param maximumPoolSize 最大线程数* @param keepAliveTime   保留时间* @param unit            保留时间单位* @param workQueueSize   任务队列容量* @param handler         拒绝执行处理器*/public PriorityThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,int workQueueSize,RejectedExecutionHandler handler) {LOGGER.log(Level.INFO, "创建优先级线程池执行器:corePoolSize={0},maximumPoolSize={1},keepAliveTime={2},unit={3},workQueueSize={4},handler={5}",new Object[]{corePoolSize, maximumPoolSize, keepAliveTime, unit.name(), workQueueSize, handler.getClass().getSimpleName()});/** 优先级阻塞队列* 第一优先级priority,第二优先级taskNumber* 重写offer方法,改为有界*/priorityBlockingQueue = new PriorityBlockingQueue(10, (Comparator<PriorityTask>) (t1, t2) ->t1.priority != t2.priority ? Integer.compare(t1.priority, t2.priority) : Integer.compare(t1.taskNumber, t2.taskNumber)) {@Overridepublic boolean offer(Object o) {return super.size() < workQueueSize ? super.offer(o) : false;}};// 线程池执行器,使用默认线程工厂threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,priorityBlockingQueue,handler);}/*** 异步执行,默认最低优先级** @param command 任务*/public void asyncExecute(Runnable command) {asyncExecute(command, TaskPriority.LOW);}/*** 异步执行** @param command  任务* @param priority 优先级*/public void asyncExecute(Runnable command, TaskPriority priority) {threadPoolExecutor.execute(new PriorityTask(command, priority.value));LOGGER.log(Level.INFO, "活跃线程数量:{0},队列任务数量:{1}", new Object[]{threadPoolExecutor.getActiveCount(), priorityBlockingQueue.size()});}/*** 优先级任务*/private static class PriorityTask implements Runnable {private static final AtomicInteger NUMBER_GENERATOR = new AtomicInteger(1);private final Runnable command;private final int priority;private final int taskNumber;public PriorityTask(Runnable command, Integer priority) {this.command = command;this.priority = priority;this.taskNumber = NUMBER_GENERATOR.getAndIncrement();}@Overridepublic void run() {command.run();}}/*** 任务优先级枚举:HIGH > MEDIUM > LOW*/public enum TaskPriority {HIGH(1),MEDIUM(2),LOW(3);private Integer value;TaskPriority(Integer value) {this.value = value;}}
}

补充说明一下:

  • 优先级任务类除了优先级属性外,额外增加一个递增的全局序号属性,提供优先级相等时FIFO。
  • 开放两个构造函数和两个对外提交任务方法。默认的构造函数设置CPU核心*2+1的最大线程数和结合了CallerRunsPolicy、DiscardOldestPolicy的拒绝执行处理器。默认的提交任务方法设置最低优先级。任务优先级提供一个高中低级别的枚举。
  • 每次提交任务后记个日志,那个LOG不要太在意,没有引入其它依赖,就暂时用JDK自带的顶一下。

又到了最令人兴奋的测试环节。

关注下几个参数:核心线程数2,最大线程数4,任务队列容量4,拒绝执行处理器采用调用者执行策略。

任务名称采用优先级+同等优先级的递增序号组成,如31表示第3优先级的第1个任务;任务主体在输出打印之前等待2秒模仿实际情况和测试效果。别问我为啥不用JUnit

实现优先级队列的线程池相关推荐

  1. 【JUC】第五章 JUC 阻塞队列、线程池

    第五章 JUC 阻塞队列.线程池 文章目录 第五章 JUC 阻塞队列.线程池 一.阻塞队列 1.简介 2.BlockingQueue 的方法 3.常见的 BlockingQueue 二.线程池 1.简 ...

  2. Java队列、线程池及ThreadPoolExecutor自定义线程池实现

    目录 1.阻塞队列 2.队列分类 3.API使用 4.线程池 4.1.线程池参数 4.2.线程池实现 4.3.任务执行流程 4.4.拒绝策略 4.5.参数合理值设置 5.自定义线程池流程 6.自定义线 ...

  3. Java多线程闲聊(四):阻塞队列与线程池原理

    Java多线程闲聊(四)-阻塞队列与线程池原理 前言 复用永远是人们永恒的主题,这能让我们更好地避免重复制造轮子. 说到多线程,果然还是绕不开线程池,那就来聊聊吧. 人们往往相信,世界是存在一些规律的 ...

  4. java 阻塞锁_Java实现锁、公平锁、读写锁、信号量、阻塞队列、线程池等常用并发工具...

    锁的实现 锁的实现其实很简单,主要使用Java中synchronized关键字. public class Lock { private volatile boolean isLocked = fal ...

  5. python锁机制_python基础(锁机制,守护线程,线程队列,线程池)

    一. 互斥锁(Lock)与递归锁(RLock)机制 1.1 由互斥锁(Lock)产生的死锁现象: #互斥锁(死锁现象): #死锁现象: from threading importLock lock=L ...

  6. 并发队列、线程池、锁

    1.CountDownLatch(计数器)      CountDownLatch 类位于java.util.concurrent包下,利用它可以实现类似计数器的功能.比如有一个任务A,它要等待其他任 ...

  7. 面试官:使用无界队列的线程池会导致内存飙升吗?

    Executors创建线程池方式有如下几种: Executors.newFixedThreadPool(10);//LinkedBlockingQueue 无限加入队列 Executors.newSc ...

  8. 【Linux入门】多线程(线程概念、生产者消费者模型、消息队列、线程池)万字解说

    目录 1️⃣线程概念 什么是线程 线程的优点 线程的缺点 线程异常 线程异常 Linux进程VS线程 2️⃣线程控制 创建线程 获取线程的id 线程终止 等待线程 线程分离 3️⃣线程互斥 进程线程间 ...

  9. JAVA阻塞队列和线程池原理

    阻塞队列 队列 队列是一种特殊的线性表,遵循先入先出.后入后出的基本原则,一般来说,它只允许在表的前端进行删除操作,而在表的后端进行插入操作. 什么是阻塞队列 支持阻塞的插入方法,当队列满时,队列会阻 ...

最新文章

  1. golomb哥伦布编码——本质上就是通过0来区分商和余数
  2. mq 接口 java_Rabbitmq Java Client Api详解
  3. POE交换机应用技术知识大全
  4. 直播这把“开鱼刀”能否救蘑菇街于“扑街”?
  5. java中集合的结构Set类型
  6. SAP MM 如何取到供应商付款条款描述信息?
  7. sql查询条件为空的另类写法o( ̄▽ ̄)d
  8. PIC仿真器接口定义及连接注意事项
  9. 【记录】win11安装ubuntu子系统教程
  10. 竹笛的分类有哪些?来认识竹笛的大家族。
  11. vb6集成ad登录共享文件_肇庆学院校园网WiFi认证自动登录指南
  12. Android实现FM收音机
  13. 字符串Hash函数对比
  14. CentOS 7 使用 Nginx 搭建视频点播服务器
  15. 21届秋招ATL宁德新能源一面面经[数据分析工程师]
  16. 沐风微信营销水库模型二:建设专属秘密武器库!
  17. 越狱苹果手机导出网易云音乐歌曲(以及缓存文件转换)
  18. 新式单片机视频教程下载
  19. STM32和ESP32- 主讲esp
  20. 单链表 尾插法 C语言

热门文章

  1. Apache绿色版 官网下载+安装(win7)
  2. 创业阶段如何找客户_创业初期,如何寻找最初的合伙人?
  3. linux--代码对比工具Meld Diff
  4. 轻码云双赢:成就创业梦同时造就自身
  5. HDU 3999 BST + 先序遍历
  6. Python 单位(亿、万)转数字
  7. CSS3属性之text-shadow和box-shadow(立体效果的实现)
  8. Your branch and ‘origin/master‘ have diverged,and have 1 and 353 different commits each, respective
  9. Your branch and ‘origin/develop‘ have diverged,
  10. 用PyTorch实现MNIST手写数字识别(非常详细)