1、一个示例回顾Future

一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度。

JDK5新增了Future接口,用于描述一个异步计算的结果。

虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,我们必须使用Future.get()的方式阻塞调用线程,或者使用轮询方式判断 Future.isDone 任务是否结束,再获取结果。

这两种处理方式都不是很优雅,相关代码如下:

    @Testpublic void testFuture() throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(5);Future<String> future = executorService.submit(() -> {Thread.sleep(2000);return "hello";});System.out.println(future.get());System.out.println("end");}

与此同时,Future无法解决多个异步任务需要相互依赖的场景,简单点说就是,主线程需要等待子线程任务执行完毕之后在进行执行,这个时候你可能想到了「CountDownLatch」,没错确实可以解决,代码如下。

这里定义两个Future,第一个通过用户id获取用户信息,第二个通过商品id获取商品信息。

    @Testpublic void testCountDownLatch() throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newFixedThreadPool(5);CountDownLatch downLatch = new CountDownLatch(2);long startTime = System.currentTimeMillis();Future<String> userFuture = executorService.submit(() -> {//模拟查询商品耗时500毫秒Thread.sleep(500);downLatch.countDown();return "用户A";});Future<String> goodsFuture = executorService.submit(() -> {//模拟查询商品耗时500毫秒Thread.sleep(400);downLatch.countDown();return "商品A";});downLatch.await();//模拟主程序耗时时间Thread.sleep(600);System.out.println("获取用户信息:" + userFuture.get());System.out.println("获取商品信息:" + goodsFuture.get());System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");}

「运行结果」

获取用户信息:用户A
获取商品信息:商品A
总共用时1110ms

从运行结果可以看出结果都已经获取,而且如果我们不用异步操作,执行时间应该是:500+400+600 = 1500,用异步操作后实际只用1110。

但是Java8以后我不在认为这是一种优雅的解决方式,接下来来了解下CompletableFuture的使用。

2、通过CompletableFuture实现上面示例

    @Testpublic void testCompletableInfo() throws InterruptedException, ExecutionException {long startTime = System.currentTimeMillis();//调用用户服务获取用户基本信息CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->//模拟查询商品耗时500毫秒{try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}return "用户A";});//调用商品服务获取商品基本信息CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->//模拟查询商品耗时500毫秒{try {Thread.sleep(400);} catch (InterruptedException e) {e.printStackTrace();}return "商品A";});System.out.println("获取用户信息:" + userFuture.get());System.out.println("获取商品信息:" + goodsFuture.get());//模拟主程序耗时时间Thread.sleep(600);System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");}

运行结果

获取用户信息:用户A
获取商品信息:商品A
总共用时1112ms

通过CompletableFuture可以很轻松的实现CountDownLatch的功能,你以为这就结束了,远远不止,CompletableFuture比这要强多了。

比如可以实现:任务1执行完了再执行任务2,甚至任务1执行的结果,作为任务2的入参数等等强大功能,下面就来学学CompletableFuture的API。

3、CompletableFuture创建方式

3.1、常用的4种创建方式

CompletableFuture源码中有四个静态方法用来执行异步任务

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}

一般我们用上面的静态方法来创建CompletableFuture,这里也解释下他们的区别:

  • 「supplyAsync」执行任务,支持返回值。

  • 「runAsync」执行任务,没有返回值。

3.1.1、「supplyAsync方法」

//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//自定义线程,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

3.1.2、「runAsync方法」

//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable)
//自定义线程,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

3.2、结果获取的4种方式

对于结果的获取CompltableFuture类提供了四种方式

//方式一
public T get()
//方式二
public T get(long timeout, TimeUnit unit)
//方式三
public T getNow(T valueIfAbsent)
//方式四
public T join()

说明:

  • 「get()和get(long timeout, TimeUnit unit)」 => 在Future中就已经提供了,后者提供超时处理,如果在指定时间内未获取结果将抛出超时异常

  • 「getNow」 => 立即获取结果不阻塞,结果计算已完成将返回结果或计算过程中的异常,如果未计算完成将返回设定的valueIfAbsent值

  • 「join」 => 方法里不会抛出异常

示例

    @Testpublic void testCompletableGet() throws InterruptedException, ExecutionException {CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "商品A";});// getNow方法测试 System.out.println(cp1.getNow("商品B"));//join方法测试 CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0));System.out.println(cp2.join());System.out.println("-----------------------------------------------------");//get方法测试CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0));System.out.println(cp3.get());}

「运行结果」:

  • 第一个执行结果为 「商品B」,因为要先睡上1秒结果不能立即获取

  • join方法获取结果方法里不会抛异常,但是执行结果会抛异常,抛出的异常为CompletionException

  • get方法获取结果方法里将抛出异常,执行结果抛出的异常为ExecutionException

4、异步回调方法

4.1、thenRun/thenRunAsync

通俗点讲就是,「做完第一个任务后,再做第二个任务,第二个任务也没有返回值」

示例

    @Testpublic void testCompletableThenRunAsync() throws InterruptedException, ExecutionException {long startTime = System.currentTimeMillis();CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> {try {//执行任务AThread.sleep(600);} catch (InterruptedException e) {e.printStackTrace();}});CompletableFuture<Void> cp2 = cp1.thenRun(() -> {try {//执行任务BThread.sleep(400);} catch (InterruptedException e) {e.printStackTrace();}});// get方法测试System.out.println(cp2.get());//模拟主程序耗时时间Thread.sleep(600);System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");}//运行结果/***  null*  总共用时1610ms*/

「thenRun 和thenRunAsync有什么区别呢?」

如果你执行第一个任务的时候,传入了一个自定义线程池:

  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。

  • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。

说明: 后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。

4.2、thenAccept/thenAcceptAsync

第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

示例

    @Testpublic void testCompletableThenAccept() throws ExecutionException, InterruptedException {long startTime = System.currentTimeMillis();CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {return "dev";});CompletableFuture<Void> cp2 = cp1.thenAccept((a) -> {System.out.println("上一个任务的返回结果为: " + a);});cp2.get();}

4.3、 thenApply/thenApplyAsync

表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

示例

    @Testpublic void testCompletableThenApply() throws ExecutionException, InterruptedException {CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {return "dev";}).thenApply((a) -> {if (Objects.equals(a, "dev")) {return "dev";}return "prod";});System.out.println("当前环境为:" + cp1.get());//输出: 当前环境为:dev}

5、异常回调

当CompletableFuture的任务不论是正常完成还是出现异常它都会调用「whenComplete」这回调函数。

  • 「正常完成」:whenComplete返回结果和上级任务一致,异常为null;

  • 「出现异常」:whenComplete返回结果为null,异常为上级任务的异常;

即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要你处理该异常。

下面来看看示例

5.1、只用whenComplete

    @Testpublic void testCompletableWhenComplete() throws ExecutionException, InterruptedException {CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("出错了");}System.out.println("正常结束");return 0.11;}).whenComplete((aDouble, throwable) -> {if (aDouble == null) {System.out.println("whenComplete aDouble is null");} else {System.out.println("whenComplete aDouble is " + aDouble);}if (throwable == null) {System.out.println("whenComplete throwable is null");} else {System.out.println("whenComplete throwable is " + throwable.getMessage());}});System.out.println("最终返回的结果 = " + future.get());}

