文章目录

  • 1、多线程相关概念
    • 1.1、synchronized
    • 1.2、并发concurrent
    • 1.3、并行parallel
    • 1.4、进程
    • 1.5、线程
    • 1.6、管程
  • 2、线程分类
    • 2.1、用户线程
    • 2.2、守护线程
  • 3、聊聊CompletableFuture
    • 3.1、Future和Callable接口
    • 3.2、FutureTask实现类
    • 3.3、Future到CompletableFuture
      • 3.3.1、get()阻塞
      • 3.3.2、isDone()轮询
    • 3.4、CompletableFuture基本介绍
      • runAsync无返回值
      • supplyAsync有返回值
    • 3.5、CompletableFuture使用演示(日常使用)
      • CompletableFuture优点总结
    • 3.6、CompletableFuture案例精讲
      • 函数式接口
      • Chain链式调用
      • join和get对比
      • 实战精讲-比价网站case
    • 3.7、CompletableFuture常用API
      • 1、获得结果和触发计算
      • 2、对计算结果进行处理
      • 3、对计算结果进行消费
      • 4、对计算速度进行选用
      • 5、对计算结果进行合并
  • 4、CompleteFuture和线程池(非常重要)

1、多线程相关概念

1.1、synchronized

  • synchronized称之为”同步锁“
  • 作用是保证在同一时刻, 被修饰的代码块或方法只会有一个线程执行,以达到保证并发安全的效果。

1.2、并发concurrent

  • 是在同一实体上的多个事件
  • 是在一台机器上同时处理多个任务

1.3、并行parallel

  • 是在不同实体上的多个事件
  • 是在多台机器上同时处理多个任务

1.4、进程

  • 一个进程由程序段(代码段)、数据段、进程控制块三部分组成

  • 是程序执行和系统进行并发调度的最小单位,每⼀个进程都有它⾃⼰的内存空间和系统资源

1.5、线程

  • 线程是”进程代码段“的一次顺序执行流程,是CPU调度的最小单位
  • 一个线程由线程描述信息、程序计数器和栈内存三部分组成

Java命令执行一个class 类时,实际上是启动了一个JVM 进程,在该进程的内部至少会启动两个实例,一个是main 线程,另一个是GC线程

1.6、管程

  • Monitor(监视器),也就是我们平时所说的锁

  • Monitor其实是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。

好啦,了解完一个锁、两个并、三个程之后开始正菜

2、线程分类

2.1、用户线程

  • 主线程

  • 用户创建的线程

  • 不同的线程生命周期一般不同,不会同时消亡

2.2、守护线程

  • 是一种特殊的线程,在后台默默地完成一些系统性的服务,比如垃圾回收线程
  • 用户线程全部消亡则守护线程也消亡

来一个小栗子感受一下:

public class ThreadDemo {public static void main(String[] args) {Thread t1 = new Thread(() -> {System.out.println(Thread.currentThread().getName()+"是:\t"+(Thread.currentThread().isDaemon()?"守护线程":"用户线程"));while(true){}}, "t1");//t1.setDaemon(true);t1.start();//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println(Thread.currentThread().getName()+"\t"+" ----task is over");}
}

要是加上这句 t1.setDaemon(true) ,再来看看

这就很好的诠释了用户线程全部消亡则守护线程也消亡这句话。

需要注意的点:

  • 当程序中所有用户线程执行完毕之后,不管守护线程是否结束,系统都会自动退出

  • 如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可以退出了。所以当系统只剩下守护进程的时候,java虚拟机会自动退出

  • 设置守护线程,需要在start()方法之前进行

3、聊聊CompletableFuture

3.1、Future和Callable接口

Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如

  • 获取异步任务的执行结果
  • 取消任务的执行
  • 判断任务是否被取消
  • 判断任务执行是否完毕等(异步:可以被叫停,可以被取消)

假设一个场景:上课买水的例子

老师在上课,但是口渴,但是为了不耽误教学任务进度,她不能自己去买水,于是让班长这个线程去买水,自己可以继续上课,实现了异步任务。

主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。

一个目的:

  • 异步多线程任务执行且有返回结果

三个特点:

