背景

异步编程现在受到了越来越多的关注,尤其是在 IO 密集型的业务场景中,相比传统的同步开发模式,异步编程的优势越来越明显,本文介绍Java常见的实现方式;

Future

描述

java.util.concurrent.Future是JDK5引入的,用来获取一个异步计算的结果。可以使用isDone方法检查计算是否完成,也可以使用get阻塞住调用线程,直到计算完成返回结果,使用cancel方法停止任务的执行。

FutureTask.java是对Futre和Runnable最简单的实现,实现了run函数,所以可以直接执行,任务执行结束通过set()保存结果,setException()保存异常信息。通常配合executorService.submit()一起使用,ExecutorService中将任务包装成FutureTask执行execute();

样例

     @Testpublic void futureCallBackTest() throws InterruptedException, ExecutionException {System.out.println(printThread("小明点餐"));Future<String> future = executorService.submit(() -> {System.out.println(printThread("厨师开始炒菜"));Thread.sleep(2000);System.out.println(printThread( "厨师炒好菜"));return "饭菜好了";});String result = future.get();executorService.shutdown();System.out.println(printThread(result + ",小明开始吃饭"));}

运行结果

优缺点

  • 能获得异步线程执行结果

  • 无法方便得知任务何时完成

  • 在主线程获得任务结果会导致主线程阻塞

  • 复杂一点的情况下,比如多个异步任务的场景,一个异步任务依赖上一个异步任务的执行结果,异步任务合并等,Future无法满足需求

ListenableFuture

描述

Google并发包下的listenableFuture对Java原生的future做了扩展,顾名思义就是使用监听器模式实现的回调,所以叫可监听的future,通过addListener(Runnable listener, Executor executor)方法添加回调任务。

要使用listenableFuture还要结合MoreExecutor线程池,MoreExecutor是对Java原生线程池的封装,比如常用的MoreExecutors.listeningDecorator(threadPool); 修改Java原生线程池的submit方法,封装了future返回listenableFuture。

样例

@Testpublic void listenableFutureTest() throws InterruptedException, ExecutionException {System.out.println(printThread("小明点餐"));ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());ListenableFuture<String> listenableFuture = listeningExecutorService.submit(() -> {System.out.println(printThread("厨师开始炒菜"));try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(printThread( "厨师炒好菜"));return "饭菜好了";});Futures.addCallback(listenableFuture, new FutureCallback<String>() {@Overridepublic void onSuccess(@Nullable String s) {System.out.println(printThread(s + ",小明开始吃饭"));}@Overridepublic void onFailure(Throwable throwable) {System.out.println(printThread( throwable.getMessage()));}}, executorService);System.out.println(printThread( "小明开始玩游戏"));try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(printThread("小明结束玩游戏"));listenableFuture.get();listeningExecutorService.shutdown();executorService.shutdown();}

运行结果

这里的运行结果:小明玩游戏和小明吃饭放在了2个线程,没有阻塞等待。

优缺点

充分利用线程的时间片

回调机制的最大问题是:Callback Hell(回调地狱)

CallbackHell

描述

大量使用 Callback 机制,使应该是先后的业务逻辑在代码形式上表现为层层嵌套,这会导致代码难以理解和维护

样例

@Testpublic void listenableFutureCallbackHellTest() throws InterruptedException, ExecutionException {System.out.println(printThread("小明点餐"));ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());ListenableFuture<String> listenableFuture = listeningExecutorService.submit(() -> {System.out.println(printThread("厨师开始做菜"));try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "菜已装盘";});Futures.addCallback(listenableFuture, new FutureCallback<String>() {@Overridepublic void onSuccess(@Nullable String s) {System.out.println(printThread(s + ",小明开始吃饭"));System.out.println(printThread( "小明点了个饮料"));ListenableFuture<String> listenableFuture1 = listeningExecutorService.submit(() -> {System.out.println(printThread("服务员拿饮料"));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "饮料好了";});Futures.addCallback(listenableFuture1, new FutureCallback<String>() {@Overridepublic void onSuccess(@Nullable String s) {System.out.println(printThread(s + ",小明开始喝饮料"));}@Overridepublic void onFailure(Throwable throwable) {}}, executorService);}@Overridepublic void onFailure(Throwable throwable) {System.out.println(printThread( throwable.getMessage()));}}, executorService);System.out.println(printThread( "小明开始玩游戏"));try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(printThread("小明结束玩游戏"));listenableFuture.get();listeningExecutorService.shutdown();executorService.awaitTermination(10, TimeUnit.SECONDS);executorService.shutdown();}

CompleteableFuture

描述

