线程并发,那就牵扯到内存共享的问题,在并发编程中,有三个理念:原子性、可见性、有序性。这里分享一个转载。

转载:Java volatile关键字最全总结:原理剖析与实例讲解(简单易懂)

在ThreadPoolExecutor中,我们用到了AtomicInteger完成了列队中数量、状态的管理。为什么选择AtomicInteger呢?AtomicInteger是Number的子类,里面的值,就是一个volatile修饰的value,那么value就有了对多线程的可见性和一定的有序性了。只有这些,还是不够,那谁来保证他的原子性呢?Unsafe。Unsafe是个极其强大但也很危险的东西,很多牛逼的工具底层都基于他来实现的,他可以直接使用allocateMemory为对象分配内存,理论上可以直接使用堆的内存大小。

Unsafe为AtomicInteger提供了CAS等算法,这里不讲太多Unsafe的东西,大家有兴趣可以搜索下,文章很多。给大家分享一个。

转载:深入理解sun.misc.Unsafe原理

UnSage的功能图

至此,大家都明白了为什么用AtomicInteger了吧,因为他在多线程的情况下,能保证原子性、可见性、有序性。

以ThreadPoolExecutor为例,他的workerCount的增减,就是使用了CAS。

    /*** Attempts to CAS-increment the workerCount field of ctl.*/private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}/*** Attempts to CAS-decrement the workerCount field of ctl.*/private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}

CAS(Compare And Swap)先比较在替换,是我们所知道的最小操作单元。CAS使用JNI直接完成Java的非阻塞算法,直接交换内存地址。但他不是保证肯定会成功的。

所以,ThreadPoolExecutor中的addWorker的嵌套死循环,后面的再次检查。

在回过头来看下ThreadPoolExecutor中的AtomicInteger,他使用CompareAndSet将当前workerCount和各种状态都存起来,以便多线程之间共享使用。以便池子可以按照列队进行调度。除了线程间是贡献,ThreadPoolExecutor本身也是线程安全的。

重点:如何控制开启线程

接Android并发之Executor(线程池)家族(一)是个跨度,上一篇我没有将的如何控制线程的开启,这期补上。

先来代码。

private void moreYourBody() throws RuntimeException {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadPoolExecutor.DiscardOldestPolicy());for (int i = 0; i < 10; i++) {System.out.println("dtl:" + ctl.get());threadPool.execute(() -> {System.out.println("thread name: " + Thread.currentThread().getName());});}}

I/System.out: thread name: pool-7-thread-1
I/System.out: thread name: pool-7-thread-1
I/chatty: uid=10137(com.wjf.binderclient) pool-7-thread-1 identical 1 line
I/System.out: thread name: pool-7-thread-1
I/System.out: thread name: pool-7-thread-2
I/System.out: thread name: pool-7-thread-5
I/System.out: thread name: pool-7-thread-3
I/System.out: thread name: pool-7-thread-7
I/System.out: thread name: pool-7-thread-6
I/System.out: thread name: pool-7-thread-4

I/chatty: uid=10137(com.wjf.binderclient) pool-7-thread-1 identical 1 line这一行,其实是日志折叠再来。

private void moreYourBody() throws RuntimeException {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadPoolExecutor.DiscardOldestPolicy());for (int i = 0; i < 10; i++) {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("dtl:" + ctl.get());threadPool.execute(() -> {System.out.println("thread name: " + Thread.currentThread().getName());});}}

I/System.out: thread name: pool-1-thread-1
I/System.out: thread name: pool-1-thread-2
I/System.out: thread name: pool-1-thread-3
I/System.out: thread name: pool-1-thread-4
I/System.out: thread name: pool-1-thread-1
I/System.out: thread name: pool-1-thread-2
I/System.out: thread name: pool-1-thread-3
I/System.out: thread name: pool-1-thread-4
I/System.out: thread name: pool-1-thread-1
I/System.out: thread name: pool-1-thread-2

这个没问题,以最少的线程数量开启异步操作。