  • 多线程
  • 有返回
  • 异步任务

3.2、FutureTask实现类

分析源码:贴两张图


在源码可以看到,他既继承了RunnableFuture接口,也在构造方法中实现了Callable接口(有返回值、可抛出异常)和Runnable接口

举个栗子测试下:

public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(new MyThread());Thread t1 = new Thread(futureTask,"t1");t1.start();System.out.println(futureTask.get());//接收返回值}
}class MyThread implements Callable<String>{@Overridepublic String call() throws Exception {System.out.println("-----come in call() ----异步执行");return "hello Callable 返回值";}
}//结果
//-----come in call() ----异步执行
//hello Callable 返回值

3.3、Future到CompletableFuture

3.3.1、get()阻塞

这里有两个方案:

  • 方案一,3个任务1个main线程处理,大概1541ms

    public class FutureDemo {public static void main(String[] args) {/*** 三个任务,目前只有一个线程main 来处理,看耗时多少*/long startTime = System.currentTimeMillis();try{TimeUnit.MILLISECONDS.sleep(500);}catch(Exception e){e.fillInStackTrace();}try{TimeUnit.MILLISECONDS.sleep(500);}catch(Exception e){e.fillInStackTrace();}try{TimeUnit.MILLISECONDS.sleep(500);}catch(Exception e){e.fillInStackTrace();}long endTime = System.currentTimeMillis();System.out.println("花费时间"+(endTime-startTime)+"毫秒");System.out.println(Thread.currentThread().getName()+"结束");}
    }
    

  • 方案二,3个任务3个线程,利用线程池(假如每次new一个Thread,太浪费资源,会有GC这些工作),大概42毫秒

    public class FutureDemo {public static void main(String[] args) {/*** 三个任务,目前开启多个异步任务线程 来处理,看耗时多少*/long startTime = System.currentTimeMillis();ExecutorService threadPool = Executors.newFixedThreadPool(3);FutureTask<String> futureTask1 = new FutureTask<String>(()->{try {TimeUnit.MILLISECONDS.sleep(500);}catch(Exception e){e.printStackTrace();}return "futureTask1 结束";});threadPool.submit(futureTask1);FutureTask<String> futureTask2 = new FutureTask<String>(()->{try {TimeUnit.MILLISECONDS.sleep(500);}catch(Exception e){e.printStackTrace();}return "futureTask2 结束";});threadPool.submit(futureTask2);FutureTask<String> futureTask3 = new FutureTask<String>(()->{try {TimeUnit.MILLISECONDS.sleep(500);}catch(Exception e){e.printStackTrace();}return "futureTask3 结束";});threadPool.submit(futureTask3);long endTime = System.currentTimeMillis();System.out.println("花费时间"+(endTime-startTime)+"毫秒");System.out.println(Thread.currentThread().getName()+"结束");}
    }
    

不难看出future+线程池异步多线程任务配合,能显著提高程序的执行效率。

有优点那必然会有缺点的呀,接着往下聊

Future缺点:

1、get()阻塞

举个栗子:这里注意get()方法的位置