正常完成,没有异常时:

正常结束
whenComplete aDouble is 0.11
whenComplete throwable is null
最终返回的结果 = 0.11

出现异常时:get()会抛出异常

whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出错了java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出错了at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)

5.2、whenComplete + exceptionally示例

    @Testpublic void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("出错了");}System.out.println("正常结束");return 0.11;}).whenComplete((aDouble, throwable) -> {if (aDouble == null) {System.out.println("whenComplete aDouble is null");} else {System.out.println("whenComplete aDouble is " + aDouble);}if (throwable == null) {System.out.println("whenComplete throwable is null");} else {System.out.println("whenComplete throwable is " + throwable.getMessage());}}).exceptionally((throwable) -> {System.out.println("exceptionally中异常:" + throwable.getMessage());return 0.0;});System.out.println("最终返回的结果 = " + future.get());}

当出现异常时,exceptionally中会捕获该异常,给出默认返回值0.0。

whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出错了
exceptionally中异常:java.lang.RuntimeException: 出错了
最终返回的结果 = 0.0

6、多任务组合回调

6.1、AND组合关系

thenCombine / thenAcceptBoth / runAfterBoth都表示:「当任务一和任务二都完成再执行任务三」

区别在于:

  • 「runAfterBoth」 不会把执行结果当做方法入参,且没有返回值

  • 「thenAcceptBoth」: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值

  • 「thenCombine」:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值

