Java多线程高并发编程代码笔记(四)
文章目录
- Executor
- ExecutorService
- Callable
- Executors
- Future
- 线程池
- FixedThreadPool
- CachedThreadPool
- SingleThreadPool
- ScheduledThreadPool
- WorkStealingPool
- ForkJoinPool
- 线程池的底层实现
- ThreadPoolExecutor
- FixedThreadPool
- CachedThreadPool
- ScheduledThreadPool
- SingleThreadExecutor
- 参考
- 源代码
Executor
参考:https://blog.csdn.net/qq_34993631/article/details/82713550
执行器,这是一个接口,内部维护了一个方法execute它负责执行一项任务。参数为Runnable,方法的具体实现由我们自己来执行。如下面的代码,我们既可以使用单纯的方法调用也可以新启一个新的线程去执行Runnable的run方法。
package demo34;import java.util.concurrent.Executor;public class MyExecutor implements Executor {public static void main(String[] args) {new MyExecutor().execute(() -> System.out.println(Thread.currentThread().getName() + "hello executor"));}@Overridepublic void execute(Runnable command) {new Thread(command).start();
// command.run();}}
运行结果
Thread-0hello executor
ExecutorService
代表着启动一系列的线程为用户提供服务(本质上也是一个执行器,继承自Executor)
同时它可以提交Callable与Runnable的对象返回一个未来的执行结果对象Future。这里顺便说一下,Callable是一个增强版的Runnable,它的call方法可以抛出异常可以有返回值。其中它的返回值放在了Future对象中,我们可以使用Future对象的get方法来获得返回值。同时它也是一系列线程池的接口比如说ForkJoinPool、ScheduledThreadPoolExecutor,、ThreadPoolExecutor等。
Callable
Callable是一个增强版的Runnable,它的call方法可以抛出异常可以有返回值。其中它的返回值放在了Future对象中,我们可以使用Future对象的get方法来获得返回值。
Executors
是一个操作Executor的工具方法。
Future
Future常与Callable联合使用,Future可以获得Callable执行后的返回值。如果想新建一个线程执行一个这个Callable中的call方法而且获得返回值的话我们可以使用以下的思路。
方案一:new Thread(new FutureTask(一个实现了Callable的类的对象)).start();使用FutureTask来接收任务的返回值。
方案二:new一个线程池然后然后提交Callable的实现的对象。使用Future来获得Callable的返回值。
package demo34;import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;/*** new Thread(new FutureTask(一个实现了Callable的类的对象)).start();使用FutureTask来接收任务的返回值*/
public class MyFuture01 {public static void main(String[] args) throws InterruptedException, ExecutionException {FutureTask<Integer> task = new FutureTask<>(() -> {TimeUnit.MILLISECONDS.sleep(500);return 1000;}); //new Callable () { Integer call();}new Thread(task).start();System.out.println(task.get()); //阻塞}
}
运行结果
1000
package demo34;import java.util.concurrent.*;/*** new一个线程池然后然后提交Callable的实现的对象。使用Future来获得Callable的返回值。*/
public class MyFuture02 {public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService service = Executors.newFixedThreadPool(5);Future<Integer> f = service.submit(() -> {TimeUnit.MILLISECONDS.sleep(500);return 1;});System.out.println(f.get());System.out.println(f.isDone());}
}
运行结果
1
true
线程池
FixedThreadPool
package demo35;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class MyFixedThreadPool {public static void main(String[] args) throws InterruptedException {ExecutorService service = Executors.newFixedThreadPool(5); //execute submitfor (int i = 0; i < 6; i++) {service.execute(() -> {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName());});}System.out.println(service);service.shutdown();System.out.println(service.isTerminated());System.out.println(service.isShutdown());System.out.println(service);TimeUnit.SECONDS.sleep(5);System.out.println(service.isTerminated());System.out.println(service.isShutdown());System.out.println(service);}
}
运行结果
java.util.concurrent.ThreadPoolExecutor@7cca494b[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
false
true
java.util.concurrent.ThreadPoolExecutor@7cca494b[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-1
pool-1-thread-2
pool-1-thread-4
pool-1-thread-5
pool-1-thread-3
pool-1-thread-1
true
true
java.util.concurrent.ThreadPoolExecutor@7cca494b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
- 整个程序new了一个5个线程的线程池,使用for循环向这个线程池抛了5个任务。它的执行原则是哪一个线程空闲就由哪个线程来执行这个任务。所以我们看到的线程池的线程序号是不固定的乱序的,但是它有个规则就是先执行完任务的线程会在新线程到来时优先分配到任务。
- 线程池shutdown之后程序不会立刻停止而是要等待的所有线程都执行完毕之后再停止服务,所以我们看到的就是Running Shutting down Terminated
- 线程池的任务大体上分为两类,等待就绪队列与已完成任务的队列。通过输出结果我们可以看出在开始有5个正在执行的任务1个任务驻留在就绪队列等待执行,在执行结束后我们的已执行队列中就会有6个元素。
利用线程池做并行计算。
/*** 线程池的概念* 并行计算*/
package demo35;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;public class ParallelComputing {public static void main(String[] args) throws InterruptedException, ExecutionException {long start = System.currentTimeMillis();List<Integer> results = getPrime(1, 200000); long end = System.currentTimeMillis();System.out.println(end - start);final int cpuCoreNum = 4;ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);MyTask t1 = new MyTask(1, 80000);MyTask t2 = new MyTask(80001, 130000);MyTask t3 = new MyTask(130001, 170000);MyTask t4 = new MyTask(170001, 200000);Future<List<Integer>> f1 = service.submit(t1);Future<List<Integer>> f2 = service.submit(t2);Future<List<Integer>> f3 = service.submit(t3);Future<List<Integer>> f4 = service.submit(t4);start = System.currentTimeMillis();f1.get();f2.get();f3.get();f4.get();end = System.currentTimeMillis();System.out.println(end - start);}static class MyTask implements Callable<List<Integer>> {int startPos, endPos;MyTask(int s, int e) {this.startPos = s;this.endPos = e;}@Overridepublic List<Integer> call() throws Exception {List<Integer> r = getPrime(startPos, endPos);return r;}}static boolean isPrime(int num) {for(int i=2; i<=num/2; i++) {if(num % i == 0) return false;}return true;}static List<Integer> getPrime(int start, int end) {List<Integer> results = new ArrayList<>();for(int i=start; i<=end; i++) {if(isPrime(i)) results.add(i);}return results;}
}
这是一个质数计算的问题,我们把质数计算划分为不同的数据段是因为越大的质数越难计算,所以直观上计算大量的小数字的质数的时间相当于计算少量的大数字花的时间。这样一来我们就将这个大的任务相对均匀的拆分开来避免了任务分配不均匀造成的等待(也就是时间浪费)。
运行结果
3204
954
CachedThreadPool
CachedPool的主要特点就是如果新来的一个任务需要这个线程池来执行的话,如果当前线程池没有闲置的线程那么就新启动一个线程,如果有空闲线程那么就使用其中的一个空闲线程。就是这样的一个有弹性的线程池。默认情况下当一个线程空闲超过60s那么就会销毁,而且线程数量最大不能超过int类型的最大值或者是计算机内存的大小。
package demo35;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class CachedPool {public static void main(String[] args) throws InterruptedException {ExecutorService service = Executors.newCachedThreadPool();System.out.println(service);for (int i = 0; i < 2; i++) {service.execute(() -> {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName());});}System.out.println(service);TimeUnit.SECONDS.sleep(80);System.out.println(service);}
}
运行结果
java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
pool-1-thread-1
pool-1-thread-2
java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
SingleThreadPool
这个线程池中只有一个线程,它的使用场景就是当我们需要保证任务执行的先后顺序的时候就可以使用它。
package demo35;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class MySingleThreadPool {public static void main(String[] args) {ExecutorService service = Executors.newSingleThreadExecutor();for (int i = 0; i < 5; i++) {final int j = i;service.execute(() -> {System.out.println(j + "" + Thread.currentThread().getName());});}}
}
运行结果
0pool-1-thread-1
1pool-1-thread-1
2pool-1-thread-1
3pool-1-thread-1
4pool-1-thread-1
ScheduledThreadPool
一个定时执行任务的一个线程池它所执行的任务的参数如下:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period, TimeUnit unit)
- initialDelay:在开始多少单位时间的时候执行第一个任务。
- Period:每隔多长时间执行下一个任务。
- Unit:时间的单位。
- 它的底层基于DelayedWorkQueue。
package demo35;import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** 以下代码展示了已启动就开始执行的而且步幅为0.5s的线程执行方式*/
public class MyScheduledPool {public static void main(String[] args) {ScheduledExecutorService service = Executors.newScheduledThreadPool(4);service.scheduleAtFixedRate(() -> {try {TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName());}, 0, 500, TimeUnit.MILLISECONDS);}
}
运行结果
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3....
WorkStealingPool
工作窃取线程池,一般情况下CPU是几核的就会启动几个线程,每一个线程都维护者自己的一个执行队列的,当某些线程将自己队列中的任务都执行完毕的时候就会去其他线程的队列中窃取任务来执行以此提高效率。它的底层是基于ForkJoinPool的,常常用于任务分配不均匀的场景中。
package demo35;import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class MyWorkStealingPool {public static void main(String[] args) throws IOException {ExecutorService service = Executors.newWorkStealingPool();System.out.println(Runtime.getRuntime().availableProcessors());service.execute(new R(1000));service.execute(new R(2000));service.execute(new R(2000));service.execute(new R(2000)); //daemonservice.execute(new R(2000));//由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出System.in.read();}static class R implements Runnable {int time;R(int t) {this.time = t;}@Overridepublic void run() {try {TimeUnit.MILLISECONDS.sleep(time);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(time + " " + Thread.currentThread().getName());}}
}
运行结果
4
1000 ForkJoinPool-1-worker-1
2000 ForkJoinPool-1-worker-0
2000 ForkJoinPool-1-worker-2
2000 ForkJoinPool-1-worker-3
2000 ForkJoinPool-1-worker-1
ForkJoinPool
这个线程池设计的思想就与MapReduce极其相似,将一个大的任务分解成一个个小的任务当多个线程来执行。然后将计算的结果汇总得到最终结果。这也是用到了递归的思想。其中它的任务分为两种一种没有返回值是RecursiveAction,一种有返回值RecursiveTask。常常用于大量数据的运算以下为示例代码:
package demo35;import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;public class MyForkJoinPool {static int[] nums = new int[1000000];static final int MAX_NUM = 50000;static Random r = new Random();static {for (int i = 0; i < nums.length; i++) {nums[i] = r.nextInt(100);}System.out.println(Arrays.stream(nums).sum()); //stream api}/*static class AddTask extends RecursiveAction { int start, end;AddTask(int s, int e) {start = s;end = e;}@Overrideprotected void compute() {if(end-start <= MAX_NUM) {long sum = 0L;for(int i=start; i<end; i++) sum += nums[i];System.out.println("from:" + start + " to:" + end + " = " + sum);} else {int middle = start + (end-start)/2;AddTask subTask1 = new AddTask(start, middle);AddTask subTask2 = new AddTask(middle, end);subTask1.fork();subTask2.fork();}}}*/static class AddTask extends RecursiveTask<Long> {int start, end;AddTask(int s, int e) {start = s;end = e;}@Overrideprotected Long compute() {if (end - start <= MAX_NUM) {long sum = 0L;for (int i = start; i < end; i++) sum += nums[i];return sum;}int middle = start + (end - start) / 2;AddTask subTask1 = new AddTask(start, middle);AddTask subTask2 = new AddTask(middle, end);subTask1.fork();subTask2.fork();return subTask1.join() + subTask2.join();}}public static void main(String[] args) throws IOException {ForkJoinPool fjp = new ForkJoinPool();AddTask task = new AddTask(0, nums.length);fjp.execute(task);long result = task.join();System.out.println(result);//System.in.read();}
}
运行结果
49476407
49476407
线程池的底层实现
ThreadPoolExecutor
我们会发现FixedThreadPool、CachedThreadPool、ScheduledThreadPool、SingleThreadExecutor都是基于ThreadExecutor。而WorkStealingPool与ForkJoinPool的底层都是ForkJoinPool。
FixedThreadPool
CachedThreadPool
ScheduledThreadPool
SingleThreadExecutor
参考
https://www.bilibili.com/video/av33688545?p=20
https://blog.csdn.net/qq_34993631/article/details/82713550
源代码
https://gitee.com/cckevincyh/java_concurrent_learning
Java多线程高并发编程代码笔记(四)相关推荐
- Java JUC高并发编程(一)
目录 一.概述 二.Lock接口 三.线程间的通信 解决虚假唤醒问题 Lock通信示例: 四.线程间定制化通信 一.概述 JUC就是java.util.concurrent工具包的简称,这是一个处理线 ...
- Java JUC高并发编程(三)-CallableJUC辅助类
目录 一.Callable接口 二.JUC辅助类 1.减少计数CountDownLatch 2.循环栅栏CyclicBarrier 3.信号灯Semaphore 一.Callable接口 Callab ...
- Java 多线程与并发编程专题
Java 线程基础 Java 多线程开发 线程安全与同步 并发控制 非阻塞套接字(NIO) Java 5 中的并发 JDK 7 中的 Fork/Join 模式 相关书评 Java 平台提供了一套广泛而 ...
- java书籍_还搞不定Java多线程和并发编程面试题?你可能需要这一份书单!
点击蓝色"程序员书单"关注我哟 加个"星标",每天带你读好书! 在介绍本书单之前,我想先问一下各位读者,你们之前对于Java并发编程的了解有多少呢.经过了1 ...
- 为了在简历上写掌握【Java多线程和并发编程】,做了两万字总结
文章目录 概述 继承Thread类 实现Runnable接口 实现Callable接口 线程池 线程的五大状态 多线程买票案例 死锁 Lock锁 生产者消费者问题 八锁问题 volatile 写在后面 ...
- java线程高并发编程
java线程详解及高并发编程庖丁解牛 线程概述: 祖宗: 说起java高并发编程,就不得不提起一位老先生Doug Lea,这位老先生可不得了,看看百度百科对他的评价,一点也不为过: 如果IT的历史,是 ...
- Java岗:实打实掌握[Java多线程]和[并发编程]
概述 面试中,多线程和并发编程已经是必不可少的了,我经常看到此类问题,当时也简单了解过,什么继承Thread类,实现Runnable接口,这些都被说烂了,知道这些当然是远远不够的,于是这几天搜索相关资 ...
- 基于《狂神说Java》JUC并发编程--学习笔记
前言: 本笔记仅做学习与复习使用,不存在刻意抄袭. -------------------------------------------------------------------------- ...
- Java多线程与并发编程终极宝典
阅读本文需要了解的概念 原语 所谓原语,一般是指由若干条指令组成的程序段,用来实现某个特定功能,在执行过程中不可被中断.在操作系统中,某些被进程调用的操作,如队列操作.对信号量的操作.检查启动外设操作 ...
- 啃完阿里这份高并发编程核心笔记,反手涨了 5K
高并发编程 提到并发编程很多人就会头疼了:首先就是一些基础概念:并发,并行,同步,异步,临界区,阻塞,非阻塞还有各种锁全都砸你脸上,随之而来的就是要保证程序运行时关键数据在多线程中的可见性.核心业务的 ...
最新文章
- linux 编译 编解码
- 【算法】双指针算法 ( 双指针算法分类 | 相向双指针 | 有效回文串 )
- 【转】Ubuntu 安装截图工具Shutter,并设置快捷键 Ctrl+Alt+A
- 【Linux系统编程学习】Linux进程控制原语(fork、exec函数族、wait)
- SwitchHosts提示切换hosts失败!没有修改'C:\WINDOWS\system32\drivers\etc\hosts'的权限问题
- php 删除指定html标签,php删除html标签的三种解决办法
- json转excel_如何快速把json数据转到excel表格,方便个人查看
- Android 开发之 ---- 底层驱动开发(一) 【转】
- 如何安全地迁移到Exchange 2003?
- sunny底层android,Android网络通信概述
- 如果写文档发博客,你还在用Word文档你就out了,写文档神器Markdown的前世今生来了。(持续更新,欢迎关注点赞)
- Python学习之路9☞面向对象的程序设计
- 拓端tecdat|R语言回归和主成分PCA 回归交叉验证分析预测城市犯罪率
- Python数学问题2:求100以内素数之和
- 安装旧版本gcc的简便方法,软件包管理命令update-alternatives使用
- Java实现泛型解析工具类-GenericsUtils
- 说说财务系统中的月末结账功能
- 自己封装的Socket组件,实现服务端多进程共享Socket对象,协同处理客户端请求...
- 什么牌子的蓝牙耳机性价比高质量好?高性价比降噪蓝牙耳机推荐
- 萌系外表+丰富功能,i宝机器人成CES人气展品