前言

本文是专栏《AsyncTool框架原理源码解析》系列的第二篇文章《AsyncTool框架实战使用》,通过编排串行、并行、阻塞等待-先串行,后并行、阻塞等待-先并行,后串行、异常、超时等多种场景进行编码调试,学习AsyncTool框架的实际应用,并初步探知其实现原理。

专栏《AsyncTool框架原理源码解析》共有 5篇 文章,由浅入深,从实战使用再到源码、原理分析,包括但不仅局限于AsyncTool框架思考和总结,最后分享下我为AsyncTool框架开源项目贡献代码。以下是《专栏目录》:

  1. 《AsyncTool框架简介和分析实现》
  2. 《AsyncTool框架实战使用》
  3. 《AsyncTool框架原理源码解析》
  4. 《AsyncTool框架的一些思考》
  5. 《AsyncTool框架竟然有缺陷?》

1、串行场景

在实战之前,先看一下Worker最小任务单元的使用。

Worker的定义如下,实现IWorker,ICallback函数式接口,并重写下面的4个方法。4个方法的说明如下:

1、begin():Worker开始执行前,先回调begin()

2、action():Worker中执行耗时操作的地方,比如RPC接口调用。

3、result():action()执行完毕后,回调result方法,可以在此处处理action中的返回值。

4、defaultValue():整个Worker执行异常,或者超时,会回调defaultValue(),Worker返回默认值

如果没有实现ICallback,会默认执行DefaultCallback的回调方法。DefaultCallback是一个空的回调,里面没有任何逻辑。

模拟串行场景:A任务对参数+1,之后B任务对参数+2,之后C任务对参数+3。(场景没有实际业务含义,很简单)

(1)workerA:

public class WorkerA implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {/*** Worker开始的时候先执行begin*/@Overridepublic void begin() {System.out.println("A - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());}/*** Worker中耗时操作在此执行RPC/IO* @param object      object* @param allWrappers 任务包装* @return*/@Overridepublic Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {Integer res = object + 1;return res;}/*** action执行结果的回调* @param success* @param param* @param workResult*/@Overridepublic void result(boolean success, Integer param, WorkResult<Integer> workResult) {System.out.println("A - param:" + JSON.toJSONString(param));System.out.println("A - result:" + JSON.toJSONString(workResult));System.out.println("A - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());}/*** Worker异常时的回调* @return*/@Overridepublic Integer defaultValue() {System.out.println("A - defaultValue");return 101;}}

(2)workerB:

public class WorkerB implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {/*** Worker开始的时候先执行begin*/@Overridepublic void begin() {System.out.println("B - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());}/*** Worker中耗时操作在此执行RPC/IO* @param object      object* @param allWrappers 任务包装* @return*/@Overridepublic Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {Integer res = object + 2;return res;}/*** action执行结果的回调* @param success* @param param* @param workResult*/@Overridepublic void result(boolean success, Integer param, WorkResult<Integer> workResult) {System.out.println("B - param:" + JSON.toJSONString(param));System.out.println("B - result:" + JSON.toJSONString(workResult));System.out.println("B - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());}/*** Worker异常时的回调* @return*/@Overridepublic Integer defaultValue() {System.out.println("B - defaultValue");return 102;}
}

(3)WorkerC:

public class WorkerC implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {/*** Worker开始的时候先执行begin*/@Overridepublic void begin() {System.out.println("C - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());}/*** Worker中耗时操作在此执行RPC/IO* @param object      object* @param allWrappers 任务包装* @return*/@Overridepublic Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {Integer res = object + 3;return res;}/*** action执行结果的回调* @param success* @param param* @param workResult*/@Overridepublic void result(boolean success, Integer param, WorkResult<Integer> workResult) {System.out.println("C - param:" + JSON.toJSONString(param));System.out.println("C - result:" + JSON.toJSONString(workResult));System.out.println("C - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());}/*** Worker异常时的回调* @return*/@Overridepublic Integer defaultValue() {System.out.println("C - defaultValue");return 103;}
}

(4) 编排WorkerWrapper包装类:

上面Worker创建好之后,使用WorkerWrapper对Worker进行包装以及编排,WorkerWrapper是AsyncTool组件的最小可执行任务单元。

C是最后一步,它没有next。B的next是C,A的next是B。编排顺序就是:C <- B <- A

public class Test {public static void main(String[] args) {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();//包装Worker,编排串行顺序:C <- B <- A//C是最后一步,它没有nextWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3.build();//B的next是CWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2.next(wrapperC).build();//A的next是BWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1)//1+1.next(wrapperB).build();try {//ActionAsync.beginWork(1000, wrapperA);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}
}

(5)通过执行器类Async的beginWork方法提交任务执行。

