欢迎大家关注我的微信公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。

一、线程池遇到的挑战

我们上一篇 《一文读懂线程池的实现原理 》已经从线程池如何维护自身状态、线程池如何管理任务、线程池如何管理线程三个维度来深入剖析线程池的底层原理与源码剖析,这让我们对线程池的原理有了较为深入的理解。这对我们多线程编程有很大的帮助,但在使用线程池时还是会面临几个棘手的问题。

  • 开发人员个人经验与水平参差不齐,配置线程池参数都是按照自己想法来,没有统一的一个配置标准。
  • 线程池执行情况与任务类型相关性较大,IO 密集型和 CPU 密集型的任务运行起来的情况差异非常大。
  • 当你配置好线程池后,有的时间段流量高峰期,导致线程池忙不过来;有的时间段流量低峰期,线程池比较空闲。这就会导致资源调度失衡,降低了系统的稳定性。

我们先来看下美团调研的业界一些线程池参数配置方案:

  • 第一种方案是出自《Java并发编程实践》,显然和业务场景有所偏离。
  • 第二种方案也不太合理,为什么呢?我们一个项目里一般来说不止一个自定义线程池吧?比如有专门处理数据异步持久化的线程池,有专门处理查询请求的线程池,这样去做一个简单的线程隔离。但是如果都用这样的参数配置的话,显然是不合理的。
  • 第三种方案虽然考虑到了业务场景,但这是理想状态。流量是不可能这么均衡的,就拿美团来说,下午 3、4 点的流量,能和 12 点左右午饭时的流量比吗?

基于上面线程池的几个痛点,那有没有好的解决方案呢?有的,那就是动态调整线程池参数。

尽管业界没有一些成熟的经验配置策略,那么我们是不是可以从修改线程池参数的成本入手?毕竟每次线上线程池故障的话,都得修改代码里的线程池相应的参数,然后再部署上线,这个过程在对可用性要求极高的项目中那是极其慢的,可能给公司造成巨大的损失。

既然改代码里的线程池相应的参数并上线这个过程慢,那我们是不是可以把相应的线程池参数配置到分布式配置中心上去?实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程对比如下:

二、动态化线程池

2.1 整体设计

动态化线程池的核心设计包括以下三个方面:

2.1.1 简化线程池配置

线程池构造参数有 7 个,但是最核心的是 3 个:corePoolSize、maximumPoolSize、workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种:

  • 并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。
  • 并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。

所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求。

2.1.2 参数可动态修改

为了解决参数不好配,修改参数成本高等问题。在 Java 线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。

2.1.3 增加线程池监控

对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助开发同学了解线程池状态。


2.2 功能架构

动态化线程池提供如下功能:

  • 动态调参:支持线程池参数动态调整、界面化操作;包括修改线程池核心大小、最大核心大小、队列长度等;参数修改后及时生效。
  • 任务监控:支持应用粒度、线程池粒度、任务粒度的 Transaction 监控;可以看到线程池的任务执行情况、最大任务执行时间、平均任务执行时间、P95/P99线等。
  • 负载告警:支持告警规则配置,当超过阈值时(线程池队列任务积压到一定值、线程池负载数达到一定阈值)会通知相关的开发负责人。
  • 操作监控:创建、修改和删除线程池都会通知到应用的开发负责人。
  • 操作日志:可以查看线程池参数的修改记录,谁在什么时候修改了线程池参数、修改前的参数值是什么。
  • 权限校验:只有应用开发负责人才能够修改应用的线程池参数。

三、Apollo 分布式配置中心

了解完动态化线程池的整体设计与功能架构后,我相信你也可以设计出一款动态线程池组件出来的。下面跟着老周来实践一下动态调整线程池参数,可能不像上面设计的那样那么全面,但会把动态调整线程池参数的核心给实现一下。

不难发现动态化线程池的核心是配置管理,那我们就得找一个分布式配置中心,这里老周用的 Apollo,还有其它的像 Spring Cloud Config、disconf、某些大型互联网公司自研的分布式配置中心等,根据自己的项目情况以及使用场景来选择就行。

