​ForkJoinPool源码解析

前言

Java 8Stream是对集合(Collection)对象功能的增强,其特性之一提供了流的并行处理 -> parallelStream。本篇来分析下项目中经常使用的parallelStream底层实现机制。

正文

以main函数为切入点分析, 采用parallelStream来处理集合数据。

public static void main(String[] args) {List<Integer> list = new ArrayList<>();list.addAll(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16));//1.1 .parallelStream()//1.2 .forEach(e ->{})list.parallelStream().forEach(e -> {try {Thread.currentThread().getThreadGroup().list();System.out.println("执行线程名:"+Thread.currentThread() + "  id " + e);} catch (InterruptedException h) {h.printStackTrace();}});
}

1.1 list调用 parallelStream() 返回一个并行流。

default Stream<E> parallelStream() {return StreamSupport.stream(spliterator(), true);
}

spliterator()方法返回一个迭代器对象。Spliterator是java 8引入的接口,用于并行遍历和划分源元素的迭代器。

//ArrayList类:
@Override
public Spliterator<E> spliterator() {return new ArrayListSpliterator<>(this, 0, -1, 0);
}

1.2 forEach()用于遍历集合数据,该方法用于接收一个Consumer接口函数。Consumer是java 8提供的函数式接口之一。

Stream接口:void forEach(Consumer<? super T> action);

Java 8函数式接口

函数式接口定义:有且仅有一个抽象方法

常用函数接口:

  • Consumer 消费接口,接收T(泛型)类型参数,没有返回值。

  • Function 该接口的抽象函数式有返回值。

  • Predicate 返回值固定的boolean。

从forEach出发