  1. Timeout:全组任务超时时间设定,如果Worker任务超时,则Worker结果使用defaultValue()默认值。
  2. ExecutorService executorService:自定义线程池,不自定义的话,就走默认的COMMON_POOL。默认的线程池是不定长线程池
  3. WorkerWrapper… workerWrapper:起始任务,可以是多个。注意不要提交中间节点的任务,只需要提交起始任务即可,编排的后续任务会自动执行。
//默认不定长线程池
private static final ThreadPoolExecutor COMMON_POOL = (ThreadPoolExecutor) Executors.newCachedThreadPool();Async.beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)

(6)上面只是一种写法,如果觉得这种写法反人类,也可以使用depend方式编排:

//A没有depend
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1).build();//B的depend是A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2).depend(wrapperA).build();//C的depend是B
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3).depend(wrapperB).build();
//begin
Async.beginWork(1000, wrapperA);

运行结果:A:1+1=2;B:2+2=4;C:3+3=6

2、并行场景

场景模拟:基于串行场景,。A任务对参数+1,B任务对参数+2,C任务对参数+3。并行执行。

WorkerWrapper并行编排:A\B\C都没有next和depend, 3个WorkerWrapper一起begin。Async.beginWork(1000, wrapperA, wrapperB, wrapperC);

public class Test {public static void main(String[] args) {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();/*** 包装Worker,编排并行顺序*///AWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1)//1+1.build();//BWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2.build();//CWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3.build();try {//3个WorkerWrapper一起beginAsync.beginWork(1000, wrapperA, wrapperB, wrapperC);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}
}

执行结果:ABC分别使用不同的线程并行执行。A:1+1=2;B:2+2=4;C:3+3=6

3、阻塞等待 - 先串行,后并行

阻塞等待 - 先串行,后并行场景模拟:A先执行,对参数+1;A执行完毕之后,B\C同时并行执行,B任务基于A的返回值+2,C任务基于A的返回值+3

(1)next写法:

public static void nextWork() {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();//C是最后一步,它没有nextWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(null)//没有参数,根据A的返回值+3.build();//B是最后一步,它没有nextWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(null)//没有参数,根据A的返回值+2.build();//A的next是B、CWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1)//1+1//next是B、C.next(wrapperB, wrapperC).build();try {//ActionAsync.beginWork(1000, wrapperA);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}
}

(2)depend写法:

//A没有depend,就是开始
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1).build();//C depend A
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(null).depend(wrapperA).build();
//B depend A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(null).depend(wrapperA).build();

执行结果:A:1+1 = 2;B:2+2 =4;C:3+2 = 5

4、阻塞等待 - 先并行,后串行

场景模拟:阻塞等待 - 先并行,后串行。

B\C并行执行。B对参数+2,C对参数+3,B\C全部执行完后,A = B返回值+C返回值。

注意:需要B和C同时begin。Async.beginWork(4000, wrapperB, wrapperC);

(1)next写法:

public static void nextWork() {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();//A是最后一步,没有nextWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(null)//参数是null,A = B + C.build();//C next AWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3 = 6.next(wrapperA).build();//B next AWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2 = 4.next(wrapperA).build();try {new SynchronousQueue<Runnable>();//ActionAsync.beginWork(4000, wrapperB, wrapperC);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}
}

(2)depend写法:

//C没有depend,是起始节点
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3 = 6.build();
//B没有depend,是起始节点
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2 = 4.build();//A depend B,C
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(null)//参数是null,A = B + C.depend(wrapperB, wrapperC).build();

执行结果:B:2+2=4;C:3+3 = 6;A = B+C = 10

5、异常、超时回调场景

这2种场景,可以基于以上场景微调,即可debug调试。

//超时时间,线程池,初始Wrapper,多个
Async.beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
  1. 基于全组设定的timeout,如果超时了,则worker中的返回值使用defaultValue()
  2. 如果当前Worker任务异常了,则当前任务使用defaultValue(),并且depend当前任务的,也FastFail,返回defaultValue()