3.1 Apollo 总体设计

Apollo(阿波罗)是携程框架部门研发的分布式配置中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性,适用于微服务配置管理场景。

3.1.1 基础模型

如下图即是 Apollo 的基础模型:

  1. 用户在配置中心对配置进行修改并发布
  2. 配置中心通知 Apollo 客户端有配置更新
  3. Apollo 客户端从配置中心拉取最新的配置、更新本地配置并通知到应用


3.1.2 架构模块


上图简要描述了 Apollo 的总体设计,我们可以从下往上看:

  • Config Service 提供配置的读取、推送等功能,服务对象是 Apollo 客户端
  • Admin Service 提供配置的修改、发布等功能,服务对象是 Apollo Portal(管理界面)
  • Config Service 和 Admin Service 都是多实例、无状态部署,所以需要将自己注册到 Eureka 中并保持心跳
  • 在 Eureka 之上我们架了一层 Meta Server 用于封装 Eureka 的服务发现接口
  • Client 通过域名访问 Meta Server 获取 Config Service 服务列表(IP+Port),而后直接通过 IP+Port 访问服务,同时在 Client 侧会做 load balance、错误重试
  • Portal 通过域名访问 Meta Server 获取 Admin Service 服务列表(IP+Port),而后直接通过 IP+Port 访问服务,同时在 Portal 侧会做 load balance、错误重试
  • 为了简化部署,我们实际上会把 Config Service、Eureka 和 Meta Server 三个逻辑角色部署在同一个 JVM 进程中

3.2 服务端设计

3.2.1 配置发布后的实时推送设计

在配置中心中,一个重要的功能就是配置发布后实时推送到客户端。本文重点分析配置更新推送方式,下面我们简要看一下这块是怎么设计实现的。


上图简要描述了配置发布的大致过程:

  1. 用户在 Portal 操作配置发布
  2. Portal 调用 Admin Service 的接口操作发布
  3. Admin Service 发布配置后,发送 ReleaseMessage 给各个 Config Service
  4. Config Service 收到 ReleaseMessage 后,通知对应的客户端

上图的发送 ReleaseMessage 的实现方式详情请往下继续看:

Admin Service 在配置发布后,需要通知所有的 Config Service 有配置发布,从而 Config Service 可以通知对应的客户端来拉取最新的配置。

从概念上来看,这是一个典型的消息使用场景,Admin Service 作为 producer 发出消息,各个 Config Service 作为 consumer 消费消息。通过一个消息组件(Message Queue)就能很好的实现 Admin Service 和 Config Service 的解耦。

在实现上,考虑到 Apollo 的实际使用场景,以及为了尽可能减少外部依赖,我们没有采用外部的消息中间件,而是通过数据库实现了一个简单的消息队列。


3.3 客户端设计


上图简要描述了 Apollo 客户端的实现原理:

  1. 客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送。(通过 Http Long Polling 实现)
  2. 客户端还会定时从 Apollo 配置中心服务端拉取应用的最新配置。
  3. 客户端从 Apollo 配置中心服务端获取到应用的最新配置后,会保存在内存中。
  4. 客户端会把从服务端获取到的配置在本地文件系统缓存一份。
  5. 应用程序可以从 Apollo 客户端获取最新的配置、订阅配置更新通知。

我们从基础模型、服务端设计、客户端设计三个维度来分析了 Apollo 总体设计,相信你对 Apollo 分布式配置中心有了全面且清晰的理解了。为了照顾没有用过 Apollo 这款分布式配置中心的同学,老周这里还是简单给个 Apollo 开发样例演示,希望对你后面的动态调整线程池参数实践有所帮助。

四、动态调整线程池参数实践

我们了解原理以及架构后,那我们开始实践了。

4.1 服务端安装

请看官方文档进行相应的安装:https://ctripcorp.github.io/apollo/#/zh/deployment/quick-start

执行启动脚本后,当看到如下输出后,就说明启动成功了!


启动成功后访问地址:http://localhost:8070


默认输入用户名:apollo、密码:admin,进行登录。