示例

    @Testpublic void testCompletableThenCombine() throws ExecutionException, InterruptedException {//创建线程池ExecutorService executorService = Executors.newFixedThreadPool(10);//开启异步任务1CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());int result = 1 + 1;System.out.println("异步任务1结束");return result;}, executorService);//开启异步任务2CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());int result = 1 + 1;System.out.println("异步任务2结束");return result;}, executorService);//任务组合CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> {System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());System.out.println("任务1返回值:" + f1);System.out.println("任务2返回值:" + f2);return f1 + f2;}, executorService);Integer res = task3.get();System.out.println("最终结果:" + res);}

「运行结果」

异步任务1,当前线程是:17
异步任务1结束
异步任务2,当前线程是:18
异步任务2结束
执行任务3,当前线程是:19
任务1返回值:2
任务2返回值:2
最终结果:4

6.2、OR组合关系

applyToEither / acceptEither / runAfterEither 都表示:「两个任务,只要有一个任务完成,就执行任务三」

区别在于:

  • 「runAfterEither」:不会把执行结果当做方法入参,且没有返回值

  • 「acceptEither」: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值

  • 「applyToEither」:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值

示例

    @Testpublic void testCompletableEitherAsync() {//创建线程池ExecutorService executorService = Executors.newFixedThreadPool(10);//开启异步任务1CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());int result = 1 + 1;System.out.println("异步任务1结束");return result;}, executorService);//开启异步任务2CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());int result = 1 + 2;try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("异步任务2结束");return result;}, executorService);//任务组合task.acceptEitherAsync(task2, (res) -> {System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());System.out.println("上一个任务的结果为:" + res);}, executorService);}

运行结果

//通过结果可以看出,异步任务2都没有执行结束,任务3获取的也是1的执行结果
异步任务1,当前线程是:17
异步任务1结束
异步任务2,当前线程是:18
执行任务3,当前线程是:19
上一个任务的结果为:2

注意

如果把上面的核心线程数改为1也就是

 ExecutorService executorService = Executors.newFixedThreadPool(1);

运行结果就是下面的了,会发现根本没有执行任务3,显然是任务3直接被丢弃了。

异步任务1,当前线程是:17
异步任务1结束
异步任务2,当前线程是:17

6.3、多任务组合

  • 「allOf」:等待所有任务完成

  • 「anyOf」:只要有一个任务完成

示例

allOf:等待所有任务完成

    @Testpublic void testCompletableAallOf() throws ExecutionException, InterruptedException {//创建线程池ExecutorService executorService = Executors.newFixedThreadPool(10);//开启异步任务1CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());int result = 1 + 1;System.out.println("异步任务1结束");return result;}, executorService);//开启异步任务2CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());int result = 1 + 2;try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("异步任务2结束");return result;}, executorService);//开启异步任务3CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {System.out.println("异步任务3,当前线程是:" + Thread.currentThread().getId());int result = 1 + 3;try {Thread.sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("异步任务3结束");return result;}, executorService);//任务组合CompletableFuture<Void> allOf = CompletableFuture.allOf(task, task2, task3);//等待所有任务完成allOf.get();//获取任务的返回结果System.out.println("task结果为:" + task.get());System.out.println("task2结果为:" + task2.get());System.out.println("task3结果为:" + task3.get());}

anyOf: 只要有一个任务完成

    @Testpublic void testCompletableAnyOf() throws ExecutionException, InterruptedException {//创建线程池ExecutorService executorService = Executors.newFixedThreadPool(10);//开启异步任务1CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {int result = 1 + 1;return result;}, executorService);//开启异步任务2CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {int result = 1 + 2;return result;}, executorService);//开启异步任务3CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {int result = 1 + 3;return result;}, executorService);//任务组合CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task, task2, task3);//只要有一个有任务完成Object o = anyOf.get();System.out.println("完成的任务的结果:" + o);}

7、CompletableFuture使用有哪些注意点

CompletableFuture 使我们的异步编程更加便利的、代码更加优雅的同时,我们也要关注下它,使用的一些注意点。

7.1、Future需要获取返回值,才能获取异常信息

    @Testpublic void testWhenCompleteExceptionally() {CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {if (1 == 1) {throw new RuntimeException("出错了");}return 0.11;});//如果不加 get()方法这一行,看不到异常信息//future.get();}

Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。

小伙伴们使用的时候,注意一下哈,考虑是否加try...catch...或者使用exceptionally方法。

7.2、CompletableFuture的get()方法是阻塞的

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。

//反例CompletableFuture.get();
//正例
CompletableFuture.get(5, TimeUnit.SECONDS);

7.3、不建议使用默认线程池

CompletableFuture代码中又使用了默认的「ForkJoin线程池」,处理的线程个数是电脑「CPU核数-1」。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

7.4、自定义线程池时,注意饱和策略

CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(5, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。

但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。

8、推荐阅读

CompletionService使用与源码分析

CompletableFuture使用详解

多线程与并发 - Java 8 CompletableFuture 异步多线程相关推荐

  1. JUC系列(十一) | Java 8 CompletableFuture 异步编程

    多线程一直Java开发中的难点,也是面试中的常客,趁着还有时间,打算巩固一下JUC方面知识,我想机会随处可见,但始终都是留给有准备的人的,希望我们都能加油!!! 沉下去,再浮上来,我想我们会变的不一样 ...

  2. Java之CompletableFuture异步、组合计算基本用法

    CompletableFuture是Java8新增的Api,该类实现了Future和ComplateStage两个接口,提供了强大的Future扩展功能,可以简化异步编程的复杂性,提供了函数编程的能力 ...

  3. 创建现成的四种方式 多线程与并发的基本概念:

    多线程与并发的基本概念: 多线程 一:什么是线程? 进程:进行中应用程序 线程:是进程组成者,一个进程中可能包含多个线程 cpu执行程序的最小单位是线程,cpu在同一时间内只能执行一个线程,在多个线程 ...

  4. CPU,多核,多线程,并发,并行,计算效率

    计算机原理:CPU.并发.并行.多核.多线程.多进程 0.计算机工作流程 0.0 基础概念:计算机组成 0.1 CPU(Central Processing Unit) (1)控制单元 (2)运算单元 ...

  5. Java 并发编程解析 | 如何正确理解Java领域中的多线程模型,主要用来解决什么问题?

    苍穹之边,浩瀚之挚,眰恦之美: 悟心悟性,善始善终,惟善惟道! -- 朝槿<朝槿兮年说> 写在开头 我国宋代禅宗大师青原行思在<三重境界>中有这样一句话:" 参禅之初 ...

  6. Java-多线程-Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比

    转载声明 本文大量内容系转载自以下文章,有删改,并参考其他文档资料加入了一些内容: [小家Java]Future.FutureTask.CompletionService.CompletableFut ...

  7. JAVA多线程和并发基础面试问答

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 多线程和并发问题是Java技术面试中面试官比较喜欢问的问题之一.在 ...

  8. ***JAVA多线程和并发基础面试问答

    多线程和并发问题是Java技术面试中面试官比较喜欢问的问题之一.在这里,从面试的角度列出了大部分重要的问题,但是你仍然应该牢固的掌握Java多线程基础知识来对应日后碰到的问题.(校对注:非常赞同这个观 ...

  9. java多线程面试_Java多线程和并发基础面试问答,看过后你不会后悔

    ***:Java多线程面试问题 1:进程和线程之间有什么不同? 一个进程是一个独立(self contained)的运行环境,它可以被看作一个程序或者一个应用.而线程是在进程中执行的一个任务.Java ...

最新文章

  1. python爬虫beautifulsoup4系列4-子节点
  2. 鸿蒙开发-在JS中获取hml页面中Input输入的值
  3. 回溯法之避免无用判断 UVA129 Krypton Factor困难的串
  4. 获取outlook 会议_如何仅在Microsoft Outlook中仅获取您关注的电子邮件的通知
  5. K-means算法应用:图片压缩
  6. vray铺装材质参数设计蓝海创意云渲染
  7. 优先队列优化迪杰斯特拉
  8. 可以自己diy壁纸的app_有什么APP可以做壁纸?
  9. ps html 优化,PS基线基线已优化连续的区别是什么?
  10. python实现最小二乘法
  11. long journey android,人类一败涂地感染模式mod
  12. 交叉编译Qt5.9.6
  13. ---- 招聘之数据结构 ----
  14. 从零开始——Emacs 安装配置使用教程 2015
  15. 中国android手机市场占有率,Android成全球第二大手机系统 市场份额达26%
  16. LCA(最近公共祖先)问题
  17. 一次真实的入侵-------记对一足球推荐站点的渗透
  18. 《X Cross:魔境传说》(X Cross)[DVDRip]
  19. 百度增值税发票调用示例
  20. ANSYS Maxwell 3D线圈磁场仿真分析

热门文章

  1. C#中Abstract和Virtual使用详解
  2. 2006最酷小说《上上床,谈谈心》
  3. c语言程序 电子钢琴按键,基于51单片机8按键电子琴-简易版系统设计(原理图+程序仿真+论文)...
  4. 龙芯2K1000使用nfs挂载文件系统进行使用
  5. QQ已删好友查询手机版
  6. 【数字逻辑】学习笔记 第五章 Part3 时序逻辑电路(常用时序逻辑电路及其应用)
  7. Windows Modules Installer禁用导致的windows2008R2 安装补丁“此更新不适用于你的计算机”解决办法
  8. 最新报告丨深入洞察九大车主 APP,落地汽车行业解决方案及最佳实践
  9. Linux常用命令详解 ls -l命令详解
  10. ONLYOFFICE与O2OA参加西部教育博览会圆满成功