2、AsyncTool框架实战使用相关推荐

  1. java ee核心框架实战 pdf_Java EE核心框架实战 高洪岩 中文PDF

    资源名称:Java EE核心框架实战 高洪岩 中文PDF 第1章 MyBatis 3操作数据库 第2章 MyBatis 3常用技能 第3章 Struts 2必备开发技能 第4章 Struts 2文件的 ...

  2. 《Java EE核心框架实战》—— 2.3 resultMap 标签

    本节书摘来异步社区<Java EE核心框架实战>一书中的第2章,第2.3节,作者: 高洪岩,更多章节内容可以访问云栖社区"异步社区"公众号查看. 2.3 < re ...

  3. 【Spring 5】响应式Web框架实战(下)

    - [Spring 5]响应式Web框架前瞻  - 响应式编程总览  - [Spring 5]响应式Web框架实战(上) 1 回顾 上篇介绍了如何使用Spring MVC注解实现一个响应式Web应用( ...

  4. python接口测试框架实战与自动化进阶(三)

    python接口测试框架实战与自动化进阶 一.持续集成 1.持续集成环境搭建 1)安装Jenkins 官网下载后直接安装:https://jenkins.io/ 终端直接安装及启动:java -jar ...

  5. 8s yaml 配置生成_接口测试框架实战(六) | 配置的数据驱动

    <Python 测试开发实战进阶>课程,4 个月挑战 BAT 大厂年薪 50W+ Offer,文末加群! 在实际工作中,为了便于维护,对于环境的切换和配置,通常不会使用硬编码的形式完成.在 ...

  6. 11小时 python自动化测试从入门到_从设计到开发Python接口自动化测试框架实战,资源教程下载...

    课程名称 从设计到开发Python接口自动化测试框架实战,资源教程下载 课程简介: 课程从接口基础知识入门,从抓包开始,到接口工具的运用,再到常见接口库.接口开发.Mock服务.unittest框架的 ...

  7. python接口自动化测试框架实战从设计到开发_Python接口自动化测试框架实战 从设计到开发...

    第1章 课程介绍(不要错过) 本章主要讲解课程的详细安排.课程学习要求.课程面向用户等,让大家很直观的对课程有整体认知! 第2章 接口测试工具Fiddler的运用 本章重点讲解如何抓app\web的h ...

  8. Angular4+AdminLTE+Jeecg 前后端分离框架实战-张代浩-专题视频课程

    Angular4+AdminLTE+Jeecg 前后端分离框架实战-2259人已学习 课程介绍         Angular4+AdminLTE+Jeecg 前后端分离框架实战 涉及技术点:angu ...

  9. 应用程序框架实战二十二 : DDD分层架构之仓储(层超类型基础篇)

    前一篇介绍了仓储的基本概念,并谈了我对仓储的一些认识,本文将实现仓储的基本功能. 仓储代表聚合在内存中的集合,所以仓储的接口需要模拟得像一个集合.仓储中有很多操作都是可以通用的,可以把这部分操作抽取到 ...

最新文章

  1. OpenGL 有时候纹理映射的部分问题
  2. ASP.NET站点性能提升-压缩
  3. java异常体系分类(面试)
  4. 取消win2003关机提示的设置
  5. Spark之scala学习(基础篇)待更新
  6. 通过JMX获取weblogic的监控指标
  7. eclipse中自定义videoview类_android控件之VideoView建立自己的播放器
  8. 5Python全栈之路系列之字符串格式化
  9. 大数据_Hbase-API访问_Java操作Hbase_获取连接---Hbase工作笔记0011
  10. layer弹出层 iframe层去掉滚动条 content : [url , 'no']
  11. python画图时常用的颜色——color=‘ ’
  12. python语言的单行注释以单引号开头_知到智慧树大数据分析的python基础(山东联盟)(1)答案章节期末答案...
  13. RocketMQ(消息中间件)
  14. 测试用例编写方法——场景法
  15. Win10离线安装.NET Framework 3.5的方法总结
  16. kalman滤波的matlab,kalman滤波matlab实现
  17. BUUCTF misc 九连环隐写
  18. 在树莓派(Linux)上使用FTDI的usb转串口芯片ft232两种驱动方式(VCP和D2xx)
  19. 27 - Excel 的基本公式和重要函数(Excel入门下)
  20. 2018.06.27Firing(最大权闭合子图)

热门文章

  1. CHIP8 Emulator(1)——CHIP8简介
  2. C语言基础 - 结构体类型字节对齐总结
  3. MessageBeep - Play a System sound
  4. vue.js项目实战运用篇之抖音视频APP-第八节: 视频播放功能
  5. vue.js项目实战运用篇之抖音视频APP-第一节:项目环境搭建
  6. python 爬虫获取代理Ip
  7. 吃鸡登录界面一直服务器维护,绝地求生正式服停机维护到几点?吃鸡进不去怎么办?...
  8. Sqlserver 数据库异步公共访问类dbhelperasync -SmartSoftHelp辅助开发工具
  9. AI伦理问题探究:人工智能对我们的未来会产生什么样的影响?
  10. java使用mq教程,Java语言快速实现简单MQ消息队列服务