点击 SampleApp,我们看到在 DEV 环境包含一个 timeout 配置项,100 是这个配置项的值,下面我们在应用程序读取这个配置项:


4.2 应用程序

4.2.1 引入依赖

<dependencies><dependency><groupId>com.ctrip.framework.apollo</groupId><artifactId>apollo-client</artifactId><version>1.7.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version><scope>compile</scope></dependency>
</dependencies>

4.2.2 样例测试

/*** 动态获取Apollo配置* 注意要配置:-Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080** @author 微信公众号【老周聊架构】*/
public class GetApolloConfigTest {public static void main(String[] args) throws InterruptedException {Config config = ConfigService.getAppConfig();config.addChangeListener(new ConfigChangeListener() {@Overridepublic void onChange(ConfigChangeEvent changeEvent) {System.out.println("Changes for namespace " + changeEvent.getNamespace());for (String key : changeEvent.changedKeys()) {ConfigChange change = changeEvent.getChange(key);System.out.println(String.format("Found change - key: %s, oldValue: %s, newValue: %s, changeType: %s", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType()));}}});Thread.sleep(1000000L);}
}

我们现在把配置项默认的值 100 改为 200 程序输出结果如下:


控制台会出现以下日志,表明动态获取 Apollo 配置成功了。

4.3 动态线程池

上面我们把 Apollo 的动态监听修改配置的功能整明白了以后,再把线程池和 Apollo 结合起来构建动态线程池那就方便多了。首先我们用默认值构建一个线程池,然后线程池会监听 Apollo 关于相关配置项,如果相关配置有变化则刷新相关参数。

代码演示:

/*** 动态线程池工厂** @author 微信公众号【老周聊架构】*/
@Slf4j
@Component
public class DynamicThreadPoolFactory {/** 这里是你的namespace,我这里是默认的application **/private static final String NAME_SPACE = "application";/** 线程执行器 **/private volatile ThreadPoolExecutor executor;/** 核心线程数 **/private Integer corePoolSize = 10;/** 最大值线程数 **/private Integer maximumPoolSize = 20;/** 待执行任务的队列的长度 **/private Integer workQueueSize = 1000;/** 线程空闲时间 **/private Long keepAliveTime = 1000L;/** 线程名 **/private String threadName;public DynamicThreadPoolFactory() {Config config = ConfigService.getConfig(NAME_SPACE);init(config);listen(config);}/*** 初始化*/private void init(Config config) {if (executor == null) {synchronized (DynamicThreadPoolFactory.class) {if (executor == null) {String corePoolSizeProperty = config.getProperty(ParamsEnum.CORE_POOL_SIZE.getParam(), corePoolSize.toString());String maximumPoolSizeProperty = config.getProperty(ParamsEnum.MAXIMUM_POOL_SIZE.getParam(), maximumPoolSize.toString());String keepAliveTImeProperty = config.getProperty(ParamsEnum.KEEP_ALIVE_TIME.getParam(), keepAliveTime.toString());BlockingQueue<Runnable> workQueueProperty = new LinkedBlockingQueue<>(workQueueSize);executor = new ThreadPoolExecutor(Integer.valueOf(corePoolSizeProperty), Integer.valueOf(maximumPoolSizeProperty),Long.valueOf(keepAliveTImeProperty), TimeUnit.MILLISECONDS, workQueueProperty);}}}}/*** 监听器*/private void listen(Config config) {config.addChangeListener(new ConfigChangeListener() {@Overridepublic void onChange(ConfigChangeEvent changeEvent) {log.info("命名空间发生变化={}", changeEvent.getNamespace());for (String key : changeEvent.changedKeys()) {ConfigChange change = changeEvent.getChange(key);String newValue = change.getNewValue();refreshThreadPool(key, newValue);log.info("发生变化key={},oldValue={},newValue={},changeType={}", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType());}}});}/*** 刷新线程池*/private void refreshThreadPool(String key, String newValue) {if (executor == null) {return;}if (ParamsEnum.CORE_POOL_SIZE.getParam().equals(key)) {executor.setCorePoolSize(Integer.valueOf(newValue));log.info("修改核心线程数key={},value={}", key, newValue);}if (ParamsEnum.MAXIMUM_POOL_SIZE.getParam().equals(key)) {executor.setMaximumPoolSize(Integer.valueOf(newValue));log.info("修改最大线程数key={},value={}", key, newValue);}if (ParamsEnum.KEEP_ALIVE_TIME.getParam().equals(key)) {executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS);log.info("修改线程空闲时间key={},value={}", key, newValue);}}public ThreadPoolExecutor getExecutor(String threadName) {return executor;}
}
@AllArgsConstructor
public enum ParamsEnum {CORE_POOL_SIZE("corePoolSize", "核心线程数"),MAXIMUM_POOL_SIZE("maximumPoolSize", "最大线程数"),KEEP_ALIVE_TIME("keepAliveTime", "线程空闲时间"),;@Getterprivate String param;@Getterprivate String desc;
}
/*** 动态线程池执行器*  * @author 微信公众号【老周聊架构】*/
@Component
public class DynamicThreadExecutor {@Resourceprivate DynamicThreadPoolFactory threadPoolFactory;public void execute(String bizName, Runnable job) {threadPoolFactory.getExecutor(bizName).execute(job);}public Future<?> sumbit(String bizName, Runnable job) {return threadPoolFactory.getExecutor(bizName).submit(job);}
}
@Slf4j
public class DynamicThreadPoolExecutorTest {@Resourceprivate DynamicThreadExecutor dynamicThreadExecutor;/*** 记得 IDEA VM options 要记得加下面的参数* -Dapp.id=SampleApp -Denv=DEV -Dapollo.meta=http://localhost:8080*/@Testpublic void testExecute() throws InterruptedException {while (true) {dynamicThreadExecutor.execute("bizName", new Runnable() {@Overridepublic void run() {System.out.println("bizInfo");}});TimeUnit.SECONDS.sleep(1);}}
}