//流的源阶段
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {@Overridepublic void forEach(Consumer<? super E_OUT> action) {if (!isParallel()) {sourceStageSpliterator().forEachRemaining(action);}else {//并行实现super.forEach(action);}}
}
//流的源阶段与中间阶段
abstract class ReferencePipeline<P_IN, P_OUT>extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>implements Stream<P_OUT>  {@Overridepublic void forEach(Consumer<? super P_OUT> action) {//makeRef方法构造一个TerminalOp对象,该操作对流的每个元素执行操作。evaluate(ForEachOps.makeRef(action, false));}
}
//使用终端操作评估管道以生成结果。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);//标记管道已使用linkedOrConsumed = true;return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
//静态抽象类:ForEachOp
@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,Spliterator<S> spliterator) {if (ordered)new ForEachOrderedTask<>(helper, spliterator, this).invoke();else//1.3 用于执行并行操作的任务new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();return null;
}

forEach阶段简单的介绍了下,重点在下文。

来张ForkJoinPool运行流程图:

正式分析ForkJoinPool源码之前,先看相关类的属性定义

ForkJoinPool线程池状态定义:

public class ForkJoinPool extends AbstractExecutorService {private static final int  RSLOCK     = 1; //锁定态private static final int  RSIGNAL    = 1 << 1; //唤醒态private static final int  STARTED    = 1 << 2; //开始态private static final int  STOP       = 1 << 29; //停止态private static final int  TERMINATED = 1 << 30; //终止态private static final int  SHUTDOWN   = 1 << 31; //关闭态//线程池状态volatile int runState;//线程池的控制变量字段/*** long类型ctl字段为64位,包含4个16位子字段* AC: 活动线程数 49-64位* TC: 总线程数 33-48位* SS: 顶部等待线程的版本计数和状态(第32位为1表示inative, 其余15位表示版本计数) 17-32位* ID: WorkQueue在WorkQueue[]中索引位置 1-16位* SP: 低32位。** AC为负:没有足够活跃的线程* TC为负:总线程数不足* SP非0:有等待的线程。**/volatile long ctl;}

ctl图示

WorkQueue属性定义:

static final class WorkQueue {......//扫描状态,<0:不活跃;奇数:扫描任务;偶数:执行任务volatile int scanState; //线程探针值,可用于计算workqueue队列在数组中槽位int hint; //高16位记录队列模式(FIFO、LIFO),低16位记录工作队列数组槽位(workqueue在数组中索引下标)int config;//队列状态,1:锁定;0:未锁定;<0:终止volatile int qlock;//下一个poll操作的索引(栈底/队列头),线程从base端窃取任务volatile int base;         // index of next slot for poll//下一个push操作的索引(栈顶/队列尾)int top;                   // index of next slot for push//任务数组ForkJoinTask<?>[] array;//队列所属线程池(可能为空)final ForkJoinPool pool;//当前WorkQueue所属工作线程,如果是外部提交的任务,则owner为空final ForkJoinWorkerThread owner; // owning thread or null if shared//当前正在进行join等待的其它任务volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin//当前正在偷取的任务volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer......
}

config字段高16位队列模式图:

ForkJoinTask任务状态定义:

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {//任务运行状态,初始为0volatile int status; static final int DONE_MASK   = 0xf0000000;  //任务状态掩码static final int NORMAL      = 0xf0000000;  //完成状态static final int CANCELLED   = 0xc0000000;  //取消状态static final int EXCEPTIONAL = 0x80000000;  //异常状态static final int SIGNAL      = 0x00010000;  //信号状态,用于唤醒阻塞线程static final int SMASK       = 0x0000ffff;  //低位掩码
}

ForkJoinTask初探

1.3 跟踪 invoke方法

//ForkJoinTask抽象类
public final V invoke() {int s;//获取任务执行状态if ((s = doInvoke() & DONE_MASK) != NORMAL)reportException(s);//计算结果return getRawResult();
}private int doInvoke() {int s; Thread t; ForkJoinWorkerThread wt;return (s = doExec()) < 0 ? s :((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?(wt = (ForkJoinWorkerThread)t).pool.//1.4 帮助执行或阻塞线程awaitJoin(wt.workQueue, this, 0L) ://阻塞一个非工作线程直到完成externalAwaitDone();
}

externalAwaitDone()方法这里就不贴代码了,简单说下逻辑。后面扫描任务的scan方法有详解,流程差不多。

外部线程,也就是前面例子中的main线程执行完doExec()完成任务拆分逻辑后,此时任务状态还不是完成态,则由main线程帮助执行子任务。

main线程先执行自己初始化创建队列中的任务,后扫描其它工作队列Workqueue中的任务。

上面流程执行完后,判断任务状态是否为完成态,如果不是完成态,说明有工作线程在执行任务,cas操作设置状态为唤醒态,然后main线程调用wait方法阻塞自己,等其它线程唤醒。工作线程任务执行完后会调用notifyAll唤醒主线程。最终任务全部执行完毕返回。

1.4  java.util.concurrent.ForkJoinPool#awaitJoin:

//帮助或阻塞,直到给定的任务完成或超时
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {int s = 0;if (task != null && w != null) {//记录前一个等待的任务ForkJoinTask<?> prevJoin = w.currentJoin;//设置当前任务U.putOrderedObject(w, QCURRENTJOIN, task);CountedCompleter<?> cc = (task instanceof CountedCompleter) ?(CountedCompleter<?>)task : null;for (;;) {//任务执行完if ((s = task.status) < 0)break;if (cc != null)//帮助完成任务,先执行自己队列任务,再扫描其它队列任务helpComplete(w, cc, 0);//如果队列为空,说明任务被偷else if (w.base == w.top || w.tryRemoveAndExec(task))//遍历队列找到当前偷取此任务的队列,执行偷取者队列的任务helpStealer(w, task);if ((s = task.status) < 0)break;long ms, ns;if (deadline == 0L)ms = 0L;else if ((ns = deadline - System.nanoTime()) <= 0L)break;else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)ms = 1L;//补偿措施,唤醒线程或创建线程执行任务if (tryCompensate(w)) {task.internalWait(ms);U.getAndAddLong(this, CTL, AC_UNIT);}}U.putOrderedObject(w, QCURRENTJOIN, prevJoin);}return s;
}

跟踪doExec方法,执行路径:ForkJoinTask#doExec -> CountedCompleter#exec -> ForEachTask#compute。

来看看compute()方法做了哪些事情。

//ForEachTask静态类
public void compute() {//ArrayList.ArrayListSpliterator迭代器Spliterator<S> rightSplit = spliterator, leftSplit;//预估值,初始化时为集合总数,后面为任务分割后子任务内元素和long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;if ((sizeThreshold = targetSize) == 0L)//目标值 = 预估值 / (总线程数 * 4)targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());boolean forkRight = false;Sink<S> taskSink = sink;ForEachTask<S, T> task = this;//任务切分逻辑while (!isShortCircuit || !taskSink.cancellationRequested()) {// 切分直至子任务的大小小于阈值if (sizeEstimate <= sizeThreshold ||//trySplit()将rightSplit平均分,返回平分后左边的任务//待切分任务小于等于1,停止切分(leftSplit = rightSplit.trySplit()) == null) {//执行函数式接口task.helper.copyInto(taskSink, rightSplit);break;}ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);//原子更新挂起任务数量task.addToPendingCount(1);ForEachTask<S, T> taskToFork;if (forkRight) {forkRight = false;rightSplit = leftSplit;taskToFork = task;task = leftTask;}else {forkRight = true;taskToFork = leftTask;}//将子任务提交到线程池ForkJoinPooltaskToFork.fork();sizeEstimate = rightSplit.estimateSize();}task.spliterator = null;//如果挂起计数不为0,则递减。最终任务处理完后设置状态为完成态,并唤醒阻塞的线程task.propagateCompletion();
}

compute方法主要逻辑就是达到拆分条件就平均拆分任务,待切分任务小于1时,停止切分,去执行任务。对应例子中的consumer函数式接口。

拆分后的任务会提交到队列中,由线程获取任务后继续调用 compute() 来判断拆分还是执行。

继续看fork逻辑:

//ForkJoinTask抽象类
public final ForkJoinTask<V> fork() {Thread t;//如果当前线程为ForkJoin工作线程,则任务放入自身工作线程队列if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);else//如果是外部线程池调用,则任务push到公共的线程池执行,由于本文是main函数开头,所以会执行此处ForkJoinPool.common.externalPush(this);return this;
}

ForkJoinPool类解析

来看下公共线程池的创建:
ForkJoinPool.common = makeCommonPool()
ForkJoinPool类初始化时会执行静态代码块,静态代码块中会执行makeCommonPool方法,返回ForkJoinPool对象实例

//ForkJoinPool类//创建并返回一个公共线程池
private static ForkJoinPool makeCommonPool() {int parallelism = -1;ForkJoinWorkerThreadFactory factory = null;......if (fp != null)factory = ((ForkJoinWorkerThreadFactory)ClassLoader.getSystemClassLoader().loadClass(fp).newInstance());if (parallelism < 0 && // default 1 less than #cores//线程池数量,默认cpu核数-1, 用户也可在环境变量中指定数量(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)parallelism = 1;if (parallelism > MAX_CAP)parallelism = MAX_CAP;return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,"ForkJoinPool.commonPool-worker-");
}

回到externalPush方法:

//外部任务提交
final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;//探针值,用于计算WorkQueue槽位索引。//ThreadLocalRandom原理:获取随机数时,每个线程获取自己初始化种子,避免多线程使用同一个原子种子变量,从而导致原子变量的竞争。int r = ThreadLocalRandom.getProbe();int rs = runState;//外部线程第一次进入,此时workQueues还未初始化,if代码不会执行if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&//这里会获取偶数槽位,外部提交的任务会放在WorkQueue数组的偶数槽位。后面会配图说明(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&//加锁U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;if ((a = q.array) != null &&(am = a.length - 1) > (n = (s = q.top) - q.base)) {//计算任务在内存中偏移量int j = ((am & s) << ASHIFT) + ABASE;//向队列放入任务U.putOrderedObject(a, j, task);//top值 +1U.putOrderedInt(q, QTOP, s + 1);//解锁U.putIntVolatile(q, QLOCK, 0);//任务数小于等于1,尝试创建或激活工作线程if (n <= 1)signalWork(ws, q);return;}U.compareAndSwapInt(q, QLOCK, 1, 0);}externalSubmit(task);}//完整版externalPush,用于将任务提交到池中,如果是第一次提交,则初始化工作队列
private void externalSubmit(ForkJoinTask<?> task) {int r; //线程探针值if ((r = ThreadLocalRandom.getProbe()) == 0) {//初始化线程字段,生成原子种子变量,获取随机数ThreadLocalRandom.localInit();r = ThreadLocalRandom.getProbe();}for (;;) {WorkQueue[] ws; WorkQueue q; int rs, m, k;boolean move = false;//如果线程池为终止状态,则抛出拒绝执行异常if ((rs = runState) < 0) {tryTerminate(false, false);     // help terminatethrow new RejectedExecutionException();}//线程池状态不是开始状态,则状态加锁,初始化WorkQueue 数组else if ((rs & STARTED) == 0 ||     // initialize((ws = workQueues) == null || (m = ws.length - 1) < 0)) {int ns = 0;//加锁rs = lockRunState();try {if ((rs & STARTED) == 0) {//通过原子操作,完成窃取任务总数这个计数器的初始化U.compareAndSwapObject(this, STEALCOUNTER, null,new AtomicLong());// create workQueues array with size a power of two//获取并行度config的值,前面说过config值为cpuh核数-1,比如我的系统是8核,那这里p=8-1=7。int p = config & SMASK; // ensure at least 2 slotsint n = (p > 1) ? p - 1 : 1;n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;//数组大小这里可以找到规律。找到大于2倍p的值最近的2^m。由于最后 (n+1)左移了一位,所以n最小值为4。//举例:p = 7, 那么 2p = 14, 大于2p的2^m = 16。所以这里m=4, n=16。//假如p=3, 2p=6,2^m=8 > 6。 则m=3, n=8。 workQueues = new WorkQueue[n]; //由于我这里p=7, 所以初始化队列数组长度为16。ns = STARTED;}} finally {//解锁,状态修改为开始态unlockRunState(rs, (rs & ~RSLOCK) | ns);}}//判断偶数槽位是否为空else if ((q = ws[k = r & m & SQMASK]) != null) {//qlock属性定义,初始值0,1:锁定,< 0:终止//队列加锁if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {//任务数组ForkJoinTask<?>[] a = q.array;int s = q.top;boolean submitted = false;try {if ((a != null && a.length > s + 1 - q.base) ||//growArray()逻辑:任务数组为空,则创建数组,初始大小为8192,不为空,则2倍扩容。任务移到新数组。(a = q.growArray()) != null) {//计算top在内存中偏移量int j = (((a.length - 1) & s) << ASHIFT) + ABASE; //任务存储到任务数组U.putOrderedObject(a, j, task);//更新top值。top定义:push操作的索引(栈顶)U.putOrderedInt(q, QTOP, s + 1);submitted = true;}} finally {//队列解锁U.compareAndSwapInt(q, QLOCK, 1, 0);}if (submitted) {//1.5 创建或激活工作线程signalWork(ws, q);return;}}move = true;                   // move on failure}//队列数组槽位为空,则创建一个WorkQueue放入数组else if (((rs = runState) & RSLOCK) == 0) {//外部线程所属队列是共享队列,参数owner传入nullq = new WorkQueue(this, null);//记录这个探针值q.hint = r;//记录队列数组槽位和队列模式q.config = k | SHARED_QUEUE;//共享队列初始化为inactive状态q.scanState = INACTIVE;rs = lockRunState();           // publish indexif (rs > 0 &&  (ws = workQueues) != null &&k < ws.length && ws[k] == null)ws[k] = q;unlockRunState(rs, rs & ~RSLOCK);}elsemove = true;if (move)r = ThreadLocalRandom.advanceProbe(r);}}

externalSubmit方法三部曲:

  • 初始化工作队列数组 WorkQueue[],

  • 创建工作队列 WorkQueue,

  • 任务放入任务数组 ForkJoinTask[], 更新top值, 创建线程执行任务。

画个图帮助理解任务提交,看到这的点个赞把☺(* ̄︶ ̄)

m & r & SQMASK必为偶数,所以通过externalSubmit方法提交的任务都添加到了偶数索引的任务队列中(没有绑定的工作线程)。

1.5  java.util.concurrent.ForkJoinPool#signalWork:

工作线程数不足:创建一个工作线程;

工作线程数足够:唤醒一个空闲(阻塞)的工作线程。

//创建或激活工作线程
final void signalWork(WorkQueue[] ws, WorkQueue q) {long c; int sp, i; WorkQueue v; Thread p;//ctl小于0时,活跃的线程数不够while ((c = ctl) < 0L) {//取ctl的低32位,如果为0,说明没有等待的线程if ((sp = (int)c) == 0) { //取tc的高位,不为0说明总线程数不够,创建线程if ((c & ADD_WORKER) != 0L)tryAddWorker(c);break;}if (ws == null) break;if (ws.length <= (i = sp & SMASK))break;if ((v = ws[i]) == null)    break;int vs = (sp + SS_SEQ) & ~INACTIVE; int d = sp - v.scanState;long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {v.scanState = vs;if ((p = v.parker) != null)//唤醒线程U.unpark(p);break;}if (q != null && q.base == q.top)break;}
}private void tryAddWorker(long c) {boolean add = false;do {//活跃线程和总线程数都 +1long nc = ((AC_MASK & (c + AC_UNIT)) |(TC_MASK & (c + TC_UNIT)));if (ctl == c) {int rs, stop; //加锁,并判断线程池状态是否为停止态if ((stop = (rs = lockRunState()) & STOP) == 0)//cas更新ctl的值add = U.compareAndSwapLong(this, CTL, c, nc);//解锁unlockRunState(rs, rs & ~RSLOCK);if (stop != 0)break;if (add) {//1.6 创建新的工作线程createWorker();break;}}//ADD_WORKER的第48位是1,其余位都为0,与ctl位与判断TC总线程是否已满,如果线程满了, 那么ctl的第48位为0,0 & ADD_WORKER = 0//(int)c == 0 表示截取ctl的低32位(int是4字节,1字节是8位)} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}

通过ctl的高32位来判断是否需要创建线程或激活线程。

上文有讲,公共线程池数量为cpu核数-1,我电脑是8核,所以总线程数为7。也就是并行度为7。高32位 +7最终等于0。

1.6  java.util.concurrent.ForkJoinPool#createWorker:

private boolean createWorker() {ForkJoinWorkerThreadFactory fac = factory;Throwable ex = null;ForkJoinWorkerThread wt = null;try {//使用线程工厂创建工作线程if (fac != null && (wt = fac.newThread(this)) != null) {//启动线程wt.start();return true;}} catch (Throwable rex) {ex = rex;}deregisterWorker(wt, ex);return false;
}

线程工厂创建线程。

外部线程提交任务后创建线程,线程扫描到任务后会继续创建线程,直到达到总线程数。

本地测试中某一次线程创建流程图如下,可以看到main线程创建了worker-1和worker-3工作线程,worker-1又创建了worker-2和worker-4工作线程。都属于同一个main线程组。

ForkJoinWorkerThread

public void run() {
//线程初次运行任务数组为空
if (workQueue.array == null) { // only run onceThrowable exception = null;try {onStart();pool.runWorker(workQueue);} catch (Throwable ex) {exception = ex;} finally {try {onTermination(exception);} catch (Throwable ex) {if (exception == null)exception = ex;} finally {pool.deregisterWorker(this, exception);}}
}
}
java.util.concurrent.ForkJoinPool#runWorkerfinal void runWorker(WorkQueue w) {//初始化或两倍扩容w.growArray();     //获取创建工作队列的线程探针值int seed = w.hint; int r = (seed == 0) ? 1 : seed;  for (ForkJoinTask<?> t;;) {//扫描任务if ((t = scan(w, r)) != null)//1.7 执行获取到的任务w.runTask(t);else if (!awaitWork(w, r))break;r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift}
}private ForkJoinTask<?> scan(WorkQueue w, int r) {WorkQueue[] ws; int m;//判断任务队列数组是否为空if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {//获取扫描状态int ss = w.scanState; for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;int b, n; long c;//如果k槽位不为空if ((q = ws[k]) != null) {if ((n = (b = q.base) - q.top) < 0 &&(a = q.array) != null) {//获取base的偏移量long i = (((a.length - 1) & b) << ASHIFT) + ABASE;//FIFO模式,从base端获取任务if ((t = ((ForkJoinTask<?>)U.getObjectVolatile(a, i))) != null &&q.base == b) {//判断线程是否活跃状态if (ss >= 0) {//任务被偷了。更新任务数组a在内存中偏移量为base位置的值为空if (U.compareAndSwapObject(a, i, t, null)) {  //更新base值q.base = b + 1;if (n < -1)//数组还有任务,创建或激活线程执行任务signalWork(ws, q);//返回扫描到的任务return t;}}else if (oldSum == 0 && w.scanState < 0)tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);}if (ss < 0) ss = w.scanState;r ^= r << 1; r ^= r >>> 3; r ^= r << 10;origin = k = r & m; oldSum = checkSum = 0;continue;}checkSum += b;}//m二进制位都为1,(k+1)&m 会遍历工作队列数组所有槽位if ((k = (k + 1) & m) == origin) {if ((ss >= 0 || (ss == (ss = w.scanState))) &&oldSum == (oldSum = checkSum)) {if (ss < 0 || w.qlock < 0)break;int ns = ss | INACTIVE;//活跃线程-1long nc = ((SP_MASK & ns) |(UC_MASK & ((c = ctl) - AC_UNIT)));w.stackPred = (int)c;U.putInt(w, QSCANSTATE, ns);if (U.compareAndSwapLong(this, CTL, c, nc))ss = ns;elsew.scanState = ss;         // back out}checkSum = 0;}}}return null;
}

scan扫描任务,从其它队列的base端偷取。

base属性加了volatile关键字,保证了共享变量的内存可见性。获取任务后通过cas乐观锁操作将被窃取的队列中任务置位空。

此窃取机制减少了top和base端的竞争(队列线程和窃取线程分别从top和base端操作),用cas操作也提高了效率。

比如外部线程分割了3个任务,那么top+3=4099,三个任务放在内存偏移量为4096、4097、4098的位置。启动的工作线程从base端偷取任务。如下图:

1.7  java.util.concurrent.ForkJoinPool.WorkQueue#runTask:

final void runTask(ForkJoinTask<?> task) {if (task != null) {//此时scanState变成了偶数,表示正在执行任务scanState &= ~SCANNING; // mark as busy//执行窃取到的任务,doExec上面有讲,会在compute方法中执行任务或分割任务,分割的任务放入自己队列的任务数组里。(currentSteal = task).doExec();//将窃取到的任务字段置空U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC//执行自个任务数组里任务。任务来源就是上面分割后的子任务。execLocalTasks();ForkJoinWorkerThread thread = owner;//窃取任务数+1if (++nsteals < 0)      // collect on overflow//任务数累加到pool的stealCounter字段中transferStealCount(pool);//恢复扫描状态scanState |= SCANNING;if (thread != null)//钩子方法thread.afterTopLevelExec();}
}final void execLocalTasks() {int b = base, m, s;ForkJoinTask<?>[] a = array;//任务数组中有任务if (b - (s = top - 1) <= 0 && a != null &&(m = a.length - 1) >= 0) {//队列模式,判断是否为后进先出模式 LIFO。if ((config & FIFO_QUEUE) == 0) {for (ForkJoinTask<?> t;;) {//从top端取任务if ((t = (ForkJoinTask<?>)U.getAndSetObject(a, ((m & s) << ASHIFT) + ABASE, null)) == null)break;//修改内存中top值U.putOrderedInt(this, QTOP, s);//执行任务t.doExec();if (base - (s = top - 1) > 0)break;}}else//先进先出模式 FIFO, 轮询从base端取任务运行,直到为空pollAndExecAll();}
}

runTask方法主要逻辑就是将扫描到的任务执行,执行过程中可能会继续分割。执行完窃取的任务后,就执行自个队列里的任务。

至此,ForkJoinPool就分析到这。

总结:

大概可以用一句话总结ForkJoinPool原理:一个可以并行执行任务的线程池,可以处理一个可递归划分的任务并获取结果。

参考博文:

[1]  https://blog.csdn.net/qq_27785239/article/details/103395079
[2]  https://blog.csdn.net/yusimiao/article/details/114009972

parallelStream 底层 ForkJoinPool 实现相关推荐

  1. system流怎么判断为空_并行流ParallelStream中隐藏的陷阱

    点击上方蓝字 ↑↑ Throwable文摘 关注公众号设置星标,不定时推送高质量原创文章 关注 前提 这篇文章介绍一下日常开发中并行流ParallelStream中隐藏的陷阱,这个问题其实离我们很近, ...

  2. java 进阶笔记线程与并发之ForkJoinPool简析

    简介 ForkJoinPool是一个线程池,支持特有的的ForkJoinTask,对于ForkJoinTask任务,通过特定的for与join方法可以优化调度策略,提高效率. 使用 通常,我们继承使用 ...

  3. JDK8新特性:Lambda表达式、Stream流、日期时间工具类

    重要特性: 可选类型声明:不需要声明参数类型,编译器可以统一识别参数值. 可选的参数圆括号:一个参数无需定义圆括号,但多个参数需要定义圆括号. 可选的大括号:如果主体包含了一个语句,就不需要大括号. ...

  4. 记一次java程序从6H到30min的优化过程

    背景 我们有一个java系统,需要从固定的文件目录中读取csv格式的文件,然后将文件内容读取后在进行一系列的转换以及数据库操作,包含文件内容转换为java对象.内容去重.查询oracle.生成目标数据 ...

  5. JAVA跨线程传递数据方式总结

    实现跨线程传递数据方式: v1:子线程使用主线程的局部变量 这种当主线程和子线程不在一快儿时就不适用.可以使用JDK原生的InheritableThreadLocal. v2:InheritableT ...

  6. parallelstream启动的线程数_谈谈并行流parallelStream

    一.parallelStream是什么 Java8中提供了能够更方便处理集合数据的Stream类,其中parallelStream()方法能够充分利用多核CPU的优势,使用多线程加快对集合数据的处理速 ...

  7. 自定义parallelStream的thread pool

    文章目录 简介 通常操作 使用自定义ForkJoinPool 总结 自定义parallelStream的thread pool 简介 之前我们讲到parallelStream的底层使用到了ForkJo ...

  8. Java8 并行流(parallelStream)原理分析及注意事项

    文章目录 前言 一.parallelStream是什么 二.parallelStream原理分析 1.Fork/Join框架 1.1 work-stealing(工作窃取算法) 1.2 常用方法 2. ...

  9. 线程及同步的性能 – 线程池/ ThreadPoolExecutors/ ForkJoinPool

    线程池和ThreadPoolExecutors 虽然在程序中可以直接使用Thread类型来进行线程操作,但是更多的情况是使用线程池,尤其是在Java EE应用服务器中,一般会使用若干个线程池来处理来自 ...

最新文章

  1. 面试:你知道Java中的回调机制吗?
  2. 详细分析本机号码一键登录原理
  3. 菜鸟发现--网站的皮肤 ^_^
  4. windows下 Qt 静态编译
  5. NSString/NSMutable字符串——韩俊强…
  6. myisam怎么读_耗时半年,我成功“逆袭”,拿下美团offer(刷面试题+读源码+项目准备)...
  7. 【C语言进阶深度学习记录】二十八 数组指针与指针数组的分析
  8. 耗时两年,19岁小伙采用230片纯74逻辑芯片搭建出32位处理器!可玩贪吃蛇
  9. 自己在项目中写的一个Jquery插件和Jquery tab 功能
  10. 你那不是拖延症,只是习惯性逃避
  11. 流浪四十年,中国科幻撒点野
  12. ti-sdk-evm-am335x-05.07 uboot分析(MLO跳转到u-boot之前)
  13. Linkedin领英如何批量撤回邀请
  14. OLED_I2C_SH1106屏幕教程
  15. Windows与USB的通信
  16. 文章转载---西工大博士生文言文答辩致谢
  17. c语言姓名如何定义,c语言如何输入编号和姓名
  18. 3D-Max 软件许可证检出失败 错误20 解决办法
  19. 视频怎么压缩小于100兆?压缩视频这么做就可以了
  20. 播放流媒体常用工具和格式

热门文章

  1. [Buzz.Today]2012.01.10
  2. 接线端子02——接线端子-线鼻-线耳
  3. 【前端】Echarts的scatter3D各个属性具体含义
  4. 在外企工作三年的我 给大家一些建议 附送学习网站及资料无数(转)
  5. 数据库纳管平台DBhouse的技术路线与实践
  6. 从《蜗居》看失衡的中国城市化
  7. php做的论坛 vip,论坛会员源码分享
  8. 鼠标的移入、移出事件与css中的hover伪类
  9. Rexsee API介绍:Android屏幕锁定Keyguard
  10. 都20多岁了,总是怕这怕那的,怎么办?