public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<String>(()->{System.out.println(Thread.currentThread().getName()+"\t ------线程come in");try {TimeUnit.SECONDS.sleep(5);//暂停几秒} catch (InterruptedException e) {e.printStackTrace();}return "task over";});Thread t1 = new Thread(futureTask,"t1");t1.start();System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");System.out.println(futureTask.get());
}

public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<String>(()->{System.out.println(Thread.currentThread().getName()+"\t ------线程come in");try {TimeUnit.SECONDS.sleep(5);//暂停几秒} catch (InterruptedException e) {e.printStackTrace();}return "task over";});Thread t1 = new Thread(futureTask,"t1");t1.start();System.out.println(futureTask.get());System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");}

从这两个例子可以看出,get()方法位置的不同,会导致主线程的效率(一旦调用get()方法直接去找副线程了,阻塞了主线程),所以通常将get()方法放在最后

另外:5秒钟后才出来结果,我只想等待3 秒钟,过时不候,这时会报异常

System.out.println(futureTask.get(3,TimeUnit.SECONDS));

程序运行过程中 那5秒就干巴巴的等,一点都不优雅 ,我们希望有点提示

那如何解决呢?

3.3.2、isDone()轮询

public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {FutureTask<String> futureTask = new FutureTask<String>(()->{System.out.println(Thread.currentThread().getName()+"\t ------线程come in");try {TimeUnit.SECONDS.sleep(1);//暂停几秒} catch (InterruptedException e) {e.printStackTrace();}return "task over";});Thread t1 = new Thread(futureTask,"t1");t1.start();//System.out.println(futureTask.get(3,TimeUnit.SECONDS));System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");//不要阻塞,尽量用轮询替代while(true){if(futureTask.isDone()){System.out.println("----result: "+futureTask.get());break;}else{System.out.println("还在计算中,别催,越催越慢,再催熄火");}}
}

如果想要异步获取结果,通常都会以轮询的方式去获取结果

但是呢 轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果

尽量不要阻塞

结论:Future 对于结果的获取不是很友好,只能通过阻塞或者轮询的方式得到任务的结果

现在想完成一些复杂的任务:

① 回调通知

② 将两个异步计算合成一个异步计算,这两个异步计算互相独立,同时第二个又依赖第一个的结果

等等

阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture

3.4、CompletableFuture基本介绍

类架构说明:

  • 在Java 8中, CompletableFuture提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 也提供了转换和组合CompletableFuture的方法。

  • 它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些动作。

  • 它实现了Future和Completion Stage接口 ,还具备函数式编程的能力

接口CompletionStage

  • Completion Stage代表异步计算过程中的某一个阶段, 一个阶段完成以后可能会触发另外一个阶段

  • 一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:stage.then Apply(x->square(x) ) .then Accept(x->System.out.print(x) ) .then Run() ->System.out.print In() ),一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。

核心的四个静态方法(分为两组)

  • 利用核心的四个静态方法创建一个异步操作 不建议用new

  • 关键就是 有没有返回值 是否用了线程池

  • 参数说明:

    • 没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。
    • 如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码。

runAsync无返回值

1、runAsync

public static void main(String[] args) throws ExecutionException, InterruptedException{CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());//停顿几秒线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(completableFuture.get());
}

2、runAsync+线程池

public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(3);//加入线程池CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());//停顿几秒线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}},threadPool);System.out.println(completableFuture.get());threadPool.shutdown();
}

supplyAsync有返回值

3、supplyAsync

public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(3);//加入线程池CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "helllo supplyasync";});System.out.println(objectCompletableFuture.get());threadPool.shutdown();
}

4、supplyAsync+线程池

public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(3);//加入线程池CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "helllo supplyasync";},threadPool);System.out.println(objectCompletableFuture.get());threadPool.shutdown();
}

3.5、CompletableFuture使用演示(日常使用)

CompletableFuture 是Future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

1、基本功能

CompletableFuture可以完成Future的功能,举个栗子:

public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Object> objectCompletableFuture = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName()+"----副线程come in");int result = ThreadLocalRandom.current().nextInt(10);//产生一个随机数try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1秒钟后出结果:"+result);return result;});System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");System.out.println(objectCompletableFuture.get());
}

2、减少阻塞和轮询whenComplete

CompletableFuture通过whenComplete减少阻塞和轮询(自动回调)

public static void main(String[] args){CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName()+"--------副线程come in");int result = ThreadLocalRandom.current().nextInt(10);//产生随机数try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return result;}).whenComplete((v,e) -> {//没有异常,v是值,e是异常if(e == null){System.out.println("------------------计算完成,更新系统updataValue"+v);}}).exceptionally(e->{//有异常的情况e.printStackTrace();System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());return null;});//线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}
}

解释下为什么默认线程池关闭,自定义线程池记得关闭:

还记得用户线程和守护线程吗? main 线程执行的快,当它结束时,副线程也必须关闭,所以呀main线程需要设置延迟几秒

3、假如换用自定义线程池

public static void main(String[] args){ExecutorService threadPool = Executors.newFixedThreadPool(3);try {CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName()+"--------副线程come in");int result = ThreadLocalRandom.current().nextInt(10);//产生随机数try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}//System.out.println("1秒钟后出结果:"+result);return result;},threadPool).whenComplete((v,e) -> {//没有异常,v是值,e是异常if(e == null){System.out.println("------------------计算完成,更新系统updataValue"+v);}}).exceptionally(e->{//有异常的情况e.printStackTrace();System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());return null;});System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");} catch (Exception e) {e.printStackTrace();}finally {threadPool.shutdown();}
}

4、异常情况的展示,设置一个异常 int i = 10 / 0 ;