这里可以通过 JDK 自带的 JVisualVM 工具可以查看到相应的线程使用情况。

我们在配置中心修改配置项把核心线程数设置为 50,最大线程数设置为 100:


你会观察到线程数显著上升

这里还可以在代码中通过打印相应的线程状态,更加直观的从日志上观察到核心线程、最大线程数的修改情况。

private static void threadPoolStatus(ThreadPoolExecutor executor, String name) {BlockingQueue<Runnable> queue = executor.getQueue();System.out.println(Thread.currentThread().getName() + "-" + name + "-:" +"核心线程数:" + executor.getCorePoolSize() +" 活动线程数:" + executor.getActiveCount() +" 最大线程数:" + executor.getMaximumPoolSize() +" 线程池活跃度:" + divide(executor.getActiveCount(), executor.getMaximumPoolSize()) +" 任务完成数:" + executor.getCompletedTaskCount() +" 队列大小:" + (queue.size() + queue.remainingCapacity()) +" 当前排队线程数:" + queue.size() +" 队列剩余大小:" + queue.remainingCapacity() +" 队列使用度:" + divide(queue.size(), queue.size() + queue.remainingCapacity()));
}

这样的话就可以实现动态调整线程池参数,这就很好的解决了我们线程池现有的痛点,不至于线上出了问题还得改代码部署那么漫长的修复时间了,动态线程池大大简化了运维以及开发快速修复相关问题的难度。