Java8新增的CompletableFuture类借鉴了Google Guava的ListenableFuture,它包含50多个方法,默认使用forkJoinPool线程池,提供了非常强大的Future扩展功能,可以帮助我们简化异步编程的复杂性,结合函数式编程,通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的多种方法,可以满足大部分异步回调场景。

CompletableFuture可以用来以声明式语义构建创建异步任务的编排模式,它可以用于通过声明表示:

  • 将要执行一个异步任务;

  • 将要执行一个异步任务,它必须在一个前驱异步任务完成之后执行,其以前驱任务的输出作为自身的输入;

  • 将要执行一个异步任务,它必须在若干前驱异步任务中的(任意或全部)完成之后执行,其以全部(或任一)前驱任务的输出作为自身的输入;

样例

@Testpublic void completeableFutureTest()  {System.out.println(printThread("小明点餐"));CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(printThread("厨师开始做菜"));try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(printThread("厨师菜做好了"));return "菜已装盘";});CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {System.out.println(printThread( "小明开始玩游戏"));try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(printThread("小明结束玩游戏"));});CompletableFuture<Void> completableFuture2 = completableFuture.thenAcceptBoth(completableFuture1,(a, b) -> System.out.println(printThread( a + ", 小明开始吃饭,并点了饮料"))).thenApplyAsync((b) -> {System.out.println(printThread("服务员拿饮料"));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "饮料好了";},executorService).thenAcceptAsync((s) -> System.out.println(printThread(s + ",小明开始喝饮料")));completableFuture2.join();}

方法介绍

创建对象

以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。

runAsync方法也好理解,它以Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。

supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。

计算结果完成时的处理

当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action;

不以Async结尾的方法由原来的线程计算,以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行;

exceptionally方法返回一个新的CompletableFuture,当原始的CompletableFuture抛出异常的时候,就会触发这个CompletableFuture的计算,调用function计算值;

转换、消费

一个传Function将CompletableFuture中的值转换成另一个值,一个传Consumer将CompletableFuture值消费;

组合

thenCombine用来复合另外一个CompletionStage的结果,两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序;

thenAcceptBoth和runAfterBoth是当两个CompletableFuture都计算完成,acceptEither和applyToEither方法是当任意一个CompletionStage完成的时候执行后续任务;

辅助方法 allOf 和 anyOf

allOf方法是当所有的CompletableFuture都执行完后执行计算;

anyOf方法是当任意一个CompletableFuture执行完后就会执行计算;

Reactor

描述

Reactor 框架是 Pivotal 公司( Spring 家族公司)开发的,实现了 Reactive Programming 思想,符合 Reactive Streams 规范的一项技术;

Reactive

反应式宣言-反应式宣言

Reactive Streams

介绍

官网-https://www.reactive-streams.org/?spm=a2c6h.12873639.0.0.edf277a6wQI9QB

简介:

Reactive Streams是一个对于 异步流处理且伴随非阻塞背压机制 而提供的倡议规范;

目标:

控制异步边界的流数据交换(例如从一个线程池向另一个线程池传递数据),同时要确保接收端不被强迫 缓冲任意数量的数据,也就是利用背压(backpressure)模型调节线程间的队列;

反应式编程的范式(接口规范),主要接口

  • Publisher

  • Subscriber

  • Subcription

其中,Subcriber 中便包含了上面表格提到的 onNext、onError、onCompleted 这三个方法。

一个简单样例

public static void main(String[] args) throws InterruptedException {Flux.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.parallel()).subscribe(new CoreSubscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {System.out.println(printThread("onSubscribe, " + s.getClass().toString()));s.request(5);}@Overridepublic void onNext(Integer integer) {System.out.println(printThread("next: " + integer));}@Overridepublic void onError(Throwable t) {}@Overridepublic void onComplete() {System.out.println(printThread("complete"));}});Thread.sleep(1000);
}private static String printThread(String note) {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("hh:mm:ss");long time = System.currentTimeMillis();Date date = new Date(time);return Thread.currentThread().getName() + " " + simpleDateFormat.format(date) + " " + time + " " + note;
}

Reactor

实现reactive streams的类库-Reactor 3 Reference Guide

相似的类库有RxJava2, JDK9 Flow等

Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者。

Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的发表者。

Scheduler 表示背后驱动反应式流的调度器,通常由各种线程池实现。

@Testpublic void ReactorTest() throws InterruptedException {System.out.println(printThread("小明点餐"));Mono mono = Mono.fromSupplier(() -> {System.out.println(printThread("厨师开始做菜"));try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(printThread("厨师菜做好了"));return "菜已装盘";}).publishOn(Schedulers.parallel()).zipWith(Mono.fromSupplier(() -> {System.out.println(printThread( "小明开始玩游戏"));try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(printThread("小明结束玩游戏"));return " ";})).doOnSuccess((tuple2) -> System.out.println(printThread( tuple2.getT1() + ", 小明开始吃饭")));mono.subscribe();Thread.sleep(10000);}

