先简答说明一下上下文:

  1. 我就是想实现等待多个异步任务都执行完成的操作,同时搜集到每个任务的执行结果,当然可以使用线程池+CountDownLauncher,但是我个人更倾向于使用 CompletableFuture 来实现;
  2. 很明显要使用CompletableFuture.allOf().join() 来做;
  3. CompletableFuture 默认使用的是 ForkJoin 线程池,我个人倾向于自定义线程池;
  4. 自定义线程池拒绝策略是肯定要考虑的;

今天这个问题就出在拒绝策略上。这里偷个懒,直接将我之前一篇博客(https://dongguabai.blog.csdn.net/article/details/101145256)中的代码改下模拟我的项目代码:

package dongguabai.test.weixin;import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** @author Dongguabai* @description* @date 2021-08-07 13:00*/
public class Tests2 {private static final AtomicInteger SEQ = new AtomicInteger();private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(1, 1, 1,TimeUnit.MINUTES, new ArrayBlockingQueue<>(1), r -> new Thread(r, "my-thread-" + SEQ.getAndIncrement()), (r, executor) -> {throw new RuntimeException("REJECTED.......");});public static void main(String[] args) {try{Map<Integer, String> works = new HashMap<>(6);works.put(0, "a");works.put(1, "b");works.put(2, "c");works.put(3, "d");works.put(4, "e");works.put(5, "f");//Boolean标识成功或者失败Map<Integer, Boolean> resultMapv = new HashMap<>(6);System.out.println("Strat." + new Date().toLocaleString());CompletableFuture[] array = works.entrySet().stream().map(integerStringEntry ->CompletableFuture.supplyAsync(() -> process(integerStringEntry),EXECUTOR).whenComplete((result, e) -> {resultMapv.put(result.getKey(),true);})).toArray(CompletableFuture[]::new);CompletableFuture.allOf(array).join();System.out.println("End."+new Date().toLocaleString());System.out.println(resultMapv);}finally {EXECUTOR.shutdownNow();}}private static Map.Entry<Integer, String> process(Map.Entry<Integer, String> entry) {int workingTime = ThreadLocalRandom.current().nextInt(1, 10);workTime(workingTime*1000);System.out.println(Thread.currentThread().getName() + "完成工作,用时:" + workingTime);entry.setValue(entry.getValue() + "_finished");return entry;}private static void workTime(long ms) {final long l = System.currentTimeMillis();while (System.currentTimeMillis() <= l + ms) {}}
}

我这里线程池线程数设置的是 1,队列也是 1,也就是说同时最多只能进 2 个任务。我这里直接会丢 6 个任务进去。执行一下上面的代码,输出:

Strat.2021-8-7 14:22:58
Exception in thread "main" java.lang.RuntimeException: REJECTED.......at dongguabai.test.weixin.Tests2.lambda$static$1(Tests2.java:25)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)at dongguabai.test.weixin.Tests2.lambda$main$4(Tests2.java:45)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)at dongguabai.test.weixin.Tests2.main(Tests2.java:49)
my-thread-0完成工作,用时:2

同时进程也终止了。

输出结果并不符合我的预期,因为并没有打印出每个任务的执行结果。很明显,不符合预期的原因是因为执行了线程池的拒绝策略,任务都是串行丢给线程池执行的,造成一个任务被拒绝后直接影响了后续任务的执行。

那么就改一下,拒绝的时候不抛出异常:

    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(1, 1, 1,TimeUnit.MINUTES, new ArrayBlockingQueue<>(1), r -> new Thread(r, "my-thread-" + SEQ.getAndIncrement()), (r, executor) -> {//throw new RuntimeException("REJECTED.......");System.out.println("REJECTED.......");});

再执行一遍,结果发现整个主线程直接就卡主了:

这个就很诡异了。要想知道为啥卡主了,就要解决两个问题:

  • 线程池的拒绝策略是怎么执行的?
  • 线程池的拒绝策略跟 CompletableFuture 的执行有什么联系?

先解决第一个问题,查看java.util.concurrent.ThreadPoolExecutor#execute方法:

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command); //执行拒绝策略else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command); //执行拒绝策略}

也就是说调用链路就是 ThreadPoolExecutor#execute-> ThreadPoolExecutor#reject。如果拒绝策略抛出了异常,会直接导致往线程池提交任务的时候出现异常,这也就解释了最开始为啥当拒绝策略是抛出异常的时候会导致后续任务无法提交。

再解决第二个问题,线程池的拒绝策略跟 CompletableFuture 的执行是怎么联系起来的呢?CompletableFuture 的源码非常复杂,抓大放小在,直奔主题,查看 java.util.concurrent.CompletableFuture#asyncSupplyStage 方法:

    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {if (f == null) throw new NullPointerException();CompletableFuture<U> d = new CompletableFuture<U>();e.execute(new AsyncSupply<U>(d, f));return d;}

即调用链路是 CompletableFuture#supplyAsync-> CompletableFuture#asyncSupplyStage-> CompletableFuture#supplyAsync-> ThreadPoolExecutor#execute-> ThreadPoolExecutor#reject。到这里,虽然整个调用链路非常明了了,但还是无法解释为什么会卡主。

发现遗漏了一个细节,程序卡主,是卡在哪了,从第二段代码的输出结果中可以得到两个结论:

  • 6 个任务的确是都丢到线程池中执行了;
  • 没有打印出”End“;
  • 程序卡着,连 finally 逗没进去;

也就是说明程序是卡在了 CompletableFuture.allOf(array).join(); 上。那么就再看看 CompletableFuture#join 是怎么玩的,看 java.util.concurrent.CompletableFuture#waitingGet 方法,那些细节就不看, 直接看最主要的:

  private Object waitingGet(boolean interruptible) {Signaller q = null;boolean queued = false;int spins = -1;Object r;while ((r = result) == null) {............}}

也就是说 CompletableFuture 是根据 result 作为标识符来判断任务是否执行完。

到这里,其实为什么程序会”卡主“无限等待,就是因为线程池执行了拒绝策略后造成 CompletableFuture 无法更新结束标识,从而导致无限 join

结论:

  • 线程池的拒绝策略抛出异常可以阻断无限 join,但是会造成后续串行提交的任务提交也被阻断;
  • 像我这样的场景,多个任务串行提交的情况下可以将 CompletableFuture#supplyAsync 方法 catch 一下;

以这个例子为例,可以这样改造:

package dongguabai.test.weixin;import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** @author Dongguabai* @description* @date 2021-08-07 13:00*/
public class Tests2 {private static final AtomicInteger SEQ = new AtomicInteger();private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(1, 1, 1,TimeUnit.MINUTES, new ArrayBlockingQueue<>(1), r -> new Thread(r, "my-thread-" + SEQ.getAndIncrement()), (r, executor) -> {throw new RuntimeException("REJECTED.......");//System.out.println("REJECTED.......");});public static void main(String[] args) {try {Map<Integer, String> works = new HashMap<>(6);works.put(0, "a");works.put(1, "b");works.put(2, "c");works.put(3, "d");works.put(4, "e");works.put(5, "f");//Boolean标识成功或者失败Map<Integer, Boolean> resultMapv = new HashMap<>(6);System.out.println("Strat." + new Date().toLocaleString());CompletableFuture[] array = works.entrySet().stream().map(integerStringEntry -> {try {return CompletableFuture.supplyAsync(() -> process(integerStringEntry), EXECUTOR).whenComplete((result, e) -> {resultMapv.put(result.getKey(), true);});} catch (Exception e) {resultMapv.put(integerStringEntry.getKey(), false);return CompletableFuture.completedFuture(new Object());}}).toArray(CompletableFuture[]::new);CompletableFuture.allOf(array).join();System.out.println("End." + new Date().toLocaleString());System.out.println(resultMapv);} finally {EXECUTOR.shutdownNow();}}private static Map.Entry<Integer, String> process(Map.Entry<Integer, String> entry) {int workingTime = ThreadLocalRandom.current().nextInt(1, 10);workTime(workingTime * 1000);System.out.println(Thread.currentThread().getName() + "完成工作,用时:" + workingTime);entry.setValue(entry.getValue() + "_finished");return entry;}private static void workTime(long ms) {final long l = System.currentTimeMillis();while (System.currentTimeMillis() <= l + ms) {}}
}

输出结果:

Strat.2021-8-7 15:07:10
my-thread-0完成工作,用时:1
my-thread-0完成工作,用时:9
End.2021-8-7 15:07:20
{0=true, 1=true, 2=false, 3=false, 4=false, 5=false}

References

  • https://dongguabai.blog.csdn.net/article/details/101145256
  • https://blog.csdn.net/Dongguabai/article/details/110338023

欢迎关注公众号:

关于 CompletableFuture 因为拒绝策略无限等待的解决思路相关推荐

  1. 任务数量超过线程池负荷了怎么办?拒绝策略安排起来!

    通过之前三篇关于Spring Boot异步任务实现的博文,我们分别学会了: @Async创建异步任务 为异步任务配置线程池 多个线程池隔离不同的异步任务 今天我们继续对异步任务的实现进行完善和优化! ...

  2. Java多线程学习七:线程池的 4 种拒绝策略和 6 种常见的线程池

    以便在必要的时候按照我们的策略来拒绝任务,那么拒绝任务的时机是什么呢?线程池会在以下两种情况下会拒绝新提交的任务. 第一种情况是当我们调用 shutdown 等方法关闭线程池后,即便此时可能线程池内部 ...

  3. Java六大线程池和四大拒绝策略

    Java六大线程池和四大拒绝策略 学习常见的 6 种线程池,并详细讲解 Java 8 新增的 ForkJoinPool 线程池,6 种常见的线程池如下. FixedThreadPool CachedT ...

  4. 一文详解java线程池 详解Java线程池的七个参数 详解池化技术 java如何选择核心线程数 详解Java线程池的拒绝策略

    目录 引言 线程池使用场景 加快请求响应(响应时间优先) 加快处理大任务(吞吐量优先) 特殊说明 线程池的池化技术 线程池的创建 手动创建 创建newFixedThreadPool线程池 创建newS ...

  5. 面试必问---Java线程池8大拒绝策略

    前言 谈到java的线程池最熟悉的莫过于ExecutorService接口了,jdk1.5新增的java.util.concurrent包下的这个api,大大的简化了多线程代码的开发.而不论你用Fix ...

  6. 面试官问:线程池除了常见的4种拒绝策略,你还知道哪些?

    点击关注上方"视学算法",设为"置顶或星标" 第一时间送达技术干货. 来源 | http://rrd.me/en3Wp 前言 谈到java的线程池最熟悉的莫过于 ...

  7. Java 线程池必知的8 大拒绝策略

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | kailing.pub/article/ind ...

  8. ThreadPoolExecutor 的八种拒绝策略 | 含番外!

    点击蓝色"程序猿DD"关注我 回复"资源"获取独家整理的学习资料! 作者 | KL 来源 | http://rrd.me/en3Wp 前言 谈到java的线程池 ...

  9. 【Java 并发编程】线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )

    文章目录 一.线程池阻塞队列 二.拒绝策略 三.使用 ThreadPoolExecutor 自定义线程池参数 一.线程池阻塞队列 线程池阻塞队列是线程池创建的第 555 个参数 : BlockingQ ...

最新文章

  1. 用Android Studio开发Java App (Runnable Jar)
  2. jboss8日志级别设置_罐中研讨会:设置JBoss BRMS全日研讨会
  3. MySql和Oracle数据库的区别?
  4. PNG免扣素材,快速提升你的画面设计感!
  5. 远程连接工具Putty 相关
  6. 各代iphone尺寸_iPhone每一代的屏幕尺寸比例是多少?
  7. 时间序列数据的存储和计算 - 概述
  8. STM32 DCMI调试
  9. `不知道是不是我惹的祸.有道网页翻译就失效了.`
  10. .net MVC 局部视图
  11. nm命令和其内容详解
  12. 谈谈SOA面向服务体系架构的安全问题
  13. oa系统用什么云服务器,oa系统怎么选云服务器
  14. 让网站用上骚气的人工智能!----全自动抠图
  15. strcmpi,stricmp函数
  16. 几个cve漏洞库查询网站
  17. 过会10个月,保荐机构主动要求撤销上市保荐,IPO终止
  18. 图片生成base64格式
  19. 建筑行业变革,ALC板材从工业建筑“红海”向民用住宅“蓝海”过渡
  20. V 神呼4123吁宽大处理1234123发者 Virgil Griffith 被判入狱 63 个月阿萨德按时

热门文章

  1. 关于Spring_02
  2. 命名实体识别(NER)算法
  3. 淘宝/天猫buyer_order_detail - 获取购买到的商品订单详情及 API 返回值说明
  4. 标识符命名规则和命名风格
  5. 《可伸缩服务架构 框架与中间件》综合(1)
  6. CSDN| CSDN自定义图片水印
  7. json解析嵌套jasn语句报错_jasn语法小记 - hiwill的个人空间 - OSCHINA - 中文开源技术交流社区...
  8. 考研大数据爬取与分析工具3.0需求分析文档
  9. 软件工程第一次作业(补充)
  10. 什么样的工作是好工作?