动态调整线程池参数实践相关推荐

  1. 基于 Nacos Config 事件监听 动态调整线程池参数

    一.Nacos Config 事件监听 在实际项目中一般都会使用线程池解决一些异步并发问题,不过线程池核心参数很大程度上一次性进行设置,但系统运行起来总有可能出现各种各样的问题,如果修改线程池的参数则 ...

  2. 动态调整线程池_调整线程池的重要性

    动态调整线程池 无论您是否知道,您的Java Web应用程序很可能都使用线程池来处理传入的请求. 这是许多人忽略的实现细节,但是迟早您需要了解如何使用该池以及如何为您的应用程序正确调整池. 本文旨在说 ...

  3. 今天我们不聊原理,能拿来即用的线程池最佳实践

    这篇文章篇幅在5000字左右,绝对是干货.标题稍微有点夸张,嘿嘿,实际都是自己使用线程池的时候总结的一些个人感觉比较重要的点. 为什么要使用线程池? " 池化技术相比大家已经屡见不鲜了,线程 ...

  4. 如何用利特尔法则调整线程池大小

    利特尔法则 利特尔法则派生于排队论,用以下数学公式表示: L=λWL = λW L=λW L 系统中存在的平均请求数量. λ 请求有效到达速率.例如:5/s 表示每秒有5个请求到达系统. W 请求在系 ...

  5. 【并发编程】线程池参数设置与动态调整

    看了美团的一篇技术文章后才知道原来线程池的参数还可以动态调节. 一.场景分析 1.1 一个线程池中的线程异常了,那么线程池会怎么处理这个线程? public class ThreadPoolExecu ...

  6. 线程池参数如何设置?

    前言 着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流.使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器.J.U.C提供的线程池:ThreadPoolExecutor类,帮助开发 ...

  7. 自定义线程池-参数设计分析

    自定义线程池-参数设计分析 通过观察Java中的内置线程池参数讲解和线程池工作流程总结,我们不难发现,要设计一个好的线程池,就必须合理的设置线程池的4个参数;那到底该如何合理的设计4个参数的值呢?我们 ...

  8. 线程池参数到底要怎么配?

    文章目录 1 线程池快速回顾 2 现有设置参数的方法及不足 3 如何设置核心线程数(corePoolSize) 4 如何设置最大线程数(maxPoolSize) 5 如何改变等待队列长度 想必大家对J ...

  9. Java多线程学习六:使用线程池比手动创建线程好在那里以及常用线程池参数的意义

    为什么要使用线程池 首先,回顾线程池的相关知识,在 Java 诞生之初是没有线程池的概念的,而是先有线程,随着线程数的不断增加,人们发现需要一个专门的类来管理它们,于是才诞生了线程池.没有线程池的时候 ...

最新文章

  1. java显示位图_java – 大图标位图在通知中显示为白色方块?
  2. 大二上学期做的不入眼的导航系统。
  3. Python Django 多对多表设计批量插入方法示例
  4. SDM For Face Alignment 流程介绍及Matlab代码实现之预处理篇
  5. java 独占锁_锁分类(独占锁、分拆锁、分离锁、分布式锁)
  6. Google Chrome 扩展程序开发
  7. 引入 javascript_在您JavaScript项目中引入类型安全性? 再想一想
  8. opencv3/C++ Harris角点、Shi-Tomasi角点亚像素角点
  9. jQuery UI Autocomplete 使用 ajax 方法传输Json数据出现乱码问题的解决
  10. C99标准的新特性介绍
  11. DOS命令大全:MS-DOS命令详解
  12. 文献笔记:Plasmonic metagratings for simultaneous determination of Stokes parameters
  13. 公务员面试综合分析真题解析
  14. Django基础(29): select_related和prefetch_related的用法与区别
  15. html5 调用系统相册,iOS之H5调用系统相册相机浏览文件
  16. 巨型机是一种什么的超级计算机,把计算机分为巨型机、大中型机按照什么分的...
  17. 泰戈编程答案第一期(每周日更新)
  18. Oracle-SQL中日期加减一年的写法
  19. 更换服务器IP有哪些步骤?如何操作?
  20. edge 黑色 护眼 深色模式

热门文章

  1. 计算机综合布线室,计算机综合布线实验室配置清单.doc
  2. 联想win10默认浏览器修改
  3. 跑通、ILRuntime,全程傻瓜式指导。
  4. 达梦查询计算百分比数据为零
  5. JavaWeb总结之通过Servlet生成验证码图片
  6. 从零开始学习的设计模式——外观模式
  7. 环境变量的用户变量与系统变量的简单说明
  8. 简单基于Oracle实现分布式锁
  9. 如何获得excel文件名和工作表名
  10. [Unity3D课堂作业] 打飞碟 PlayDarts