reactor操作函数

Reactor 3 Reference Guide

Java异步实现的N种方式相关推荐

  1. java异步编程的 7 种方式

    早期的系统是同步的,容易理解,我们来看个例子 同步编程 当用户创建一笔电商交易订单时,要经历的业务逻辑流程还是很长的,每一步都要耗费一定的时间,那么整体的RT就会比较长. 于是,聪明的人们开始思考能不 ...

  2. Java异步调用的几种方式

    一.通过创建新线程 二.通过线程池 三.通过@Async注解 四.通过CompletableFuture 日常开发中,会经常遇到说,前台调服务,然后触发一个比较耗时的异步服务,且不用等异步任务的处理结 ...

  3. Java 实现多线程的四种方式 超详细

    Java 实现多线程的四种方式 文章目录 Java 实现多线程的四种方式 一.继承 Thread 类 二.实现 Runnable 接口 三.实现 Callable 接口 四.线程池 1,Executo ...

  4. 12月18日云栖精选夜读 | Java 中创建对象的 5 种方式!...

    作为Java开发者,我们每天创建很多对象,但我们通常使用依赖管理系统,比如Spring去创建对象.然而这里有很多创建对象的方法,我们会在这篇文章中学到. Java中有5种创建对象的方式,下面给出它们的 ...

  5. Java中创建对象的几种方式

    Java中创建对象的几种方式 1.使用new创建对象,在堆上创建. 2.克隆 3.反序列化 4.反射创建对象 5.NIO中可以使用本地方法直接分配堆外内存. 转载于:https://www.cnblo ...

  6. Java中创建对象的四种方式

    为什么80%的码农都做不了架构师?>>>    Java中创建对象的四种方式 (1) 用new语句创建对象,这是最常见的创建对象的方法.    (2) 运用反射手段,调用java.l ...

  7. java解析xml的几种方式

    java解析xml的几种方式 博客分类: java基础备忘-好记性不然烂笔头 XMLJava应用服务器数据结构编程  第一种:DOM. DOM的全称是Document Object Model,也即文 ...

  8. 创建和应用Java包文件的两种方式(转)

    创建和应用Java包文件的两种方式(转) <Java编程艺术>章节选登.作者:高永强 清华大学出版社 (即将出版) 12.1  包--package    ... 12.1.1  包命名规 ...

  9. java制作oracle程序,Java程序操作Oracle两种方式之简单实现

    Java程序操作Oracle两种方式之简单实现 1.通过JDBC-ODBC桥连接Oracle数据库 (1)创建odbc源,在控制面板->管理工具->数据源(odbc)中添加DSN,比如取名 ...

最新文章

  1. 与其羡慕他人精彩,还不如设法活出自我
  2. MVC中执行成功弹出对话框
  3. uwsgi基础——最佳实践和问题
  4. ActiveMQ介绍
  5. 【WebView】warnning:所有WebView方法必须在主线程调用(4.0) 所有WebView方法必须在同一线程调用(4.4)
  6. PHP获取浏览器版本号
  7. WebPart中的ReplaceTokens 方法
  8. SAP CRM Appointment应用里Date profile的配置
  9. Vue.js(5)- 全局组件
  10. 塑料壳上下扣合的卡扣设计_一种塑料件卡扣结构制造技术
  11. 132. 分割回文串 II
  12. 使用人脸客户端库快速实现对面部的分析---C#
  13. ArcGIS Server Manager打不开(运行时错误)
  14. VirtualBox安装RedHat7
  15. 纤亿通带你认识和正确使用SFP光模块
  16. aps后缀是什么文件_文件后缀大全
  17. Android蓝牙打印机功能开发完整Demo
  18. mysql partition 语法,MySQL与瀚高数据库的范围分区的语法及实例(APP)
  19. python 培训基础
  20. 渗透测试中用到的一些基本知识

热门文章

  1. JAVA调用PageOffice实现Word文档中加盖印章
  2. mysql递归函数替换oracle递归
  3. arcgistif合并_ARCGIS拼接影像教程
  4. poi 导出Excel 动态 合并单元格
  5. 如何在Mac笔记本电脑上查看电池用量历史记录及电池状态?
  6. 只有医者对治好耳鸣心怀希望,耳鸣者才有希望!
  7. 借势良政 环卫及再生资源回收行业现“蹄疾步稳”
  8. C++按列读取txt文件并保存,替代excel处理
  9. java定义一个动物接口,Java 动物声音“模拟器”(用接口实现)
  10. android studio 首次安装报 Unable to access Android SDK add-on list 错误