public static void main(String[] args){ExecutorService threadPool = Executors.newFixedThreadPool(3);try {CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName()+"--------副线程come in");int result = ThreadLocalRandom.current().nextInt(10);//产生随机数try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("-----结果---异常判断值---"+result);//---------------------异常情况的演示--------------------------------------if(result > 2){int i  = 10 / 0 ;//我们主动的给一个异常情况}//------------------------------------------------------------------return result;},threadPool).whenComplete((v,e) -> {//没有异常,v是值,e是异常if(e == null){System.out.println("------------------计算完成,更新系统updataValue"+v);}}).exceptionally(e->{//有异常的情况e.printStackTrace();System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());return null;});System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");} catch (Exception e) {e.printStackTrace();}finally {threadPool.shutdown();}
}

CompletableFuture优点总结

  • 异步任务结束时,会自动回调某个对象的方法;
  • 主线程设置好毁掉后,不再关心异步任务的执行,异步任务之间可以顺序执行
  • 异步任务出错时,会自动回调某个对象的方法。

3.6、CompletableFuture案例精讲

编程必备技能准备

函数式接口

  • 函数式接口的定义:任何接口,如果只包含唯一一个抽象方法,那么它就是一个函数式接口。对于函数式接口,我们可以通过lambda表达式来创建该接口的对象

    public interface Runnable{public abstract void run();
    }
    
  • 常见的函数式接口
  1. Runnable

    @FunctionalInterface
    public interface Runnable {public abstract void run();
    }
    
  2. Function

    @FunctionalInterface
    public interface Function<T, R> {R apply(T t);
    }
    
  3. Consumer

    @FunctionalInterface
    public interface Consumer<T> {void accept(T t);
    }
    
  4. Supplier 供给型函数式接口

    @FunctionalInterface
    public interface Supplier<T> {/*** Gets a result.** @return a result*/T get();
    }
    
  5. Biconsumer(Bi代表两个的意思,我们要传入两个参数,在上面的案例中是v和e)

    @FunctionalInterface
    public interface BiConsumer<T, U> {void accept(T t, U u);}
    

Chain链式调用

public class Chain {public static void main(String[] args) {//-------------------老式写法------------
//        Student student = new Student();
//        student.setId(1);
//        student.setMajor("cs");
//        student.setName("小卡");
//------------------链式编程-------------new Student().setId(1).setName("大卡").setMajor("cs");}}
//--------------------------------------------------------------------------------------
@NoArgsConstructor
@AllArgsConstructor
@Data
@Accessors(chain = true)//开启链式编程
class Student{private int id;private String name;private String major;
}

join和get对比

  • 功能几乎一样,区别在于编码时是否需要抛出异常

    • get()方法需要抛出异常
    • join()方法不需要抛出异常
public class Chain {public static void main(String[] args) throws ExecutionException, InterruptedException {//抛出异常CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {return "hello 123";});System.out.println(completableFuture.get());}}
----------------------------------------------------------------------------------------
public class Chain {public static void main(String[] args)  {//未抛出异常CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {return "hello 123";});System.out.println(completableFuture.join());}
}

实战精讲-比价网站case

需求

1、需求说明
1.1、同一款产品,同时搜索出同款产品在各大电商平台的售价;
1.2、同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少2、输出返回:
出来结果希望是同款产品的在不同地方的价格清单列表, 返回一个List<String>
《mysql》in jd price is 88.05
《mysql》in dang dang price is 86.11
《mysql》in tao bao price is 90.433、解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表
1   stepbystep   , 按部就班, 查完京东查淘宝, 查完淘宝查天猫......
2   all in       ,万箭齐发,一口气多线程异步任务同时查询...

基本框架搭建

  • 相当于是一个一个按部就班
//同一款产品,同时搜索出同款产品在各大电商平台的售价;
public static class Case {static List<NetMall> list = Arrays.asList(new NetMall("jd"),new NetMall("dangdang"),new NetMall("taobao"));public static List<String> getPrice(List<NetMall> list,String productName){return list.stream() //----流式计算做了映射(利用map),希望出来的是有格式的字符串(利用String.format),%是占位符.map(netMall -> String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),//第一个%netMall.calcPrice(productName))).collect(Collectors.toList());//第二个%}public static void main(String[] args) {long startTime = System.currentTimeMillis();List<String> list1 = getPrice(list, "mysql");for(String element:list1){System.out.println(element);}long endTime = System.currentTimeMillis();System.out.println("---当前操作花费时间----costTime:"+(endTime-startTime)+"毫秒");}
}static class NetMall{private String netMallName;public String getNetMallName() {return netMallName;}public NetMall(String netMallName){this.netMallName = netMallName;}public double calcPrice(String productName){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);//用这句话来模拟价格}
}

从功能到性能:利用CompletableFuture