相比第一个,只是多做了点点的事情,看结果。

    private void moreYourBody() throws RuntimeException {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 10, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadPoolExecutor.CallerRunsPolicy());for (int i = 0; i < 10; i++) {threadPool.execute(() -> {System.out.println("thread name: " + Thread.currentThread().getName() + " queue size:" + threadPool.getQueue().size());});}}

I/System.out: thread name: pool-3-thread-6 queue size:3
I/System.out: thread name: pool-3-thread-6 queue size:2
I/System.out: thread name: pool-3-thread-6 queue size:1
I/System.out: thread name: pool-3-thread-6 queue size:0
I/System.out: thread name: pool-3-thread-5 queue size:0
I/System.out: thread name: pool-3-thread-3 queue size:0
I/System.out: thread name: pool-3-thread-7 queue size:0
I/System.out: thread name: pool-3-thread-1 queue size:0
I/System.out: thread name: pool-3-thread-4 queue size:0
I/System.out: thread name: pool-3-thread-2 queue size:0

居然多出了不少的线程。如果想触发拒绝策略,调用这个threadPool.shutdown();,或者让创建线程的数量超出maximumPoolSize。使用不同的策略,结果也不同。比如说使用ThreadPoolExecutor.CallerRunsPolicy策略,那被拒绝的Runnable会被调度到主线程执行。pool-3-thread-6中的6,是ThreadPoolExecutor的第二个参数赋予的,你把循环拉到100,你再看看效果。

上代码!我们重头开始看。

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) { // 如果列队中的数量小于设置的corePoolSize,if (addWorker(command, true)) // 那就执行addWorker创建执行任务的线程。return;c = ctl.get(); // 如果workerCount大于corePoolSize,取出来进行下一个满列队的判断。}if (isRunning(c) && workQueue.offer(command)) { // 如果是运行状态,且可以放入workerQueue中。int recheck = ctl.get();if (!isRunning(recheck) && remove(command)) // 判断下,看是否满足拒绝策略的条件reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false); // 主要这里的第二个参数false,意思就是超出了corePoolSize数量,那我就跟maximumPoolSize对比。}else if (!addWorker(command, false)) // 要不然就走决绝策略了。reject(command);}

上面的方法,主要注意AddWorker的第二个参数的意义。

    private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize)) // 这里的core,就是区别判断阈值。return false; // 如果这里触发了,那就会触发外面的决绝策略。if (compareAndIncrementWorkerCount(c)) // 如果成功加进来了,那就退出嵌套循环break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start(); // 经过上面的一切渡劫。用于要执行worker了。workerStarted = true;}}} finally {if (!workerStarted)addWorkerFailed(w);}return workerStarted;}
    // worker 的start,就会到这里。final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) { // 这里的getTask。w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run(); // 这里就是执行池子里的Runnable} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
    private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?// 这里wc > corePoolSize判断了当前运行的线程数量,是否大于阈值boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 如果timed为true,就会调用时长阻塞列队,Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true; // 如果没有,就继续循环,继续上面的代码,会在compareAndDecrementWorkerCount(c)为true的时候退出。要不然,就一直等待。} catch (InterruptedException retry) {timedOut = false;}}}

这些大概明白了吧?

总结:当池子里已经存在的线程已经运行完了,那列队会回退到0,执行完的线程,会自动完成线程的生命周期,死亡、释放掉;如果超出了corePoolSize的阈值,会使用阻塞列队等待空闲位置新建线程。

