DAGScheduler

DAGScheduler是Spark中比较重要的类,实现了面向DAG的高层次调度,DAGScheduler通过计算将DAG中的一系列RDD划分到不同的Stage,然后构建这些Stage之间的父子关系,最后将每个Stage按照Partition切分为多个Task,并以TaskSet的形式提交给底层的TaskScheduler。所有的组件都通过向DAGScheduler投递DAGSchedulerEvent来使用DAGSchedulerDAGScheduler内部的DAGSchedulerEventProcessLoop将处理这些DAGScheduler-Event,并调用DAGScheduler的不同方法。

Job管理

Job概述

Spark支持两种RDD操作:transformationactiontransformation操作会针对已有的RDD创建一个新的RDD;而action则主要是对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并可以返回结果给Driver。transformation的特点就是lazy特性,如果一个spark应用中只定义了transformation操作,那么即使你执行该应用,这些操作也不会执行。也就是说,transformation是不会触发spark程序的执行的,它们只是记录了对RDD所做的操作,但是不会自发的执行。只有当执行了一个action操作,那么所有的transformation才会执行。Spark通过这种lazy特性,来进行底层的spark应用执行的优化,避免产生过多中间结果。所以,action算子是job划分的依据,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行。

DAGScheduler-Job管理

DAGScheduler会记录目前运行的Job,Job是异步运行的,相关变量如下:

  1. nextJobId用来生成JobId,用于Job记录,还用于后续Stage调度优先级比较;
  2. numTotalJobs是目前运行完或者在运行的所有任务数目;
  3. jobIdToStageIds记录JobIdStageId集合的对应关系;
  4. jobIdToActiveJob记录JobId跟活跃Job的关系。
  5. activeJobs记录目前活跃的job。
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val activeJobs = new HashSet[ActiveJob]

JobWaiter

DAGScheduler中的runJob函数,拿到相应的Job后,首先会通过submitJob来构造JobWaiter,由于执行Job的过程是异步的,利用JobWaiter等待Job处理完毕。如果Job执行成功,根据处理结果打印相应的日志;如果Job执行失败,除打印日志外,还将抛出引起Job失败的异常信息。

def submitJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit,properties: Properties): JobWaiter[U] = {// 获取当前Job的最大分区数val maxPartitions = rdd.partitions.length...// 生成下一个Job的jobIdval jobId = nextJobId.getAndIncrement()// 如果Job的分区数量等于0,则创建一个totalTasks属性为0的JobWaiter并返回。// 根据JobWaiter的实现,totalTasks属性为0的JobWaiter的jobPromise将被设置为Success。if (partitions.size == 0) { return new JobWaiter[U](this, jobId, 0, resultHandler)}// 分区数量大于0val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]// 创建JobWaiterval waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)// 将JobWaiter包装到JobSubmitted消息中,投递给DAGSchedulerEventProcessLoop,// 这个消息最终会被DAGScheduler的handleJobSubmitted()方法处理。eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties)))// 返回JobWaiterwaiter
}def runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite,  resultHandler: (Int, U) => Unit,properties: Properties): Unit = {// 启动时间val start = System.nanoTime// 提交Job,该方法是异步的,会立即返回JobWaiter对象val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]// 等待Job处理完毕waiter.completionFuture.ready(Duration.Inf)(awaitPermission)// 获取运行结果waiter.completionFuture.value.get match {case scala.util.Success(_) => // Job执行成功case scala.util.Failure(exception) => // Job执行失败// 记录线程异常堆栈信息val callerStackTrace = Thread.currentThread().getStackTrace.tailexception.setStackTrace(exception.getStackTrace ++ callerStackTrace)// 抛出异常throw exception}
}

这里的handler实际是任务结果的处理,简单的将结果存储到相应taskId对应的index中;func是action算子中对于iterator数据的处理函数,让我们来看下count的调用链:

def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,  partitions: Seq[Int]): Array[U] = {val results = new Array[U](partitions.size)runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)results
}def runJob[T, U: ClassTag](rdd: RDD[T],func: Iterator[T] => U,partitions: Seq[Int]): Array[U] = {val cleanedFunc = clean(func)runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {runJob(rdd, func, 0 until rdd.partitions.length)
}def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

JobListener

JobListener定义了所有Job的监听器的接口规范,Job执行成功后将调用Job Listener定义的taskSucceeded方法,而在Job失败后调用Job Listener定义的jobFailed方法。

private[spark] trait JobListener {// Job执行成功后将调用该方法def taskSucceeded(index: Int, result: Any): Unit// Job执行失败后将调用该方法def jobFailed(exception: Exception): Unit
}

私有变量

JobWaiter用于等待整个Job执行完毕,然后调用给定的处理函数对返回结果进行处理,我们来看下它的属性:

  1. dagScheduler 的引用;
  2. jobId当前job的Id,在dagScheduler中生成;
  3. totalTasks是当前Job对应的ResultStage的task的数目;
  4. resultHandlers是结果处理函数,对于任务得到的结果进行处理;
  5. finishedTasks是已经完成的任务数目,通过taskSucceeded更新;
  6. jobPromise是用来代表Job完成后的结果。
private[spark] class JobWaiter[T](dagScheduler: DAGScheduler,val jobId: Int, totalTasks: Int, resultHandler: (Int, T) => Unit)extends JobListener with Logging {// 等待完成的Job中已经完成的Task数量private val finishedTasks = new AtomicInteger(0)// 用来代表Job完成后的结果。如果totalTasks等于零,说明没有Task需要执行,此时将被直接设置为Success。private val jobPromise: Promise[Unit] =if (totalTasks == 0) Promise.successful(()) else Promise()// Job是否已经完成def jobFinished: Boolean = jobPromise.isCompleted// 返回jobPromise的futuredef completionFuture: Future[Unit] = jobPromise.future
}

任务成功处理

当任务成功后,TaskSchduler会通过DAGScheduler发送任务完成消息,如果任务执行成功,而且任务类型为ResultTask,有可能会导致当前Job所有任务都结束了,所以会调用jobwaiter.taskSucceeded来累加处理结果,参数index代表任务Id,参数result代表任务的运行结果。

// Job执行成功后将调用该方法
override def taskSucceeded(index: Int, result: Any): Unit = {synchronized { // 加锁进行回调resultHandler(index, result.asInstanceOf[T])}// 完成Task数量自增,如果所有Task都完成了就调用JobPromise的success()方法if (finishedTasks.incrementAndGet() == totalTasks) {jobPromise.success(())}
}

任务失败处理

job失败后会调用jobPromise的相关方法将其设置为Failure

// Job执行失败后将调用该方法
override def jobFailed(exception: Exception): Unit = {// 调用jobPromise的相关方法将其设置为Failureif (!jobPromise.tryFailure(exception)) {logWarning("Ignore failure", exception)}
}

ActiveJob

ActiveJob用来表示已经激活的Job,即被DAGScheduler接收处理的Job,主要由以下属性:

  1. jobId是Job的身份标识;
  2. finalStage是Job的最下游ResultStage;
  3. callSite是应用程序调用栈;
  4. listener是监听当前Job的JobListener,就是上面的JobWaiter
  5. properties是包含了当前Job的调度、Job group、描述等属性的Properties;
  6. numPartitions是当前Job的分区数量,如果finalStage为ResultStage,那么此属性等于ResultStage的partitions属性的长度;如果finalStage为ShuffleMapStage,那么此属性等于ShuffleMapStage的rdd的partitions属性的长度,不过Job的最后一个Stage一般是ResultStage
  7. finished:Boolean类型的数组,每个数组索引代表一个分区的任务是否执行完成,这个主要给后面的ResultStage来判断哪些分区还没计算完成;
  8. numFinished当前Job的所有任务中已完成任务的数量。
private[spark] class ActiveJob(val jobId: Int, val finalStage: Stage,val callSite: CallSite,  val listener: JobListener, val properties: Properties) {val numPartitions = finalStage match {case r: ResultStage => r.partitions.lengthcase m: ShuffleMapStage => m.rdd.partitions.length}val finished = Array.fill[Boolean](numPartitions)(false)var numFinished = 0
}

上面在讲创建JobWaiter时候,会发送JobSubmitted消息给DAGScheduler的消息处理器,接收到该消息,DagScheduler会调用handleJobSubmitted方法来进行创建ActiveJob,提交Stage

// 处理Job的提交
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _, partitions: Array[Int],  callSite: CallSite,listener: JobListener, properties: Properties) {var finalStage: ResultStage = nulltry {// 创建ResultStagefinalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)} catch {..}// 创建ActiveJobval job = new ActiveJob(jobId, finalStage, callSite, listener, properties)// 清空缓存的各个RDD的所有分区的位置信息clearCacheLocs()// 生成Job提交时间val jobSubmissionTime = clock.getTimeMillis()// 记录jobId与ActiveJob的映射jobIdToActiveJob(jobId) = jobactiveJobs += job// 将finalStage的ActiveJob设置为当前提交的JobfinalStage.setActiveJob(job)// 获取Job所有Stage的StageInfo对象val stageIds = jobIdToStageIds(jobId).toArrayval stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))// 向事件总线投递SparkListenerJobStart事件listenerBus.post(parkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))// 提交ResultStagesubmitStage(finalStage)
}

Stage管理

找到Job的分界线后,Spark会根据同一个Job里的一系列RDD转换和依赖关系构建有向无环图[DAG],然后根据RDD依赖的不同将RDD划分到不同的阶段[Stage],每个阶段按照分区[Partition]的数量创建多个任务[Task],最后将这些任务提交到集群的各个运行节点上运行。DAG使得没有依赖关系的Stage可以并行执行,并保证有依赖关系的Stage顺序执行,并行执行能够有效利用集群资源,提升运行效率,而串行执行则适用于那些在时间和数据资源上存在强制依赖的场景,我们先来看下DAGScheduler中对于Stage管理的相关变量:

  1. nextStageId用于生成下一个Stage的身份标识;
  2. stageIdToStage缓存StageIdStage之间的映射关系;
  3. shuffleIdToMapStage缓存Shuffle的身份标识shuffleIdShuffleMapStage之间的映射关系;
  4. waitingStages是处于等待状态的Stage集合;
  5. runningStages是处于运行状态的Stage集合;
  6. failedStages是处于失败状态的Stage集合。
private val nextStageId = new AtomicInteger(0)
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val waitingStages = new HashSet[Stage]
private[scheduler] val runningStages = new HashSet[Stage]
private[scheduler] val failedStages = new HashSet[Stage]

StageInfo

StageInfo是用来存储一个Stage的相关信息,提交时间,执行完成时间,失败原因,聚合器值等,可以通过DAGScheduler发送给SparkListeners

class StageInfo(val stageId: Int, val attemptId: Int, val name: String, val numTasks: Int,val rddInfos: Seq[RDDInfo], val parentIds: Seq[Int], val details: String,val taskMetrics: TaskMetrics = null, private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {// DAGScheduler将当前Stage提交给TaskScheduler的时间var submissionTime: Option[Long] = None// 当前Stage中的所有Task完成的时间(即Stage完成的时间)或者Stage被取消的时间var completionTime: Option[Long] = None// 如果Stage失败了,用于记录失败的原因var failureReason: Option[String] = None// 存储了所有聚合器计算的最终值val accumulables = HashMap[Long, AccumulableInfo]()// 当Stage失败时会调用的方法def stageFailed(reason: String) {// 保存Stage失败的原因和Stage完成的时间failureReason = Some(reason)completionTime = Some(System.currentTimeMillis)}
}

Stage

Stage是抽象类,定义了一个Stage基本的公共属性:

  1. id代表当前Stage的身份标识;

  2. rdd是当前Stage包含的RDD,归属于本Stage的最后一个RDD;

  3. numTasks是当前Stage的Task数量;

  4. parents是当前Stage的父Stage列表,一个Stage可以有一到多个父亲Stage;

  5. firstJobId是第一个提交当前Stage的Job的身份标识,当使用FIFO调度时,通过firstJobId首先计算来自较早Job的Stage,或者在发生故障时更快的恢复;

  6. callSite是应用程序中与当前Stage相关联的调用栈信息;

  7. numPartitions是当前Stage的分区数量;

  8. jobIds是当前Stage所属的Job的身份标识集合,一个Stage可以属于一到多个Job;

  9. pendingPartitions是存储待处理分区的索引的集合;

  10. nextAttemptId是用于生成Stage下一次尝试的身份标识,失败后会重新尝试执行;

  11. _latestInfo是Stage最近一次尝试的信息,即StageInfo;

  12. fetchFailedAttemptIds发生过FetchFailure的Stage尝试的身份标识的集合,此属性用于避免在发生FetchFailure后无止境的重试。

private[scheduler] abstract class Stage(val id: Int, val rdd: RDD[_],val numTasks: Int, val parents: List[Stage], val firstJobId: Int, val callSite: CallSite)
extends Logging {// 当前Stage最后一个RDD的分区数量val numPartitions = rdd.partitions.length// 当前Stage所属的Job的身份标识集合。一个Stage可以属于一到多个Job。val jobIds = new HashSet[Int]// 存储待处理分区的索引的集合。val pendingPartitions = new HashSet[Int]// 用于生成Stage下一次尝试的身份标识。private var nextAttemptId: Int = 0// 记录当前调用栈信息val name: String = callSite.shortFormval details: String = callSite.longForm// Stage最近一次尝试的信息。private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)// 发生过FetchFailure的Stage尝试的身份标识的集合。此属性用于避免在发生FetchFailure后无止境的重试。private val fetchFailedAttemptIds = new HashSet[Int]
}

启动或者失败重试

用于创建新的Stage尝试,比较简单,步骤如下:

  1. 调用StageInfofromStage方法创建新的StageInfo
  2. 增加nextAttemptId
def makeNewStageAttempt(numPartitionsToCompute: Int,taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {val metrics = new TaskMetricsmetrics.register(rdd.sparkContext)// 创建新的StageInfo_latestInfo = StageInfo.fromStage(this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)// 自增nextAttemptIdnextAttemptId += 1
}

另外定义了findMissingPartitions供子类实现。

ShuffleMapStage

Spark中最耗时的操作是Shuffle,具有宽依赖前后两个Stage,前面的Stage就是ShuffleMapStage,它包括一到多个ShuffleMapTask,这些ShuffleMapTask将生成用于Shuffle的数据。ShuffleMapStage一般是ResultStage或者其他ShuffleMapStage的前置Stage。ShuffleMapStage除继承自父类Stage的属性外,还有一些自己的属性:

  1. shuffleDep是与ShuffleMapStage相对应的ShuffleDependency
  2. _mapStageJobs是与ShuffleMapStage相关联的ActiveJob的列表;
  3. _numAvailableOutputsShuffleMapStage可用的map任务的输出数量,这也代表了执行成功的map任务数;
  4. outputLocsShuffleMapStage的各个map任务与其对应的MapStatus列表的映射关系,由于map任务可能会运行多次,因而可能会有多个MapStatus。
private[spark] class ShuffleMapStage(id: Int, rdd: RDD[_], numTasks: Int, parents: List[Stage],firstJobId: Int, callSite: CallSite,  val shuffleDep: ShuffleDependency[_, _, _])extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {// 与ShuffleMapStage相关联的ActiveJob的列表private[this] var _mapStageJobs: List[ActiveJob] = Nil// ShuffleMapStage可用的Map任务的输出数量,这也代表了执行成功的Map任务数。private[this] var _numAvailableOutputs: Int = 0// ShuffleMapStage的各个分区与其对应的MapStatus列表的映射关系。// 由于map任务可能会运行多次,因而可能会有多个MapStatus。private[this] val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
}

OutLoc添加&删除

上面说到outputLocsShuffleMapStage的各个map任务与其对应的MapStatus列表的映射关系,由于map任务可能会运行多次,因而可能会有多个MapStatus。

  1. addOutputLoc是添加,当某一分区的任务执行完成后,首先将分区与MapStatus的对应关系添加到outputLocs中,然后将可用的输出数加一,等到可用输出数目等于分区数目时候就表示完成了这个Stage;

  2. FetchFailed失败时候说明无法获取MapStatus记录位置的数据,已经失效了,所以要删除

// 当某一分区的任务执行完成后,首先将分区与MapStatus的对应关系添加到outputLocs中,然后将可用的输出数加一。
def addOutputLoc(partition: Int, status: MapStatus): Unit = {// 得到分区对应的MapStatus列表val prevList = outputLocs(partition)// 将status添加到该列表中outputLocs(partition) = status :: prevListif (prevList == Nil) { // 如果之前该分区没有任何MapStatus// 将可用的输出数加一_numAvailableOutputs += 1}
}// 对于记录的某个blockManager上的MapStatus失效了,删除
def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {val prevList = outputLocs(partition)val newList = prevList.filterNot(_.location == bmAddress)outputLocs(partition) = newListif (prevList != Nil && newList == Nil) {_numAvailableOutputs -= 1}
}

findMissingPartitions实现

由于继承了Stage,实现了``findMissingPartitions方法,用于找出当前Job的所有分区中还没有完成的分区的索引。ShuffleMapStage判断一个分区是否完成,是通过outputLocs`中是否有位置记录。

override def findMissingPartitions(): Seq[Int] = {// 从outputLocs中查找val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)missing
}

ResultStage

ResultStage可以使用指定的函数对RDD中的分区进行计算并得出最终结果,ResultStage是最后执行的Stage,此阶段主要进行作业的收尾工作。

私有变量

private[spark] class ResultStage(id: Int,rdd: RDD[_],val func: (TaskContext, Iterator[_]) => _,val partitions: Array[Int],parents: List[Stage],firstJobId: Int,callSite: CallSite)extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {}

从源码看出,相对于StageResultStage只增加了func属性,这个是用来在构建ResultTask时候传递给Task具体的计算逻辑:

val taskBinaryBytes: Array[Byte] = stage match {// 对Stage的rdd和ShuffleDependency进行序列化case stage: ShuffleMapStage =>JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))// 对Stage的rdd和对RDD的分区进行计算的函数func进行序列化case stage: ResultStage =>JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}

另外,还记录了ResultStage处理的ActiveJob,包括设置删除。

private[this] var _activeJob: Option[ActiveJob] = Nonedef activeJob: Option[ActiveJob] = _activeJobdef setActiveJob(job: ActiveJob): Unit = {_activeJob = Option(job)
}def removeActiveJob(): Unit = {_activeJob = None
}

findMissingPartitions实现

由于继承了Stage,实现了``findMissingPartitions方法,用于找出当前Job的所有分区中还没有完成的分区的索引。ResultStage判断一个分区是否完成,是通过ActiveJob的Boolean类型数组finished`,因为finished记录了每个分区是否完成。

override def findMissingPartitions(): Seq[Int] = {val job = activeJob.get// 通过ActiveJob的Boolean类型数组finished来判断,finished记录了每个分区是否完成(0 until job.numPartitions).filter(id => !job.finished(id))
}

Task管理

DAGSchduler是任务提交的入口,是任务完成后的清理工作,主要会处理任务提交,任务的启动,任务的结束,任务结果处理

  1. 任务提交的入口是submitMissingTasks,这个我们在Task提交部分分析过,主要是对Stage进行任务集构造,然后提交给TaskScheduler,进行调度分配资源,启动任务执行;
  2. TaskManagerSet中当一个任务获取到资源提交任务后,会发送BeginEvent事件,DAGSchduler会通知listenerBus记录任务启动;
  3. 任务状态发生变化后,会发送CompletionEvent消息,然后DAGSchduler会使用handleTaskCompletion会进行处理任务状态变更;
  4. 任务集失败,当一个Stage中任务集尝试失败次数超过一定程度,就会发送TaskSetFailed消息,主要是从``记录中删除,并终止这个Stage,后面再重新尝试。

参考

  1. https://blog.csdn.net/u012684933/article/details/48714915
  2. https://blog.csdn.net/u012684933/article/category/5814489/1
  3. http://reader.epubee.com/books/mobile/86/86ace180a75c902ade14ef11fccba342/text00145.html

Spark DAGScheduler源码分析系列之一: 基础相关推荐

  1. Spark Shuffle源码分析系列之PartitionedPairBufferPartitionedAppendOnlyMap

    概述 SortShuffleWriter使用ExternalSorter进行ShuffleMapTask数据内存以及落盘操作,ExternalSorter中使用内存进行数据的缓存过程中根据是否需要ma ...

  2. Spark Shuffle源码分析系列之UnsafeShuffleWriter

    前面我们介绍了BypassMergeSortShuffleWriter和SortShuffleWriter,知道了它们的应用场景和实现方式,本节我们来看下UnsafeShuffleWriter,它使用 ...

  3. MyBatis 源码分析系列文章导读

    1.本文速览 本篇文章是我为接下来的 MyBatis 源码分析系列文章写的一个导读文章.本篇文章从 MyBatis 是什么(what),为什么要使用(why),以及如何使用(how)等三个角度进行了说 ...

  4. Spring IOC 容器源码分析系列文章导读

    1. 简介 前一段时间,我学习了 Spring IOC 容器方面的源码,并写了数篇文章对此进行讲解.在写完 Spring IOC 容器源码分析系列文章中的最后一篇后,没敢懈怠,趁热打铁,花了3天时间阅 ...

  5. Spring IOC 容器源码分析系列文章导读 1

    1. 简介 Spring 是一个轻量级的企业级应用开发框架,于 2004 年由 Rod Johnson 发布了 1.0 版本.经过十几年的迭代,现在的 Spring 框架已经非常成熟了.Spring ...

  6. idea 线程内存_Java线程池系列之-Java线程池底层源码分析系列(一)

    课程简介: 课程目标:通过本课程学习,深入理解Java线程池,提升自身技术能力与价值. 适用人群:具有Java多线程基础的人群,希望深入理解线程池底层原理的人群. 课程概述:多线程的异步执行方式,虽然 ...

  7. idea 线程内存_Java线程池系列之-Java线程池底层源码分析系列(二)

    课程简介: 课程目标:通过本课程学习,深入理解Java线程池,提升自身技术能力与价值. 适用人群:具有Java多线程基础的人群,希望深入理解线程池底层原理的人群. 课程概述:多线程的异步执行方式,虽然 ...

  8. 菜鸟读jQuery 2.0.3 源码分析系列(1)

    原文链接在这里,作为一个菜鸟,我就一边读一边写 jQuery 2.0.3 源码分析系列 前面看着差不多了,看到下面一条(我是真菜鸟),推荐木有入门或者刚刚JS入门摸不着边的看看,大大们手下留情,想一起 ...

  9. k8s源码分析 pdf_Spark Kubernetes 的源码分析系列 - features

    1 Overview features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML ...

最新文章

  1. 第八次课作业(采购管理、信息与配置管理)
  2. 橙白oj18训练作业2-题解、代码
  3. java单元测试面试,Java必备!JUnit面试题和答案汇总
  4. 随想录(什么是软件架构师)
  5. 用户计算机安全管理,关于加强用户计算机安全管理工作的通知
  6. 双11背后的黑科技:大数据实时计算如何为你量身定制?
  7. 动态规划之编辑距离问题
  8. SolidWorks2020无法获得下列许可SOLIDWORKS Standard.Server节点已经关闭或是没有响应。(-96,7,11003)
  9. oppor829t如何刷机_OPPO R829T卡刷刷机图文教程
  10. Android 6.0 sensor 框架详解 (application层)
  11. 视频接口的种类及数据类型
  12. ibd 导入mysql_拷贝ibd实现MySQL的数据导入
  13. MybatisPlusException: This is impossible to happen
  14. “点亮心灯祝福世界·清明” 活动暨“生与死的艺术沙龙清明特场”启动
  15. HTML5:移动互联网的第二个苹
  16. 窗函数(window function)
  17. 国内如何打开 Coursera?(Mac系统)
  18. 没有要使用本计算机 用户必须输入密码,要使用本计算机,用户必须输入用户名和密码选项不见了怎么办...
  19. PADS9.5软件安装教程|兼容WIN10
  20. 【3d地图】vue3.0中使用echarts geo3D

热门文章

  1. 教学记事:用提问的方式解疑
  2. 在python中、整数的十进制不能以0开头_Python关于int整数数据类型在使用介绍
  3. 什么软件能测试触控采样率,手机也能实现240Hz触控采样率?华为Mate40告诉你什么是好手机...
  4. 智能工厂以MES系统为基础,实现"信息化减人,自动化换人"
  5. 李力游辞去紫光集团联席总裁一职
  6. 2021-2027全球及中国不同物种脑立体定位仪适配器行业研究及十四五规划分析报告
  7. tableau 字段去重_Tableau 我常用函数整理
  8. AEM K50堪称最经济的八类线测试仪
  9. 新零售到底新在哪里了
  10. 实验室网页以及后台管理系统(1)-表格设计和建表语句