  • 这里是利用异步线程,万箭齐发
  • 此处用了两步流式编程
  • 性能差距巨大
public static class Case {static List<NetMall> list = Arrays.asList(new NetMall("jd"),new NetMall("dangdang"),new NetMall("taobao"));public static List<String> getPrice(List<NetMall> list,String productName){return list.stream() //----流式计算做了映射(利用map),希望出来的是有格式的字符串(利用String.format),%是占位符.map(netMall -> String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),//第一个%netMall.calcPrice(productName))).collect(Collectors.toList());//第二个%}//从功能到性能public static List<String> getPricesByCompletableFuture(List<NetMall> list,String productName){return list.stream().map(netMall ->CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",netMall.getNetMallName(),netMall.calcPrice(productName))))//Stream<CompletableFuture<String>>.collect(Collectors.toList())//List<CompletablFuture<String>>.stream()//Stream<CompletableFuture<String>.map(s->s.join())//Stream<String>.collect(Collectors.toList());}public static void main(String[] args) {long startTime = System.currentTimeMillis();List<String> list1 = getPrice(list, "mysql");for(String element:list1){System.out.println(element);}long endTime = System.currentTimeMillis();System.out.println("--普通版----当前操作花费时间----costTime:"+(endTime-startTime)+"毫秒");System.out.println("------------------------------分割----------------------");startTime = System.currentTimeMillis();List<String> list2 = getPricesByCompletableFuture(list, "mysql");for(String element:list2){System.out.println(element);}endTime = System.currentTimeMillis();System.out.println("--性能版-当前操作花费时间----costTime:"+(endTime-startTime)+"毫秒");}
}static class NetMall{private String netMallName;public String getNetMallName() {return netMallName;}public NetMall(String netMallName){this.netMallName = netMallName;}public double calcPrice(String productName){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);//用这句话来模拟价格}
}

3.7、CompletableFuture常用API

1、获得结果和触发计算

获取结果

  • public T get() 不见不散,容易阻塞

  • public T get(long timeout,TimeUnit unit) 过时不候,超过时间会爆异常

  • public T join() 类似于get(),区别在于是否需要抛出异常

  • public T getNow(T valueIfAbsent)

    • 立即获取结果不阻塞

      • 计算完,返回计算完成后的结果
      • 没算完,给一个替代结果,返回设定的valueAbsent(直接返回了备胎值xxx)

主动触发计算

  • public boolean complete(T value) 是 否立即打断get()方法返回括号值

    (执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false 和原来的abc)
    栗子:

public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);//执行需要2秒} catch (InterruptedException e) {e.printStackTrace();}return "abc";});try {TimeUnit.SECONDS.sleep(1);//等待需要1秒} catch (InterruptedException e) {e.printStackTrace();}// System.out.println(uCompletableFuture.getNow("xxx"));//执2-等1 返回xxxSystem.out.println(uCompletableFuture.complete("completeValue")+"\t"+uCompletableFuture.get());//执2-等1 返回true+备胎值completeValue
}

如果更改执行时间1秒,等待时间2秒,则

2、对计算结果进行处理

thenApply :计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。

public static void main(String[] args) {//当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println("111");return 1024;}).thenApply(f -> {System.out.println("222");return f + 1;}).thenApply(f -> {//int age = 10/0; // 异常情况:那步出错就停在那步。System.out.println("333");return f + 1;}).whenCompleteAsync((v,e) -> {System.out.println("*****v: "+v);}).exceptionally(e -> {e.printStackTrace();return null;});System.out.println("-----主线程结束,END");// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}

正常情况:

异常情况:

handle 类似于thenApply,但是有异常的话仍然可以往下走一步。

public static void main(String[] args) {//当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,// 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println("111");return 1024;}).handle((f,e) -> {int age = 10/0;//异常语句System.out.println("222");return f + 1;}).handle((f,e) -> {System.out.println("333");return f + 1;}).whenCompleteAsync((v,e) -> {System.out.println("*****v: "+v);}).exceptionally(e -> {e.printStackTrace();return null;});System.out.println("-----主线程结束,END");// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}

3、对计算结果进行消费

  • 接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口,之前的是Function

thenAccept

public static void main(String[] args)
{CompletableFuture.supplyAsync(() -> {return 1;}).thenApply(f -> {return f + 2;}).thenApply(f -> {return f + 3;}).thenApply(f -> {return f + 4;}).thenAccept(r -> System.out.println(r));
}

补充:Code之任务之间的顺序执行

  • thenRun

    • thenRun(Runnable runnable)
    • 任务A执行完执行B,并且B不需要A的结果
  • thenAccept

    • thenAccept(Consumer action)
    • 任务A执行完执行B,B需要A的结果,但是任务B无返回值
  • thenApply

    • thenApply(Function fn)
    • 任务A执行完执行B,B需要A的结果,同时任务B有返回值
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
//null System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
//resultA打印出来的 null因为没有返回值System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
//resultAresultB 返回值

4、对计算速度进行选用

applyToEither 方法,快的那个掌权

谁快用谁

public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> play1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }return "play1 ";});CompletableFuture<String> play2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }return "play2";});CompletableFuture<String> thenCombineResult = play1.applyToEither(play2, f -> {//对计算速度进行选用return f + " is winner";});System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
}

5、对计算结果进行合并

thenCombine 合并

  • 两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理
  • 先完成的先等着,等待其它分支任务
public static void main(String[] args) throws ExecutionException, InterruptedException
{CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return 10;});CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return 20;});CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return x + y;});System.out.println(thenCombineResult.get());
}