Android并发之Executor(线程池)家族(二)之AtomicInteger相关推荐

  1. Android线程和线程池(二)HandlerThread学习记录 使用+源码

    HandlerThreadAndroid线程和线程池(二)HandlerThread学习记录 使用+源码 一.作用 二.工作原理 三.HandlerThread的特点 优势: 劣势: 四.使用 五.源 ...

  2. Android AsyncTask两种线程池分析和总结

    转自:http://bbs.51cto.com/thread-1114378-1-1.html Android AsyncTask两种线程池分析和总结 (一)    前言 在android Async ...

  3. Python 线程池 ThreadPoolExecutor(二) - Python零基础入门教程

    目录 一.Python 线程池前言 二.Python 线程池 ThreadPoolExecutor 常用函数 1.线程池 as_completed 函数使用 2.线程池 map 函数使用 3.线程池 ...

  4. 小豹子带你看源码:Java 线程池(二)实例化

    承上启下:上一篇文章小豹子讲了我为什么想要研究线程池的代码,以及我计划要怎样阅读代码.这篇文章我主要阅读了线程池实例化相关的代码,并提出了自己的疑问. 3 千里之行,始于实例化 3.1 先创建一个线程 ...

  5. executor线程池框架_如何使用Java 5 Executor框架创建线程池

    executor线程池框架 Java 5以Executor框架的形式在Java中引入了线程池,它允许Java程序员将任务提交与任务执行分离. 如果要使用Java进行服务器端编程,则线程池是维护系统可伸 ...

  6. 并发编程之 Executor 线程池原理与源码解读

    并发编程之 Executor 线程池原理与源码解读 线程是调度 CPU 资源的最小单位,线程模型分为 KLT 模型与 ULT 模型,JVM使用的是 KLT 模型.java线程与 OS 线程保持 1:1 ...

  7. Android开发中的线程池使用

    一.前言 既然Android中已经有了线程的概念,那么为什么需要使用线程池呢?我们从两个方面给出使用线程池的原因. 首先线程的新建和销毁都是存在性能上的消耗的,如果一个时间段有大量的网络请求,那么就需 ...

  8. java executor_Java并发编程(08):Executor线程池框架

    一.Executor框架简介 1.基础简介 Executor系统中,将线程任务提交和任务执行进行了解耦的设计,Executor有各种功能强大的实现类,提供便捷方式来提交任务并且获取任务执行结果,封装了 ...

  9. Java并发编程(08):Executor线程池框架

    本文源码:GitHub·点这里 || GitEE·点这里 一.Executor框架简介 1.基础简介 Executor系统中,将线程任务提交和任务执行进行了解耦的设计,Executor有各种功能强大的 ...

最新文章

  1. 用GAN创造新蛋白只需几周,大幅缩短制药周期 | Nature子刊
  2. jQuery 学习之路(1):引子
  3. mysql 魔乐_MLDN 李兴华 魔乐科技网上最全笔记
  4. SpringBoot中的Quartz应用
  5. Complex Congratulation β
  6. 韩国浦项化学在中国斥资超2800亿韩元投建电动车电池材料厂
  7. oracle中的视图
  8. Windows Mobile如何得到资源文件中的文件
  9. ((ios开发学习笔记 十二))Nib加载的方式实现自定义TableView
  10. Docker使用(三)使用Dockerfile创建镜像以及为镜像添加SSH服务
  11. 我为大家分享永久免费空间 云专家
  12. RPA之家视频讲解RPA-3
  13. 县城中学计算机教师就业难吗,我县中小学信息技术教师现状及对策
  14. EXCEL 连接 ORACLE 查询数据到表格 中文乱码 中文变成?
  15. 英语教师杂志英语教师杂志社英语教师编辑部2022年第16期目录
  16. 程序猿怎么利用技术挣钱?——python量化实践
  17. 校园招聘--网易笔试
  18. word中最后一行留白太多
  19. Lua游戏中常用到的一些动作
  20. git 重新追踪索引_索引追踪差距

热门文章

  1. Java8-排序方法(正序、倒序)
  2. 照片的读取、显示和保存
  3. 输入n个整数,输出其中的最大值
  4. 2019年广州人才引进入户需要多久时间?
  5. Java算法_优先队列和PriorityQueue——HDU 1873:看病要排队
  6. MSF之ms17-010永恒之蓝漏洞利用
  7. 深入浅出系列之——并查集详解【武侠版】【简单有趣】
  8. 高中知识复习——log2(n)
  9. 检测图像中的椭圆 并求其长短轴...
  10. FBT熔融拉锥大芯径多模光纤耦合器简介