高级技术之 JUC 高并发编程
1,什么是 JUC
1.1 JUC简介
JUC 就是java.util.concurrent 工具包的简称,这一个专门用来处理线程的工具包,JDK1.5开始出现
1.2 进程与线程
**进程(Process):**是系统进行资源分配和调度的基本单位,比如QQ启动就是一个进程。
**线程(Thread):**是操作系统能够进行运算调度的最小单位,是程序执行的最小单位,它被包含在进程中,比如QQ中的各个功能,都是不同的线程。
1.3 线程的状态
在Thread类中枚举了线程的6种状态;
public enum State {NEW,//新建状态RUNNABLE,//准备就绪BLOCKED,//阻塞状态WAITING,//等待状态:不见不散TIMED_WAITING,//等待状态:过期不候TERMINATED;//终止态}
wait/sleep的区别:
- sleep是Thread的静态方法;wait是Object类的方法,任何对象实例都可以调用
- sleep不会释放锁,它也不需要占有锁;wait会释放锁,但调用wait方法的前提是当前线程已经占有锁(即代码要在synchronized中)
- 它们都可以被interrupted方法中断
1.4 并发和并行
- 串行:一次只能执行一个任务,只有当前任务执行完成才能执行下一个
- 并行:多个任务同时执行,最后再进行汇总;比如:泡方便面,电水壶烧水,一边放调料包
- 并发:同一时刻多个线程访问同一个资源,多个线程对一个点;例子:春运抢票,电商秒杀
1.5 管程
管程(monitor)是保证了同一时刻只有一个进程在管程内活动,JVM中同步是基于进入和退出管程对象来实现的,每一个对象都会有一个管程对象,管程对象会随着java对象一同创建和销毁
执行线程首先要持有管程对象,然后执行完后再释放管程对象,
1.6 用户线程和守护线程
**用户线程:**平时自定义的线程
**守护线程:**运行在后台,是一种特殊的线程,比如垃圾回收
当main主线程结束后,用户线程还在运行,则JVM存活
如果没有用户线程,只剩守护线程,则JVM结束
可以通过Thread的setDaemon(true)方法把用户线程变为守护线程
2,Lock接口
2.1 Synchronized
synchronized 是 Java 中的关键字,是一种同步锁。它修饰的对象有以下几种:
修饰一个代码块:被修饰的代码块称为同步语句块,其作用的范围是大括号{}
括起来的代码,作用的对象是调用这个代码块的对象;
修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用
的对象是调用这个方法的对象;
虽然可以使用 synchronized 来定义方法,但 synchronized 并不属于方法定义的一部分,因此,synchronized 关键字不能被继承。如果在父类中的某个方
法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这
个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上
synchronized 关键字才可以。当然,还可以在子类方法中调用父类中相应的方
法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此,
子类的方法也就相当于同步了。
修饰一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的
所有对象;
修饰一个类,其作用的范围是 synchronized 后面括号括起来的部分,作用主
的对象是这个类的所有对象。
Lock和synchronized不同:
- Lock是一个接口,而synchronized是java中的一个关键字,synchronized是内置语言的实现
- synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象;但Lock在发生异常时,必须手动释放锁,很可能造成死锁现象,因此必须在finally中手动释放锁
- Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能响应中断
- Lock可以知道线程有没有成功获取到锁,而synchronized则无法办到
- Lock可以提高多个线程执行读操作的效率
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源
非常激烈时(即有大量线程同时竞争),此时 Lock 的性能要远远优于synchronized
多线程编程步骤:
- 创建资源类,创建属性和操作方法
- 创建多线程调用资源类的方法
- 为了控制多线程执行顺序,在操作方法中:判断,干活,通知
3,线程间通信
场景—两个线程,一个线程对当前数值加 1,另一个线程对当前数值减 1,要求用线程间通信
Synchronized:
//创建资源类
class Num{private int number = 0;public synchronized void incr() throws InterruptedException {while (number != 0){ //这里一定要用while而不能用if,if会导致假唤醒状态this.wait();}number++;System.out.println(Thread.currentThread().getName()+":"+number);notifyAll();}public synchronized void decr() throws InterruptedException {while (number == 0){this.wait();}number--;System.out.println(Thread.currentThread().getName()+":"+number);notifyAll();}
}public class SynchronizedDemo {public static void main(String[] args) {Num num = new Num();new Thread(()->{for (int i = 0; i < 10; i++) {try {num.incr();} catch (InterruptedException e) {e.printStackTrace();}}},"AA").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {num.decr();} catch (InterruptedException e) {e.printStackTrace();}}},"BB").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {num.incr();} catch (InterruptedException e) {e.printStackTrace();}}},"CC").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {num.decr();} catch (InterruptedException e) {e.printStackTrace();}}},"DD").start();}
}
Lock:
//创建资源类
class Num1{private int number = 0;private Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();public void incr() throws InterruptedException {lock.lock();try {while (number != 0){condition.await();}number++;System.out.println(Thread.currentThread().getName()+":"+number);condition.signalAll();} finally {lock.unlock();}}public void decr() throws InterruptedException {lock.lock();try {while (number == 0){condition.await();}number--;System.out.println(Thread.currentThread().getName()+":"+number);condition.signalAll();} finally {lock.unlock();}}
}public class LockDemo {public static void main(String[] args) {Num1 num = new Num1();new Thread(()->{for (int i = 0; i < 10; i++) {try {num.incr();} catch (InterruptedException e) {e.printStackTrace();}}},"AA").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {num.decr();} catch (InterruptedException e) {e.printStackTrace();}}},"BB").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {num.incr();} catch (InterruptedException e) {e.printStackTrace();}}},"CC").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {num.decr();} catch (InterruptedException e) {e.printStackTrace();}}},"DD").start();}
}
定制线程通信顺序:
1->5,1->10,1->15 执行三轮
//资源类
class Resource{private int flag = 1;//标志位private Lock lock = new ReentrantLock();private Condition c1 = lock.newCondition();private Condition c2 = lock.newCondition();private Condition c3 = lock.newCondition();public void circle5(int loop) throws InterruptedException {lock.lock();try {while (flag != 1){c1.await();}for (int i = 1; i < 6; i++) {System.out.println(Thread.currentThread().getName()+":"+i+" "+loop);}flag = 2;c2.signal();} finally {lock.unlock();}}public void circle10(int loop) throws InterruptedException {lock.lock();try {while (flag != 2){c2.await();}for (int i = 1; i < 11; i++) {System.out.println(Thread.currentThread().getName()+":"+i+" "+loop);}flag = 3;c3.signal();} finally {lock.unlock();}}public void circle15(int loop) throws InterruptedException {lock.lock();try {while (flag != 3){c3.await();}for (int i = 1; i < 16; i++) {System.out.println(Thread.currentThread().getName()+":"+i+" "+loop);}flag = 1;c1.signal();} finally {lock.unlock();}}}
public class LockDemo1 {public static void main(String[] args) {Resource resource = new Resource();new Thread(()->{for (int i = 1; i < 4; i++) {try {resource.circle5(i);} catch (InterruptedException e) {e.printStackTrace();}}},"AA").start();new Thread(()->{for (int i = 1; i < 4; i++) {try {resource.circle10(i);} catch (InterruptedException e) {e.printStackTrace();}}},"BB").start();new Thread(()->{for (int i = 1; i < 4; i++) {try {resource.circle15(i);} catch (InterruptedException e) {e.printStackTrace();}}},"CC").start();}
}
synchronized实现同步的基础:Java中的每一个对象都可以作为锁
具体表现为以下3中形式:
- 对于普通同步方法,锁时当前实例对象
- 对于静态同步方法,锁是当前类的Class对象
- 对于同步方法块,锁是Synchronized括号里配置的对象
4,集合的线程安全
线程不安全集合案例:ArrayList
/*** @Author: DiTian* @Description: 多个线程同时对集合进行修改* @Date: Created in 9:33 2021/8/24*/
public class ArrayDemo1 {public static void main(String[] args) {List list = new ArrayList();for (int i = 0; i < 10; i++) {new Thread(()->{list.add(UUID.randomUUID().toString());System.out.println(list);},"线程"+i).start();}}
}
报并发修改异常:Exception in thread “线程5” java.util.ConcurrentModificationException
原因:ArrayList的add()方法并没有Synchronized修饰,未加锁
public boolean add(E e) {ensureCapacityInternal(size + 1); // Increments modCount!!elementData[size++] = e;return true;}
解决方案:Vector,Collections,CopyOnWriteArrayList
4.1 Vector
Vector 是矢量队列,它是 JDK1.0 版本添加的类。继承于 AbstractList,实现
了 List, RandomAccess, Cloneable 这些接口。 Vector 继承了 AbstractList,
实现了 List;所以,它是一个队列,支持相关的添加、删除、修改、遍历等功
能。 Vector 实现了 RandmoAccess 接口,即提供了随机访问功能。
RandmoAccess 是 java 中用来被 List 实现,为 List 提供快速访问功能的。在
Vector 中,我们即可以通过元素的序号快速获取元素对象;这就是快速随机访
问。 Vector 实现了 Cloneable 接口,即实现 clone()函数。它能被克隆。
和ArrayList不同,Vector中的操作是线程安全的
/*** @Author: DiTian* @Description: 多个线程同时对集合进行修改* @Date: Created in 9:33 2021/8/24*/
public class ArrayDemo1 {public static void main(String[] args) {List list = new Vector();for (int i = 0; i < 10; i++) {new Thread(()->{list.add(UUID.randomUUID().toString());System.out.println(list);},"线程"+i).start();}}
}
Vector的add()方法:
public synchronized boolean add(E e) {modCount++;ensureCapacityHelper(elementCount + 1);elementData[elementCount++] = e;return true;
}
4.2 Collections
Collections 提供了方法 synchronizedList 保证 list 是同步线程安全的
/*** @Author: DiTian* @Description: 多个线程同时对集合进行修改* @Date: Created in 9:33 2021/8/24*/
public class ArrayDemo1 {public static void main(String[] args) {List list = Collections.synchronizedList(new ArrayList());for (int i = 0; i < 10; i++) {new Thread(()->{list.add(UUID.randomUUID().toString());System.out.println(list);},"线程"+i).start();}}
}
查看源码:
public static <T> List<T> synchronizedList(List<T> list) {return (list instanceof RandomAccess ?new SynchronizedRandomAccessList<>(list) :new SynchronizedList<>(list));
}
4.3 CopyOnWriteArrayList(重点)
首先我们对 CopyOnWriteArrayList 进行学习,其特点如下:
它相当于线程安全的 ArrayList。和 ArrayList 一样,它是个可变数组;但是和
ArrayList 不同的时,它具有以下特性:
\1. 它最适合于具有以下特征的应用程序:List 大小通常保持很小,只读操作远多
于可变操作,需要在遍历期间防止线程间的冲突。
\2. 它是线程安全的。
\3. 因为通常需要复制整个基础数组,所以可变操作(add()、set() 和 remove()
等等)的开销很大。
\4. 迭代器支持 hasNext(), next()等不可变操作,但不支持可变 remove()等操作。
\5. 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代
器时,迭代器依赖于不变的数组快照。
1. 独占锁效率低:采用读写分离思想解决
2. 写线程获取到锁,其他写线程阻塞
**3. 复制思想:**当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容
器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素
之后,再将原容器的引用指向新的容器。
这时候会抛出来一个新的问题,也就是数据不一致的问题。如果写线程还没来
得及写会内存,其他的线程就会读到了脏数据。
这就是 CopyOnWriteArrayList 的思想和原理。就是拷贝一份。
import java.util.concurrent.CopyOnWriteArrayList;/*** @Author: DiTian* @Description: 多个线程同时对集合进行修改* @Date: Created in 9:33 2021/8/24*/
public class ArrayDemo1 {public static void main(String[] args) {List list = new CopyOnWriteArrayList();for (int i = 0; i < 10; i++) {new Thread(()->{list.add(UUID.randomUUID().toString());System.out.println(list);},"线程"+i).start();}}
}
没有线程安全问题
原因分析(重点):动态数组与线程安全
下面从“动态数组”和“线程安全”两个方面进一步对
CopyOnWriteArrayList 的原理进行说明。
• “动态数组”机制
o 它内部有个“volatile 数组”(array)来保持数据。在“添加/修改/删除”数据
时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该
数组赋值给“volatile 数组”, 这就是它叫做 CopyOnWriteArrayList 的原因
o 由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的
操作,CopyOnWriteArrayList 效率很低;但是单单只是进行遍历查找的话,
效率比较高。
• “线程安全”机制
o 通过 volatile 和互斥锁来实现的。
o 通过“volatile 数组”来保存数据的。一个线程读取 volatile 数组时,总能看
到其它线程对该 volatile 变量最后的写入;就这样,通过 volatile 提供了“读
取到的数据总是最新的”这个机制的保证。
o 通过互斥锁来保护数据。在“添加/修改/删除”数据时,会先“获取互斥锁”,
再修改完毕之后,先将数据更新到“volatile 数组”中,然后再“释放互斥
锁”,就达到了保护数据的目的
4.4 小结(重点)
1.线程安全与线程不安全集合
集合类型中存在线程安全与线程不安全的两种,常见例如:
ArrayList ----- Vector
HashMap -----HashTable
但是以上都是通过 synchronized 关键字实现,效率较低
2.Collections 构建的线程安全集合
3.java.util.concurrent 并发包下
CopyOnWriteArrayList CopyOnWriteArraySet 类型,通过动态数组与线程安
全个方面保证线程安全
5,多线程锁
5.1 锁的八个问题演示
class Phone{public synchronized void SMS() throws InterruptedException {TimeUnit.SECONDS.sleep(4);//等待4秒System.out.println("sendSMS!");}public synchronized void Email(){System.out.println("sendEmail!");}public void Hello(){System.out.println("sendHello!");}
}
1,标准同步方法先打印短信还是邮件
sendSMS!
sendEmail!
2,停 4 秒在短信方法内,先打印短信还是邮件
sendSMS!
sendEmail!
3 ,新增普通的 hello 方法,停 4 秒在短信方法内,是先打短信还是 hello
sendHello!
sendSMS!
sendEmail!
4,现有两个手机对象,停 4 秒在短信方法内,先打印短信还是邮件
sendEmail!
sendSMS!
5, 两个静态同步方法,1 部手机,先打印短信还是邮件
sendSMS!
sendEmail!
6, 两个静态同步方法,2 部手机,先打印短信还是邮件
sendSMS!
sendEmail!
7, 1 个静态同步方法,1 个普通同步方法,1 部手机,先打印短信还是邮件
sendEmail!
sendSMS!
8, 1 个静态同步方法,1 个普通同步方法,2 部手机,先打印短信还是邮件
sendEmail!
sendSMS!
结论:
一个对象里面如果有多个 synchronized 方法,某一个时刻内,只要一个线程去调用其中的
一个 synchronized 方法了,
其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些
synchronized 方法
锁的是当前对象 this,被锁定后,其它的线程都不能进入到当前对象的其它的
synchronized 方法
加个普通方法后发现和同步锁无关
换成两个对象后,不是同一把锁了,情况立刻变化。
synchronized 实现同步的基础:Java 中的每一个对象都可以作为锁。
具体表现为以下 3 种形式。
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的 Class 对象。
对于同步方法块,锁是 Synchonized 括号里配置的对象
当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。
也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方
法必须等待获取锁的方法释放锁后才能获取锁,
可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,
所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁是两个不同的对象,所
以静态同步方法与非静态同步方法之间是不会有竞态条件的。
但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才
能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同
步方法之间,只要它们同一个类的实例对象!
5.2 公平锁和非公平锁
ReentrantLock()为例:
ReentrantLock类的有参构造和无参构造:
public ReentrantLock() {sync = new NonfairSync();
}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
}
可以看到,无参构造默认为非公平锁,可以通过有参构造来决定使用公平还是非公平
公平和非公平区别:
非公平锁:可能导致线程饿死,但效率高
公平锁:不会导致线程饿死,但效率低,会询问
5.3 可重入锁
synchronized(隐式)和Lock(显示)都是可重入锁
代码演示:
public class Demo2 {public static void main(String[] args) {Object o = new Object();new Thread(()->{synchronized (o){System.out.println(Thread.currentThread().getName()+":外层");synchronized (o){System.out.println(Thread.currentThread().getName()+":中层");synchronized (o){System.out.println(Thread.currentThread().getName()+":内层");}}}},"T").start();}
}
public class Demo2 {public static void main(String[] args) {Object o = new Object();Lock lock = new ReentrantLock();new Thread(()->{try {//加锁lock.lock();System.out.println(Thread.currentThread().getName()+":外层");try {//加锁lock.lock();System.out.println(Thread.currentThread().getName()+":中层");try {//加锁lock.lock();System.out.println(Thread.currentThread().getName()+":内层");}finally {lock.unlock();}}finally {lock.unlock();}}finally {lock.unlock();}},"T").start();}
}
执行结果:
T:外层
T:中层
T:内层
进了外层锁,里面的锁都可以进入
5.4 死锁
举例:线程A持有锁a,等待锁b;线程B持有锁b,等待锁a;
public class DeadLock {public static void main(String[] args) {Object a = new Object();Object b = new Object();new Thread(()->{synchronized (a){System.out.println(Thread.currentThread().getName()+"持有锁a,请求锁b");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}synchronized (b){System.out.println(Thread.currentThread().getName()+"获取到锁b");}}},"A").start();new Thread(()->{synchronized (b){System.out.println(Thread.currentThread().getName()+"持有锁b,请求锁a");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}synchronized (a){System.out.println(Thread.currentThread().getName()+"获取到锁a");}}},"B").start();}
}
执行结果:
A持有锁a,请求锁b
B持有锁b,请求锁a
死锁产生原因:
- 资源不足
- 线程推进顺序不当
- 资源分配不当
验证是否产生死锁:
1,jps -l(查看运行的进程,类似linux ps -ef) 2,jstack [pid] (jvm自带的堆栈跟踪工具)
jps -l 命令如果无效,环境变量配置:%JAVA_HOME%\bin; %JAVA_HOME%\jre\bin;
6, Callable&Future 接口
创建线程池的四种方法:
- 继承Thread类
- 实现Runnable接口
- 实现Callable接口
- 线程池创建
6.1 Callable接口
Runnable接口和Callable接口区别:
- Runnable接口重写run()方法;Callable接口重写call()方法
- run()方法没有返回值;call()方法有返回值,当计算不出返回值时,报异常
- 不能用Callable直接替换Runnable,因为Thread类的构造方法根本没有Callable
创建新类 MyThread 实现 runnable 接口
class MyThread implements Runnable{@Override
public void run() {}
}
新类 MyThread2 实现 callable 接口
class MyThread2 implements Callable<Integer>{@Override
public Integer call() throws Exception {return 200; }
}
6.2 FutureTast
不能用Callable直接替换Runnable,但又想要返回结果,所以可以通过FutrueTask来实现
FutrueTask是Runnable接口的实现类,FutrueTask的构造函数中用到了Callable
FutrueTask原理:
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些
作业交给 Future 对象在后台完成
• 当主线程将来需要时,就可以通过 Future 对象获得后台作业的计算结果或者执
行状态
• 一般 FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去
获取结果。
• 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法
• 一旦计算完成,就不能再重新开始或取消计算
• get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完
成状态,然后会返回结果或者抛出异常
• get 只计算一次,因此 get 方法放到最后
案例:
public class CallableDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask futureTask = new FutureTask(()->{System.out.println(Thread.currentThread().getName()+":进入Call方法!");return 200;});new Thread(futureTask,"T1").start();while (!futureTask.isDone()){System.out.println("waiting......");}System.out.println("返回结果:"+futureTask.get());System.out.println("返回结果:"+futureTask.get());}
}
7, JUC 三大辅助类
JUC 中提供了三种常用的辅助类,通过这些辅助类可以很好的解决线程数量过
多时 Lock 锁的频繁操作。这三种辅助类为:
• CountDownLatch: 减少计数
• CyclicBarrier: 循环栅栏
• Semaphore: 信号灯
7.1 减少计数 CountDownLatch
CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行
减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法
之后的语句。
• CountDownLatch 主要有两个方法,当一个或多个线程调用 await 方法时,这
些线程会阻塞
• 其它线程调用 countDown 方法会将计数器减 1(调用 countDown 方法的线程
不会阻塞)
• 当计数器的值变为 0 时,因 await 方法阻塞的线程会被唤醒,继续执行
场景: 6 个同学陆续离开教室后值班同学才可以关门
public class CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(6); //初始化计数器值for (int i = 1; i <= 6; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName()+"离开了教室!");countDownLatch.countDown(); //计数器减一},String.valueOf(i)).start();}countDownLatch.await(); //当计数器大于0时,线程阻塞System.out.println("教室关门!");}
}
7.2 循环栅栏 CyclicBarrier
CyclicBarrier 看英文单词可以看出大概就是循环阻塞的意思,在使用中
CyclicBarrier 的构造方法第一个参数是目标障碍数,每次执行 CyclicBarrier 一
次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后
的语句。可以将 CyclicBarrier 理解为加 1 操作
场景: 集齐7个龙珠才可以召唤神龙
public class CyclicBarrierDemo {private static final int NUMBER = 7;//集齐7颗龙族才可以召唤神龙public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {System.out.println("集齐7颗龙珠召唤神龙!");});for (int i = 1; i <= 7; i++) {new Thread(()->{try {System.out.println("集齐第"+Thread.currentThread().getName()+"颗龙珠!");cyclicBarrier.await();//龙珠加1,当为7时执行Run方法} catch (Exception e) {e.printStackTrace();}},String.valueOf(i)).start();}}
}
7.3 信号灯 Semaphore
Semaphore 的构造方法中传入的第一个参数是最大信号量(可以看成最大线
程池),每个信号量初始化为一个最多只能分发一个许可证。使用 acquire 方
法获得许可证,release 方法释放许可
场景: 抢车位, 6 部汽车 3 个停车位
public class SemaphoreDemo {public static void main(String[] args) {Semaphore semaphore = new Semaphore(3);//共3个停车位for (int i = 1; i <= 6; i++) {new Thread(()->{try {semaphore.acquire();System.out.println(Thread.currentThread().getName()+"----获取车位!");TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName()+"----离开车位!");} catch (InterruptedException e) {e.printStackTrace();}finally {semaphore.release();}},String.valueOf(i)).start();}}
}
8, 读写锁
8.1 读写锁介绍
现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那
么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以
应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,
就不应该允许其他线程对该资源进行读和写的操作了。
针对这种场景,JAVA 的并发包提供了读写锁 ReentrantReadWriteLock,
它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称
为排他锁或独占锁
- 线程进入读锁的前提条件:
• 没有其他线程的写锁
• 没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个(可重入
锁)。
- 线程进入写锁的前提条件:
• 没有其他线程的读锁
• 没有其他线程的写锁
而读写锁有以下三个重要的特性:
(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公
平优于公平。
(2)重进入:读锁和写锁都支持线程重进入。
(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为
读锁。
8.2 入门案例
场景: 使用 ReentrantReadWriteLock 对一个 hashmap 进行读和写操作
//创建资源类
class MyCache{private volatile Map map = new HashMap<String,Object>();private ReadWriteLock rwLock = new ReentrantReadWriteLock();//写入数据public void put(String key,Object value){//加写锁rwLock.writeLock().lock();try {System.out.println(Thread.currentThread().getName()+"正在写入数据"+key);//暂停一会TimeUnit.MILLISECONDS.sleep(200);//写数据map.put(key,value);} catch (Exception e) {e.printStackTrace();} finally {System.out.println(Thread.currentThread().getName()+"写入完成");//释放写锁rwLock.writeLock().unlock();}}//读取数据public Object get(String key){Object result = null;//加上读锁rwLock.readLock().lock();try {System.out.println(Thread.currentThread().getName()+"正在读取数据"+key);//暂停一会TimeUnit.MILLISECONDS.sleep(200);//读数据result = map.get(key);} catch (Exception e) {e.printStackTrace();} finally {System.out.println(Thread.currentThread().getName()+"读取完成");//释放写锁rwLock.readLock().unlock();}return result;}}
public class ReadWriterDemo {public static void main(String[] args) {MyCache myCache = new MyCache();//分别创建5个读写线程for (int i = 1; i <= 5; i++) {final int num = i;new Thread(()->{myCache.put(num+"",num+"");},String.valueOf(i)).start();}for (int i = 1; i <= 5; i++) {final int num = i;new Thread(()->{myCache.get(num+"");},String.valueOf(i)).start();}}
8.3 小结(重要)
• 在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。
• 在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。
原因: 当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。
9, 阻塞队列
9.1 BlockingQueue 简介
Concurrent 包中,BlockingQueue 很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了 BlockingQueue 家庭中的所有成员,包括他们各自的功能以及常见使用场景。
阻塞队列,顾名思义,首先它是一个队列, 通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
当队列是空的,从队列中获取元素的操作将会被阻塞
当队列是满的,从队列中添加元素的操作将会被阻塞
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
常用的队列主要有以下两种:
• 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性
• 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起为什么需要 BlockingQueue好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切
BlockingQueue 都给你一手包办了在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和 “消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。
• 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
• 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒
9.2 BlockingQueue 核心方法
BlockingQueue 的核心方法:
1.放入数据
• offer(anObject):表示如果可能的话,将 anObject 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false.(本方法不阻塞当前执行方法的线程)
• offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败
• put(anObject):把 anObject 加到 BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续.
2.获取数据
• poll(time): 取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等time 参数规定的时间,取不到时返回 null
• poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
• take(): 取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到BlockingQueue 有新的数据被加入;
• drainTo(): 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
9.3 入门案例
public class BlockingQueueDemo {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);//基于数组的阻塞队列定长为3//第一组/*System.out.println(blockingQueue.add("a"));System.out.println(blockingQueue.add("b"));System.out.println(blockingQueue.add("c"));System.out.println(blockingQueue.add("d")); //队列已满,继续添加报异常System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove()); //队列已空,继续取报异常*///第二组/*System.out.println(blockingQueue.offer("a"));System.out.println(blockingQueue.offer("b"));System.out.println(blockingQueue.offer("c"));System.out.println(blockingQueue.offer("d")); //falseSystem.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll()); //null*///第三组/*blockingQueue.put("a");blockingQueue.put("b");blockingQueue.put("c");blockingQueue.put("d"); //队列已满,一直阻塞System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take()); //队列已空,一直阻塞*///第四组System.out.println(blockingQueue.offer("a"));System.out.println(blockingQueue.offer("b"));System.out.println(blockingQueue.offer("c"));System.out.println(blockingQueue.offer("d",2, TimeUnit.SECONDS)); //等待2秒System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll()); //null}
}
9.4 常见的 BlockingQueue
9.4.1 ArrayBlockingQueue(常用)
基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea 之所以没这样去做,也许是因为 ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue 和LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于
GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
一句话总结: 由数组结构组成的有界阻塞队列。
9.4.2 LinkedBlockingQueue(常用)
基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用
的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个
类足以。
一句话总结: 由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
9.4.3 DelayQueue
DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
一句话总结: 使用优先级队列实现的延迟无界阻塞队列。
9.4.4 PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),但需要注意的是 PriorityBlockingQueue 并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。
因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。
在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁。
一句话总结: 支持优先级排序的无界阻塞队列。
9.4.5 SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。声明一个 SynchronousQueue 有两种不同的方式,它们之间有着不太一样的行为。
公平模式和非公平模式的区别:
• 公平模式:SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
• 非公平模式(SynchronousQueue 默认):SynchronousQueue 采用非公平锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
一句话总结: 不存储元素的阻塞队列,也即单个元素的队列。
9.4.6 LinkedTransferQueue
LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和transfer 方法。LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。
一句话总结: 由链表组成的无界阻塞队列。
9.4.7 LinkedBlockingDeque
LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情况
• 插入元素时: 如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再讲该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出InterruptedException 异 常
• 读取元素时: 如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可以通过设置超时参数
一句话总结: 由链表组成的双向阻塞队列
9.5 小结
1. 在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
2. 为什么需要 BlockingQueue?
在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。使用后我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了
10, ThreadPool 线程池
10.1 线程池简介
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
例子: 10 年前单核 CPU 电脑,假的多线程,像马戏团小丑玩多个球,CPU 需要来回切换。 现在是多核电脑,多个线程各自跑在独立的 CPU 上,不用切换效率高。
线程池的优势: 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
它的主要特点为:
• 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
• 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
• 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资
源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
• Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor 这几个类
10.2 线程池参数说明
本次介绍 5 种类型的线程池
10.2.1 常用参数(重点)
• corePoolSize 线程池的核心线程数
• maximumPoolSize 能容纳的最大线程数
• keepAliveTime 空闲线程存活时间
• unit 存活的时间单位
• workQueue 存放提交但未执行任务的队列
• threadFactory 创建线程的工厂类
• handler 等待队列满后的拒绝策略
线程池中,有三个重要的参数,决定影响了拒绝策略:
corePoolSize - 核心线程数,也即最小的线程数。
workQueue - 阻塞队列 。
maximumPoolSize -最大线程数当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。
10.2.2 拒绝策略(重点)
CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
DiscardPolicy: 直接丢弃,其他啥都没有
DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
10.3 线程池的种类与创建
10.3.1 newCachedThreadPool(常用)
作用:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程.
特点:
• 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
• 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)
• 当线程池中,没有可用线程,会重新创建一个线程
场景: 适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较短,任务多的场景
10.3.2 newFixedThreadPool(常用)
作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
特征:
• 线程池中的线程处于一定的量,可以很好的控制线程的并发量
• 线程可以重复被使用,在显示关闭之前,都将一直存在
• 超出一定量的线程被提交时候需在队列中等待
场景: 适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严格限制的场景
10.3.3 newSingleThreadExecutor(常用)
作用:创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的newFixedThreadPool 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。
特征: 线程池中最多执行 1 个线程,之后提交的线程活动将会排在队列中以此执行
场景: 适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个线程的场景
public class ThreadPoolDemo {public static void main(String[] args) {ExecutorService executorService1 = Executors.newFixedThreadPool(5);//ExecutorService executorService2 = Executors.newSingleThreadExecutor();//ExecutorService executorService3 = Executors.newCachedThreadPool();for (int i = 1; i <=10 ; i++) {executorService1.execute(()->{System.out.println(Thread.currentThread().getName()+"执行!");});}}
}
10.3.4 newScheduleThreadPool(了解)
作用: 线程池支持定时以及周期性执行任务,创建一个 corePoolSize 为传入参
数,最大线程数为整形的最大数的线程池**
特征:
(1)线程池中具有指定数量的线程,即便是空线程也将保留 (2)可定时或者延迟执行线程活动
场景: 适用于需要多个后台线程执行周期任务的场景
10.3.5 newWorkStealingPool
jdk1.8 提供的线程池,底层使用的是 ForkJoinPool 实现,创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用 cpu 核数的线程来并行执行任务
场景: 适用于大耗时,可并行执行的场景
10.4 线程池底层工作原理(重要)
在创建了线程池后,线程池中的线程数为零
当调用 execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
2.2 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
2.4 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
当一个线程完成任务时,它会从队列中取下一个任务来执行
当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
4.1 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
4.2 所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
10.5 注意事项(重要)
项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是 FixedThreadPool 和 SingleThreadExecutor 底层都是用LinkedBlockingQueue 实现的,这个队列最大长度为 Integer.MAX_VALUE,容易导致 OOM。所以实际生产一般自己通过 ThreadPoolExecutor 的 7 个参数,自定义线程池
创建线程池推荐适用 ThreadPoolExecutor 及其 7 个参数手动创建
o corePoolSize 线程池的核心线程数
o maximumPoolSize 能容纳的最大线程数
o keepAliveTime 空闲线程存活时间
o unit 存活的时间单位
o workQueue 存放提交但未执行任务的队列
o threadFactory 创建线程的工厂类
o handler 等待队列满后的拒绝策略
阿里巴巴不允许 Executors.的方式手动创建线程池,而是通过ThreadPoollExecutor的方式
//自定义线程池创建
public class ThreadPoolDemo1 {public static void main(String[] args) {//自定义线程池ExecutorService threadPool = new ThreadPoolExecutor(3,5,2,TimeUnit.SECONDS,new ArrayBlockingQueue(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());for (int i = 1; i <=10 ; i++) {threadPool.execute(()->{System.out.println(Thread.currentThread().getName()+"执行!");});}}
}
11, Fork/Join
Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子
任务结果合并成最后的计算结果,并进行输出。Fork/Join 框架要完成两件事
情:
Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并
场景: 生成一个计算任务,计算 1+2+3…+100,每 10个数切分一个子任务
//递归累加
class MyTask extends RecursiveTask<Long>{private int begin;private int end;private long result;public MyTask(int begin, int end) {this.begin = begin;this.end = end;}@Overrideprotected Long compute() {if ((end-begin)<=10){ //10个为一组进行计算for (int i = begin; i <= end; i++) {//累加result+=begin;}}else {//切分为2部分,递归计算int middle = (begin+end)/2;MyTask myTask01 = new MyTask(begin, middle);MyTask myTask02 = new MyTask(middle+1, end);//执行异步myTask01.fork();myTask02.fork();//同步阻塞获取结果result = myTask01.join()+myTask02.join();}//计算完返回return result;}
}
public class ForkJoinDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//定义任务MyTask myTask = new MyTask(1, 100);//定义执行对象ForkJoinPool forkJoinPool = new ForkJoinPool();//加入任务执行ForkJoinTask<Long> result = forkJoinPool.submit(myTask);//输出结果System.out.println(result.get());forkJoinPool.shutdown();}
}
12 ,CompletableFuture
CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture 类。
场景:主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们在一个子线程中使其终止。
/**
* 主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们
在一个子线程中使其终止
*/
public class CompletableFutureDemo {public static void main(String[] args) throws Exception{CompletableFuture<String> future = new CompletableFuture<>();new Thread(() -> {try{System.out.println(Thread.currentThread().getName() + "子线程开始干活");//子线程睡 5 秒Thread.sleep(5000);//在子线程中完成主线程future.complete("success");}catch (Exception e){e.printStackTrace();}}, "A").start();//主线程调用 get 方法阻塞System.out.println("主线程调用 get 方法获取结果为: " + future.get());System.out.println("主线程完成,阻塞结束!!!!!!");}
}
public class CompletableFutureDemo {public static void main(String[] args) throws Exception{//运行一个没有返回值的异步任务CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {System.out.println("子线程启动干活");Thread.sleep(5000);System.out.println("子线程完成");} catch (Exception e) {e.printStackTrace();}});//主线程阻塞future.get();System.out.println("主线程结束");//运行一个有返回值的异步任务CompletableFuture<String> future1 =CompletableFuture.supplyAsync(() -> {try {System.out.println("子线程开始任务");Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();}return "子线程完成了!";});//主线程阻塞String s = future1.get();System.out.println("主线程结束, 子线程的结果为:" + s);}
}
ic void main(String[] args) throws Exception{
CompletableFuture future = new CompletableFuture<>();
new Thread(() -> {
try{
System.out.println(Thread.currentThread().getName() + “子线程开始干活”);
//子线程睡 5 秒
Thread.sleep(5000);
//在子线程中完成主线程
future.complete(“success”);
}catch (Exception e){
e.printStackTrace();
}
}, “A”).start();
//主线程调用 get 方法阻塞
System.out.println("主线程调用 get 方法获取结果为: " + future.get());
System.out.println(“主线程完成,阻塞结束!!!”);
}
}
```java
public class CompletableFutureDemo {public static void main(String[] args) throws Exception{//运行一个没有返回值的异步任务CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {System.out.println("子线程启动干活");Thread.sleep(5000);System.out.println("子线程完成");} catch (Exception e) {e.printStackTrace();}});//主线程阻塞future.get();System.out.println("主线程结束");//运行一个有返回值的异步任务CompletableFuture<String> future1 =CompletableFuture.supplyAsync(() -> {try {System.out.println("子线程开始任务");Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();}return "子线程完成了!";});//主线程阻塞String s = future1.get();System.out.println("主线程结束, 子线程的结果为:" + s);}
}
高级技术之 JUC 高并发编程相关推荐
- 《尚硅谷高级技术之JUC高并发编程》学习笔记11—— Fork / Join 框架
文章目录 Fork / Join 框架简介 fork() 方法 join() 方法 Fork / Join 框架的异常处理 入门案例 总结 Fork / Join 框架简介 Fork / Join 它 ...
- Java JUC高并发编程(三)-CallableJUC辅助类
目录 一.Callable接口 二.JUC辅助类 1.减少计数CountDownLatch 2.循环栅栏CyclicBarrier 3.信号灯Semaphore 一.Callable接口 Callab ...
- Java JUC高并发编程(一)
目录 一.概述 二.Lock接口 三.线程间的通信 解决虚假唤醒问题 Lock通信示例: 四.线程间定制化通信 一.概述 JUC就是java.util.concurrent工具包的简称,这是一个处理线 ...
- 【JUC高并发编程】—— 初见JUC
一.JUC 概述 什么是JUC JUC 是 Java并发编程的缩写,指的是 Java.util.concurrent 即Java工具集下的并发编程库 [说白了就是处理线程的工具包] JUC提供了一套并 ...
- 【JUC高并发编程】—— 再见JUC
一.读写锁 读写锁概述 1️⃣ 什么是读写锁? 读写锁是一种多线程同步机制,用于在多线程环境中保护共享资源的访问 与互斥锁不同的是,读写锁允许多个线程同时读取共享资源,但在有线程请求写操作时,必须将其 ...
- JUC高并发编程从入门到精通(全)
目录 前言 1. Lock接口 1.1 复习synchronized 1.2 Lock接口 1.3 Lock方法 1.4 两者差异 2. 线程间通信 2.1 synchronized案例 2.2 Lo ...
- 详解JUC高并发编程
JUC并发编程 并发编程的本质:充分利用CPU的资源 问题:JAVA可以开启线程吗? 不能:最底层是本地方法,底层是C++,Java无法直接操作硬件 1.线程里有几个状态 NEW, 新生 RUNN ...
- JUC 高并发编程(13):LockSupport 概述, wait 与 sleep与park的区别
LockSupport LockSupport 是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,阻塞之后也有对应的唤醒方法.归根结底,LockSupport调用的Unsafe中 ...
- 高并发编程-重新认识Java内存模型(JMM)
文章目录 从CPU到内存模型 内存模型如何确保缓存一致性 并发变成需要解决的问题 (原子性.可见性.有序性) 内存模型需要解决的问题 Java内存模型 JMM的API实现 原子性 synchroniz ...
最新文章
- linux mysql设置开机启动脚本_linux下添加oracle自启动脚本
- CentOS下安装JDK的三种方法
- Ubuntu 10.04下SSH配置
- maven 构建java项目,jdk版本问题
- 如何编写 Cloud9 JavaScript IDE 的功能扩展
- 安卓逆向_14 --- 单机和弱联网游戏内购 突破口 和 思路
- CentOs 虚拟LINUX系统安装与虚拟环境配置
- 《软件工程导论》考研复习
- Android动态设置view的大小及其位置
- 用ggplot2进行直线回归并添加回归方程和方差分析表
- 信息系统项目管理师学习笔记10-项目沟通管理和干系人管理
- 无监督图像分类《SCAN:Learning to Classify Images without》代码分析笔记(1):simclr
- python永久配置pip下载镜像源方法(window版本)
- javaScript打气球小游戏
- 代码随想录算法训练营第九天
- Python re模块 (正则表达式用法剖析详解)
- no-repeat失效
- 网络错误:windows无法访问\\,你没有权限访问\\
- PS快速调出天蓝色清新外景
- 苹果手机的siri在哪里_苹果手机Siri功能的设置和使用 原来是这样的