合并版本:

public static void main(String[] args) throws ExecutionException, InterruptedException
{CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");return 10;}).thenCombine(CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");return 20;}), (x,y) -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");return x + y;}).thenCombine(CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");return 30;}),(a,b) -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");return a + b;});System.out.println("-----主线程结束,END");System.out.println(thenCombineResult.get());// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
}

4、CompleteFuture和线程池(非常重要)

上面的几个方法都有普通版本和后面加Async的版本

  • thenRunthenRunAsync为例,有什么区别?
public static void main(String[] args){ExecutorService threadPool = Executors.newFixedThreadPool(5);CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1号任务"+"\t"+Thread.currentThread().getName());return "abcd";},threadPool).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务"+"\t"+Thread.currentThread().getName());});threadPool.shutdown();
}

1号任务 加线程池 其他的不带线程池:

public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(5);CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1号任务"+"\t"+Thread.currentThread().getName());return "abcd";},threadPool).thenRunAsync(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务"+"\t"+Thread.currentThread().getName());});
}

这里另起炉灶重新调用了默认的 ForkJoinPool

public static void main(String[] args){ExecutorService threadPool = Executors.newFixedThreadPool(5);CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{// try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1号任务"+"\t"+Thread.currentThread().getName());return "abcd";},threadPool).thenRun(()->{// try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{// try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{//try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务"+"\t"+Thread.currentThread().getName());});
}

分析源码:

//CompletableFuture.java 2009行
public CompletableFuture<Void> thenRun(Runnable action) {//传入值是一样的return uniRunStage(null, action);}public CompletableFuture<Void> thenRunAsync(Runnable action) {return uniRunStage(asyncPool, action);//但是这里有个异步的线程池asyncPool}
--------------------------------------------------------------------------------------//进入asyncPoolprivate static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1);//一般大于1都是成立的/*** Default executor -- ForkJoinPool.commonPool() unless it cannot* support parallelism.*/private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();//所以这里会调用forkJoin线程池

分析结果:

  • 没有传入自定义线程池,都默认线程池ForkJoinPool

  • 传入了一个自定义线程池

    如果执行第一个任务的时候,传入了一个自定义线程池

    • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务共用同一个线程池
    • 调用thenRunAsync方法执行第二个任务时,则第一个任务是自己传入的线程池,第二个任务使用的是ForkJoin线程池
  • 有可能处理太快,系统优化切换原则,直接使用main 线程处理

  • 其他如thenAccept 和thenAcceptAsync 它们之间的区别也是同理

高并发核心模式之异步回调相关推荐

  1. 《Java高并发核心编程.卷2,多线程、锁、JMM、JUC、高并发设计模式》

    <Java高并发核心编程.卷2,多线程.锁.JMM.JUC.高并发设计模式> 目录 第1章 多线程原理与实战 1.2 无处不在的进程和线程 1.2.1 进程的基本原理 1.2.2 线程的基 ...

  2. cpu多核 node 单线程_详解node单线程实现高并发原理与node异步I/O

    一.node单线程实现高并发原理 众所周知nodejs是单线程且支持高并发的脚本语言.可为什么单线程的nodejs可以支持高并发呢?很多人都不明白其原理,下面我来谈谈我的理解: 1. node的优点: ...

  3. 高并发核心编程Spring Cloud+Nginx秒杀实战,秒杀业务的参考实现

    秒杀业务的参考实现 本节从功能入手重点介绍Spring Cloud秒杀实战业务处理的3层实现:dao层.service层.controller层. 秒杀的功能模块和接口设计 秒杀系统的实现有多种多样的 ...

  4. python异步高并发_通过python异步通讯方式构建高并发压力测试工具

    背景说明 在工作中,要对一个接口进行压测,我当时就想通过python自己编写一个压力发生器. 初步方案(单线程循环发送) 通过循环向服务端发送请求,代码如下: #采用单步循环的方式循环测试 impor ...

  5. 可能要用心学高并发核心编程,限流原理与实战,分布式令牌桶限流

    实战:分布式令牌桶限流 本节介绍的分布式令牌桶限流通过Lua+Java结合完成,首先在Lua脚本中完成限流的计算,然后在Java代码中进行组织和调用. 分布式令牌桶限流Lua脚本 分布式令牌桶限流Lu ...

  6. cpu多核 node 单线程_node单线程支撑高并发原理(node异步I/O)

    一.node单线程实现高并发原理 众所周知nodejs是单线程且支持高并发的脚本语言.可为什么单线程的nodejs可以支持高并发呢?很多人都不明白其原理,下面我来谈谈我的理解: 1. node的优点: ...

  7. 面试又被问高并发,哑口无言?一份高并发核心文档助你吊打面试官

     关于程序员,除了做项目来提高自身的技术之外,还有一种提升自己的专业技能就是:多!看!书! Java高并发程序设计 高并发(High Concurrency)是互联网分布式系统架构设计中必须考虑的因素 ...

  8. 华为18级工程师耗时三年才总结出这份Java亿级高并发核心编程手册

    移动时代.5G时代.物联网时代的大幕已经开启,新时代提升了对Java应用的高性能.高并发的要求,也抬升了Java工程师的技术台阶和面试门槛. 很多公司的面试题从某个侧面反映了生产场景的技术要求.之前只 ...

  9. 【JUC并发编程13】异步回调

    文章目录 13 异步回调 13.1 CompletableFuture 13.2 Future 与 CompletableFuture 13 异步回调 同步:指等待资源(阻塞) 异步:指设立哨兵,资源 ...

最新文章

  1. SAP Spartacus B2B Unit page OrgUnitService.getTree方法返回的结果
  2. php字符串处理面试题,关于PHP字符串的一道面试题
  3. 游标迭代器(过滤器)——Scan
  4. 电文的编码和译码c语言实现,电文的编码及译码.doc
  5. 小米无线网卡驱动_小米air笔记本蓝屏故障排除,一个匪夷所思的故障原因。
  6. golang 读取Excel 或者map字符型返回结构体数组
  7. wince 错误 Error: failed PB timebomb check
  8. 最新服装零售软件管理排名
  9. 免匙SSH登录失败问题(非常规)
  10. matlab单服务排队模型,MATLAB模拟银行单服务台排队模型
  11. 如何做好学术演讲-01
  12. 专题方案 | 项目里程碑管理系统
  13. 2008年最吸引眼球的10只股票
  14. 权力来自于他人的服从
  15. 启动tomcat时候报错(Error deploying web application directory)
  16. Matlab绘图-很详细,很全面
  17. DXF图元数据的组织
  18. 分不清合约机与定制机,怀疑被骗
  19. 二分图 恶补定义!!!
  20. 南华大学的计算机专业学校排名,2019南华大学专业排名

热门文章

  1. 抖音seo源码开发,开源技术保姆式搭建
  2. python 错误 SyntaxError: invalid character in identifier
  3. M201: MongoDB Performance chapter 2 Mongodb Indexes学习记录
  4. html5开发页游(前话)
  5. Andersen Global宣布在沙特扩大业务
  6. Apipost使用指南
  7. 云计算疑难杂症解决方案一
  8. html换行(文本+符号)
  9. html字体换行教程
  10. c语言long和 int,C语言 int 和long int 问题