Core Spark Runtime Concepts (summary)

Application: A user program built on Spark, consisting of a driver program and executors on the cluster.
Driver Program: The process that runs the application's main function and creates the SparkContext.
Cluster Manager: An external service for acquiring resources on the cluster (e.g. standalone, Mesos, YARN).
Worker Node: Any node in the cluster that can run application code.
Executor: A process launched for an application on a worker node. It runs tasks and keeps data in memory or on disk; each application has its own executors.
Task: A unit of work sent to one executor.
Job: A parallel computation made up of many tasks, corresponding to a Spark action.
Stage: Each job is split into sets of tasks called stages (much like the map and reduce phases in MapReduce).

A minimal example program tying these terms together follows.
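To make the relationships concrete, here is a minimal, hypothetical driver program (the application name and input path are made up for illustration); the comments point out where each concept appears.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

// The whole program is one Application; this object is its Driver Program.
object ConceptsDemo {
  def main(args: Array[String]): Unit = {
    // Creating the SparkContext connects to the Cluster Manager,
    // which starts Executors for this application on Worker Nodes.
    val sc = new SparkContext(new SparkConf().setAppName("ConceptsDemo"))

    val counts = sc.textFile("hdfs:///tmp/input.txt")   // illustrative path
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)                               // shuffle boundary: a new Stage starts here

    // collect() is an action: it triggers one Job, which the DAGScheduler splits
    // into Stages, each of which is a set of Tasks sent to the Executors.
    counts.collect().foreach(println)

    sc.stop()
  }
}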

Original English definitions

http://spark.apache.org/docs/latest/cluster-overview.html

The following table summarizes terms you’ll see used to refer to cluster concepts:

Term Meaning
Application User program built on Spark. Consists of a driver program and executors on the cluster.
Application jar A jar containing the user's Spark application. In some cases users will want to create an "uber jar" containing their application along with its dependencies. The user's jar should never include Hadoop or Spark libraries, however, these will be added at runtime.
Driver program The process running the main() function of the application and creating the SparkContext
Cluster manager An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN)
Deploy mode Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside of the cluster. In "client" mode, the submitter launches the driver outside of the cluster.
Worker node Any node that can run application code in the cluster
Executor A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.
Task A unit of work that will be sent to one executor
Job A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect); you’ll see this term used in the driver’s logs.
Stage Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce); you'll see this term used in the driver's logs.

The diagram below, provided in the official Spark documentation, shows the runtime architecture.

Spark core components

Task scheduling

TaskScheduler.scala

Reading the comments below clarifies the relationship between SparkContext, DAGScheduler, and TaskScheduler.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId

/**
 * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl.
 * This interface allows plugging in different task schedulers. Each TaskScheduler schedulers tasks
 * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
 * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
 * them, retrying if there are failures, and mitigating stragglers. They return events to the
 * DAGScheduler.
 */
private[spark] trait TaskScheduler {

  def rootPool: Pool

  def schedulingMode: SchedulingMode

  def start(): Unit

  // Invoked after system has successfully initialized (typically in spark context).
  // Yarn uses this to bootstrap allocation of resources based on preferred locations,
  // wait for slave registerations, etc.
  def postStartHook() { }

  // Disconnect from the cluster.
  def stop(): Unit

  // Submit a sequence of tasks to run.
  def submitTasks(taskSet: TaskSet): Unit

  // Cancel a stage.
  def cancelTasks(stageId: Int, interruptThread: Boolean)

  // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit

  // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
  def defaultParallelism(): Int

  /**
   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
    blockManagerId: BlockManagerId): Boolean

  /**
   * The application ID associated with the job, if any.
   *
   * @return The application ID, or None if the backend does not provide an ID.
   */
  def applicationId(): Option[String] = None

}

DAGScheduler.scala

/**
 * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
 * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
 * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
 * TaskScheduler implementation that runs them on the cluster.
 *
 * In addition to coming up with a DAG of stages, this class also determines the preferred
 * locations to run each task on, based on the current cache status, and passes these to the
 * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
 * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
 * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
 * a small number of times before cancelling the whole stage.
 */
private[spark]
class DAGScheduler

TaskSet: clearly, a TaskSet corresponds to a stage.

package org.apache.spark.scheduler

import java.util.Properties

/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val attempt: Int,
    val priority: Int,
    val properties: Properties) {
  val id: String = stageId + "." + attempt

  override def toString: String = "TaskSet " + id
}

The chain of relationships: SparkContext -> DAGScheduler -> TaskScheduler -> TaskSchedulerImpl -> SparkDeploySchedulerBackend -> CoarseGrainedExecutorBackend

SparkContext creates the DAGScheduler and the TaskScheduler

  // Create and start the scheduler
  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
  private val heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => throw
      new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
  }

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  taskScheduler.start()

The method below is called when the TaskScheduler is created. Taking standalone mode (the SPARK_REGEX branch) as an example, it builds a concrete TaskScheduler implementation together with a backend; an example master URL follows the code below.

  /** Creates a task scheduler based on a given master URL. Extracted for testing. */
  private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
    // Regular expression used for local[N] and local[*] master formats
    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
    // Regular expression for local[N, maxRetries], used in tests with failing tasks
    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
    // Regular expression for connecting to Spark deploy clusters
    val SPARK_REGEX = """spark://(.*)""".r
    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
    val MESOS_REGEX = """(mesos|zk)://.*""".r
    // Regular expression for connection to Simr cluster
    val SIMR_REGEX = """simr://(.*)""".r

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(scheduler, 1)
        scheduler.initialize(backend)
        scheduler

      case LOCAL_N_REGEX(threads) =>
        def localCpuCount = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(scheduler, threadCount)
        scheduler.initialize(backend)
        scheduler

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalBackend(scheduler, threadCount)
        scheduler.initialize(backend)
        scheduler

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        scheduler
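For reference, an application ends up on the SPARK_REGEX branch above when its master URL uses the spark:// scheme. A minimal sketch (host, port, and application name are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// "spark://master-host:7077" matches SPARK_REGEX, so createTaskScheduler builds a
// TaskSchedulerImpl plus a SparkDeploySchedulerBackend for this application.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")   // placeholder standalone master URL
  .setAppName("standalone-demo")
val sc = new SparkContext(conf)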

In SparkDeploySchedulerBackend, note val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts). The class to pay attention to here is CoarseGrainedExecutorBackend.

package org.apache.spark.scheduler.cluster

import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
import org.apache.spark.util.Utils

private[spark] class SparkDeploySchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
  with AppClientListener
  with Logging {

  var client: AppClient = null
  var stopping = false
  var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
  var appId: String = _

  val registrationLock = new Object()
  var registrationDone = false

  val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
  val totalExpectedCores = maxCores.getOrElse(0)

  override def start() {
    super.start()

    // The endpoint for executors to talk to us
    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
      SparkEnv.driverActorSystemName,
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
      cp.split(java.io.File.pathSeparator)
    }
    val libraryPathEntries =
      sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))

    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()

    waitForRegistration()
  }

Clearly, CoarseGrainedExecutorBackend extends Actor. From its receive handler you can see that when a message arrives, the backend first deserializes the data and then asks the executor to run the task. The class structure also shows that one backend hosts exactly one executor, and each backend communicates with the driver independently.

package org.apache.spark.executorimport java.nio.ByteBufferimport scala.concurrent.Awaitimport akka.actor.{Actor, ActorSelection, Props}
import akka.pattern.Patterns
import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}private[spark] class CoarseGrainedExecutorBackend(driverUrl: String,executorId: String,hostPort: String,cores: Int,sparkProperties: Seq[(String, String)])extends Actor with ActorLogReceive with ExecutorBackend with Logging {Utils.checkHostPort(hostPort, "Expected hostport")var executor: Executor = nullvar driver: ActorSelection = nulloverride def preStart() {logInfo("Connecting to driver: " + driverUrl)driver = context.actorSelection(driverUrl)driver ! RegisterExecutor(executorId, hostPort, cores)context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])}override def receiveWithLogging = {case RegisteredExecutor =>logInfo("Successfully registered with driver")// Make this host instead of hostPort ?executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,false)case RegisterExecutorFailed(message) =>logError("Slave registration failed: " + message)System.exit(1)case LaunchTask(data) =>if (executor == null) {logError("Received LaunchTask command but executor was null")System.exit(1)} else {val ser = SparkEnv.get.closureSerializer.newInstance()val taskDesc = ser.deserialize[TaskDescription](data.value)logInfo("Got assigned task " + taskDesc.taskId)executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask)}case KillTask(taskId, _, interruptThread) =>if (executor == null) {logError("Received KillTask command but executor was null")System.exit(1)} else {executor.killTask(taskId, interruptThread)}case x: DisassociatedEvent =>logError(s"Driver $x disassociated! Shutting down.")System.exit(1)case StopExecutor =>logInfo("Driver commanded a shutdown")executor.stop()context.stop(self)context.system.shutdown()}override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {driver ! StatusUpdate(executorId, taskId, state, data)}
}private[spark] object CoarseGrainedExecutorBackend extends Logging {private def run(driverUrl: String,executorId: String,hostname: String,cores: Int,workerUrl: Option[String]) {SignalLogger.register(log)SparkHadoopUtil.get.runAsSparkUser { () =>// Debug codeUtils.checkHost(hostname)// Bootstrap to fetch the driver's Spark properties.val executorConf = new SparkConfval port = executorConf.getInt("spark.executor.port", 0)val (fetcher, _) = AkkaUtils.createActorSystem("driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf))val driver = fetcher.actorSelection(driverUrl)val timeout = AkkaUtils.askTimeout(executorConf)val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]]fetcher.shutdown()// Create a new ActorSystem using driver's Spark properties to run the backend.val driverConf = new SparkConf().setAll(props)val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf))// set itval sparkHostPort = hostname + ":" + boundPortactorSystem.actorOf(Props(classOf[CoarseGrainedExecutorBackend],driverUrl, executorId, sparkHostPort, cores, props),name = "Executor")workerUrl.foreach { url =>actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")}actorSystem.awaitTermination()}}def main(args: Array[String]) {args.length match {case x if x < 4 =>System.err.println(// Worker url is used in spark standalone mode to enforce fate-sharing with worker"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +"<cores> [<workerUrl>]")System.exit(1)case 4 =>run(args(0), args(1), args(2), args(3).toInt, None)case x if x > 4 =>run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))}}
}

OK, let's go back to SparkContext and continue with what comes next.

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        scheduler

After the SchedulerBackend has been created, the scheduler is initialized. Let's look at what the initialize method does: given the backend, it sets up the scheduling mode, FIFO or FAIR, with FIFO as the default (a configuration example follows the code below).

package org.apache.spark.schedulerimport java.nio.ByteBuffer
import java.util.{TimerTask, Timer}
import java.util.concurrent.atomic.AtomicLongimport scala.concurrent.duration._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.language.postfixOps
import scala.util.Randomimport org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import akka.actor.Props/*** Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.* It can also work with a local setup by using a LocalBackend and setting isLocal to true.* It handles common logic, like determining a scheduling order across jobs, waking up to launch* speculative tasks, etc.** Clients should first call initialize() and start(), then submit task sets through the* runTasks method.** THREADING: SchedulerBackends and task-submitting clients can call this class from multiple* threads, so it needs locks in public API methods to maintain its state. In addition, some* SchedulerBackends synchronize on themselves when they want to send events here, and then* acquire a lock on us, so we need to make sure that we don't try to lock the backend while* we are holding a lock on ourselves.*/
private[spark] class TaskSchedulerImpl(val sc: SparkContext,val maxTaskFailures: Int,isLocal: Boolean = false)extends TaskScheduler with Logging
{def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))val conf = sc.conf// How often to check for speculative tasksval SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100)// Threshold above which we warn user initial TaskSet may be starvedval STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)// CPUs to request per taskval CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)// TaskSetManagers are not thread safe, so any access to one should be synchronized// on this class.val activeTaskSets = new HashMap[String, TaskSetManager]val taskIdToTaskSetId = new HashMap[Long, String]val taskIdToExecutorId = new HashMap[Long, String]@volatile private var hasReceivedTask = false@volatile private var hasLaunchedTask = falseprivate val starvationTimer = new Timer(true)// Incrementing task IDsval nextTaskId = new AtomicLong(0)// Which executor IDs we have executors onval activeExecutorIds = new HashSet[String]// The set of executors we have on each host; this is used to compute hostsAlive, which// in turn is used to decide when we can attain data locality on a given hostprotected val executorsByHost = new HashMap[String, HashSet[String]]protected val hostsByRack = new HashMap[String, HashSet[String]]protected val executorIdToHost = new HashMap[String, String]// Listener object to pass upcalls intovar dagScheduler: DAGScheduler = nullvar backend: SchedulerBackend = nullval mapOutputTracker = SparkEnv.get.mapOutputTrackervar schedulableBuilder: SchedulableBuilder = nullvar rootPool: Pool = null// default scheduler is FIFOprivate val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")val schedulingMode: SchedulingMode = try {SchedulingMode.withName(schedulingModeConf.toUpperCase)} catch {case e: java.util.NoSuchElementException =>throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")}// This is a var so that we can reset it for testing purposes.private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)override def setDAGScheduler(dagScheduler: DAGScheduler) {this.dagScheduler = dagScheduler}def initialize(backend: SchedulerBackend) {this.backend = backend// temporarily set rootPool name to emptyrootPool = new Pool("", schedulingMode, 0, 0)schedulableBuilder = {schedulingMode match {case SchedulingMode.FIFO =>new FIFOSchedulableBuilder(rootPool)case SchedulingMode.FAIR =>new FairSchedulableBuilder(rootPool, conf)}}schedulableBuilder.buildPools()}
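The scheduling mode resolved in initialize() comes from the spark.scheduler.mode configuration key. A small example of switching an application from the default FIFO to FAIR scheduling (the application name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// FIFO is the default; FAIR makes the rootPool share resources between concurrent jobs.
val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")      // illustrative name
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)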

Next, let's look at how the TaskScheduler is started.

Again in SparkContext, the TaskScheduler's start method is called.

  // Create and start the scheduler
  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
  private val heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => throw
      new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
  }

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  taskScheduler.start()

In TaskSchedulerImpl, the start method is mainly responsible for starting the backend.

  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
          SPECULATION_INTERVAL milliseconds) {
        Utils.tryOrExit { checkSpeculatableTasks() }
      }
    }
  }

SparkDeploySchedulerBackend's start method calls its parent's start method, so it eventually reaches CoarseGrainedSchedulerBackend's start method.

  override def start() {
    super.start()

    // The endpoint for executors to talk to us
    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
      SparkEnv.driverActorSystemName,
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
      cp.split(java.io.File.pathSeparator)
    }
    val libraryPathEntries =
      sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))

    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()

    waitForRegistration()
  }

So what does the parent class, CoarseGrainedSchedulerBackend, actually contain? It has a field var driverActor: ActorRef; this driverActor is the driver-side agent responsible for communicating with the executors.

var driverActor: ActorRef = null
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
import org.apache.spark.ui.JettyUtils

/**
 * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
 * This backend holds onto each executor for the duration of the Spark job rather than relinquishing
 * executors whenever a task is done and asking the scheduler to launch a new executor for
 * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
 * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
 * (spark.deploy.*).
 */
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
  extends SchedulerBackend with Logging

DriverActor is defined inside CoarseGrainedSchedulerBackend. When the backend's start method runs, as shown below, driverActor is initialized.

  override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }
    // TODO (prashant) send conf instead of properties
    driverActor = actorSystem.actorOf(
      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
  }
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {override protected def log = CoarseGrainedSchedulerBackend.this.logprivate val executorActor = new HashMap[String, ActorRef]private val executorAddress = new HashMap[String, Address]private val executorHost = new HashMap[String, String]private val freeCores = new HashMap[String, Int]private val totalCores = new HashMap[String, Int]private val addressToExecutorId = new HashMap[Address, String]override def preStart() {// Listen for remote client disconnection events, since they don't go through Akka's watch()context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])// Periodically revive offers to allow delay scheduling to workval reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)import context.dispatchercontext.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)}def receiveWithLogging = {case RegisterExecutor(executorId, hostPort, cores) =>Utils.checkHostPort(hostPort, "Host port expected " + hostPort)if (executorActor.contains(executorId)) {sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)} else {logInfo("Registered executor: " + sender + " with ID " + executorId)sender ! RegisteredExecutorexecutorActor(executorId) = senderexecutorHost(executorId) = Utils.parseHostPort(hostPort)._1totalCores(executorId) = coresfreeCores(executorId) = coresexecutorAddress(executorId) = sender.path.addressaddressToExecutorId(sender.path.address) = executorIdtotalCoreCount.addAndGet(cores)totalRegisteredExecutors.addAndGet(1)makeOffers()}case StatusUpdate(executorId, taskId, state, data) =>scheduler.statusUpdate(taskId, state, data.value)if (TaskState.isFinished(state)) {if (executorActor.contains(executorId)) {freeCores(executorId) += scheduler.CPUS_PER_TASKmakeOffers(executorId)} else {// Ignoring the update since we don't know about the executor.val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"logWarning(msg.format(taskId, state, sender, executorId))}}case ReviveOffers =>makeOffers()case KillTask(taskId, executorId, interruptThread) =>executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)case StopDriver =>sender ! truecontext.stop(self)case StopExecutors =>logInfo("Asking each executor to shut down")for (executor <- executorActor.values) {executor ! StopExecutor}sender ! truecase RemoveExecutor(executorId, reason) =>removeExecutor(executorId, reason)sender ! truecase AddWebUIFilter(filterName, filterParams, proxyBase) =>addWebUIFilter(filterName, filterParams, proxyBase)sender ! truecase DisassociatedEvent(_, address, _) =>addressToExecutorId.get(address).foreach(removeExecutor(_,"remote Akka client disassociated"))case RetrieveSparkProps =>sender ! 
sparkProperties}// Make fake resource offers on all executorsdef makeOffers() {launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))}// Make fake resource offers on just one executordef makeOffers(executorId: String) {launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))}// Launch tasks returned by a set of resource offersdef launchTasks(tasks: Seq[Seq[TaskDescription]]) {for (task <- tasks.flatten) {val ser = SparkEnv.get.closureSerializer.newInstance()val serializedTask = ser.serialize(task)if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>try {var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +"spark.akka.frameSize or using broadcast variables for large values."msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,AkkaUtils.reservedSizeBytes)taskSet.abort(msg)} catch {case e: Exception => logError("Exception in error callback", e)}}}else {freeCores(task.executorId) -= scheduler.CPUS_PER_TASKexecutorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))}}}// Remove a disconnected slave from the clusterdef removeExecutor(executorId: String, reason: String) {if (executorActor.contains(executorId)) {logInfo("Executor " + executorId + " disconnected, so removing it")val numCores = totalCores(executorId)executorActor -= executorIdexecutorHost -= executorIdaddressToExecutorId -= executorAddress(executorId)executorAddress -= executorIdtotalCores -= executorIdfreeCores -= executorIdtotalCoreCount.addAndGet(-numCores)scheduler.executorLost(executorId, SlaveLost(reason))}}}

The next part walks through the execution path triggered by an action.

Take the count method as an example.

RDD.scala: the count method calls SparkContext's runJob method.

  /**
   * Return the number of elements in the RDD.
   */
  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

SparkContext.scala: this immediately calls DAGScheduler's runJob method.

/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark. The allowLocal
 * flag specifies whether the scheduler can run the computation on the driver rather than
 * shipping it out to the cluster, for short actions like first().
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit) {
  if (dagScheduler == null) {
    throw new SparkException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  val start = System.nanoTime
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
    resultHandler, localProperties.get)
  logInfo("Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
  rdd.doCheckpoint()
}

DAGScheduler.scala

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties = null)
  {
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded => {}
      case JobFailed(exception: Exception) =>
        logInfo("Failed to run " + callSite.shortForm)
        throw exception
    }
  }

It then calls the internal submitJob method, which eventually reaches eventProcessActor ! JobSubmitted. Here eventProcessActor is an instance of DAGScheduler's inner class DAGSchedulerEventProcessActor.

  /**
   * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
   * can be used to block until the the job finishes executing or can be used to cancel the job.
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties = null): JobWaiter[U] =
  {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessActor ! JobSubmitted(
      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
    waiter
  }

Below is part of that actor.

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
    // valid when the messages arrive
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

---------> After the final stage has been created, it is submitted using the submitStage method.

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties = null)
  {
    var finalStage: Stage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
        job.jobId, callSite.shortForm, partitions.length, allowLocal))
      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))
      val shouldRunLocally =
        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
      if (shouldRunLocally) {
        // Compute very short actions like first() or take() with no parent stages locally.
        listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
        runLocally(job)
      } else {
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.resultOfJob = Some(job)
        listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,
          properties))
        submitStage(finalStage)
      }
    }
    submitWaitingStages()
  }

submitStage then works out whether the stage has missing parent stages, recursing over the dependencies, and submits the stages that have no missing parents via submitMissingTasks.

  /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing == Nil) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id)
    }
  }

submitMissingTasks finally hands the tasks over to the TaskScheduler through taskScheduler.submitTasks.

  /** Called when stage's parents are available and we can now do its task. */private def submitMissingTasks(stage: Stage, jobId: Int) {logDebug("submitMissingTasks(" + stage + ")")// Get our pending tasks and remember them in our pendingTasks entrystage.pendingTasks.clear()// First figure out the indexes of partition ids to compute.val partitionsToCompute: Seq[Int] = {if (stage.isShuffleMap) {(0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)} else {val job = stage.resultOfJob.get(0 until job.numPartitions).filter(id => !job.finished(id))}}val properties = if (jobIdToActiveJob.contains(jobId)) {jobIdToActiveJob(stage.jobId).properties} else {// this stage will be assigned to "default" poolnull}runningStages += stage// SparkListenerStageSubmitted should be posted before testing whether tasks are// serializable. If tasks are not serializable, a SparkListenerStageCompleted event// will be posted, which should always come after a corresponding SparkListenerStageSubmitted// event.stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast// the serialized copy of the RDD and for each task we will deserialize it, which means each// task gets a different copy of the RDD. This provides stronger isolation between tasks that// might modify state of objects referenced in their closures. This is necessary in Hadoop// where the JobConf/Configuration object is not thread-safe.var taskBinary: Broadcast[Array[Byte]] = nulltry {// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).// For ResultTask, serialize and broadcast (rdd, func).val taskBinaryBytes: Array[Byte] =if (stage.isShuffleMap) {closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()} else {closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()}taskBinary = sc.broadcast(taskBinaryBytes)} catch {// In the case of a failure during serialization, abort the stage.case e: NotSerializableException =>abortStage(stage, "Task not serializable: " + e.toString)runningStages -= stagereturncase NonFatal(e) =>abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")runningStages -= stagereturn}val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {partitionsToCompute.map { id =>val locs = getPreferredLocs(stage.rdd, id)val part = stage.rdd.partitions(id)new ShuffleMapTask(stage.id, taskBinary, part, locs)}} else {val job = stage.resultOfJob.getpartitionsToCompute.map { id =>val p: Int = job.partitions(id)val part = stage.rdd.partitions(p)val locs = getPreferredLocs(stage.rdd, p)new ResultTask(stage.id, taskBinary, part, locs, id)}}if (tasks.size > 0) {// Preemptively serialize a task to make sure it can be serialized. 
We are catching this// exception here because it would be fairly hard to catch the non-serializable exception// down the road, where we have several different implementations for local scheduler and// cluster schedulers.//// We've already serialized RDDs and closures in taskBinary, but here we check for all other// objects such as Partition.try {closureSerializer.serialize(tasks.head)} catch {case e: NotSerializableException =>abortStage(stage, "Task not serializable: " + e.toString)runningStages -= stagereturncase NonFatal(e) => // Other exceptions, such as IllegalArgumentException from Kryo.abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")runningStages -= stagereturn}logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")stage.pendingTasks ++= taskslogDebug("New pending tasks: " + stage.pendingTasks)taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))stage.latestInfo.submissionTime = Some(clock.getTime())} else {// Because we posted SparkListenerStageSubmitted earlier, we should post// SparkListenerStageCompleted here in case there are no tasks to run.listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))logDebug("Stage " + stage + " is actually done; %b %d %d".format(stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))runningStages -= stage}}

Now we move into TaskSchedulerImpl, which ends by calling the backend's reviveOffers: backend.reviveOffers().

TaskSchedulerImpl.scala

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
      activeTaskSets(taskSet.id) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient memory")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }

Let's take a quick look at the structure of SchedulerBackend.

From the comments and the class hierarchy, the backend is a pluggable implementation: different deployment scenarios use different backends, and in standalone mode it is CoarseGrainedSchedulerBackend (via its subclass SparkDeploySchedulerBackend). A minimal illustration follows the trait definition below.

package org.apache.spark.scheduler

/**
 * A backend interface for scheduling systems that allows plugging in different ones under
 * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
 * machines become available and can launch tasks on them.
 */
private[spark] trait SchedulerBackend {
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def defaultParallelism(): Int

  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException
  def isReady(): Boolean = true

  /**
   * The application ID associated with the job, if any.
   *
   * @return The application ID, or None if the backend does not provide an ID.
   */
  def applicationId(): Option[String] = None

}
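To illustrate how pluggable this interface is, here is a do-nothing backend that satisfies the trait above. It is only a reading aid, not something Spark ships; since the trait is private[spark], a real implementation has to live under the org.apache.spark.scheduler package.

package org.apache.spark.scheduler

// Purely illustrative: a backend that accepts the interface calls but never launches anything.
private[spark] class NoOpSchedulerBackend extends SchedulerBackend {
  override def start(): Unit = {}            // a real backend acquires executors here
  override def stop(): Unit = {}             // ... and releases them here
  override def reviveOffers(): Unit = {}     // a real backend turns free executor cores into resource offers
  override def defaultParallelism(): Int = 1 // hint the scheduler uses when sizing jobs
}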

backend.reviveOffers() signals that the driver needs resources to run the work.

package org.apache.spark.scheduler.clusterimport java.util.concurrent.atomic.AtomicIntegerimport scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
import org.apache.spark.ui.JettyUtils/*** A scheduler backend that waits for coarse grained executors to connect to it through Akka.* This backend holds onto each executor for the duration of the Spark job rather than relinquishing* executors whenever a task is done and asking the scheduler to launch a new executor for* each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the* coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode* (spark.deploy.*).*/
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)extends SchedulerBackend with Logging
{// Use an atomic variable to track total number of cores in the cluster for simplicity and speedvar totalCoreCount = new AtomicInteger(0)var totalRegisteredExecutors = new AtomicInteger(0)val conf = scheduler.sc.confprivate val timeout = AkkaUtils.askTimeout(conf)private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)// Submit tasks only after (registered resources / total expected resources)// is equal to at least this value, that is double between 0 and 1.var minRegisteredRatio =math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))// Submit tasks after maxRegisteredWaitingTime milliseconds// if minRegisteredRatio has not yet been reachedval maxRegisteredWaitingTime =conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)val createTime = System.currentTimeMillis()class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {override protected def log = CoarseGrainedSchedulerBackend.this.logprivate val executorActor = new HashMap[String, ActorRef]private val executorAddress = new HashMap[String, Address]private val executorHost = new HashMap[String, String]private val freeCores = new HashMap[String, Int]private val totalCores = new HashMap[String, Int]private val addressToExecutorId = new HashMap[Address, String]override def preStart() {// Listen for remote client disconnection events, since they don't go through Akka's watch()context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])// Periodically revive offers to allow delay scheduling to workval reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)import context.dispatchercontext.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)}def receiveWithLogging = {case RegisterExecutor(executorId, hostPort, cores) =>Utils.checkHostPort(hostPort, "Host port expected " + hostPort)if (executorActor.contains(executorId)) {sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)} else {logInfo("Registered executor: " + sender + " with ID " + executorId)sender ! RegisteredExecutorexecutorActor(executorId) = senderexecutorHost(executorId) = Utils.parseHostPort(hostPort)._1totalCores(executorId) = coresfreeCores(executorId) = coresexecutorAddress(executorId) = sender.path.addressaddressToExecutorId(sender.path.address) = executorIdtotalCoreCount.addAndGet(cores)totalRegisteredExecutors.addAndGet(1)makeOffers()}case StatusUpdate(executorId, taskId, state, data) =>scheduler.statusUpdate(taskId, state, data.value)if (TaskState.isFinished(state)) {if (executorActor.contains(executorId)) {freeCores(executorId) += scheduler.CPUS_PER_TASKmakeOffers(executorId)} else {// Ignoring the update since we don't know about the executor.val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"logWarning(msg.format(taskId, state, sender, executorId))}}case ReviveOffers =>makeOffers()case KillTask(taskId, executorId, interruptThread) =>executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)case StopDriver =>sender ! truecontext.stop(self)case StopExecutors =>logInfo("Asking each executor to shut down")for (executor <- executorActor.values) {executor ! StopExecutor}sender ! truecase RemoveExecutor(executorId, reason) =>removeExecutor(executorId, reason)sender ! truecase AddWebUIFilter(filterName, filterParams, proxyBase) =>addWebUIFilter(filterName, filterParams, proxyBase)sender ! 
truecase DisassociatedEvent(_, address, _) =>addressToExecutorId.get(address).foreach(removeExecutor(_,"remote Akka client disassociated"))case RetrieveSparkProps =>sender ! sparkProperties}// Make fake resource offers on all executorsdef makeOffers() {launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))}// Make fake resource offers on just one executordef makeOffers(executorId: String) {launchTasks(scheduler.resourceOffers(
        Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
    }
    // WorkerOffer represents the free resources on one executor; freeCores is the number of idle cores on that executor.

    // Launch tasks returned by a set of resource offers
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSet.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
        }
      }
    }
......
  // Send a ReviveOffers message to driverActor
  override def reviveOffers() {
    driverActor ! ReviveOffers
  }
  // Launch tasks returned by a set of resource offers
  def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
    for (task <- tasks.flatten) {
      val ser = SparkEnv.get.closureSerializer.newInstance()
      val serializedTask = ser.serialize(task)
      if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
        val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
        scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
          try {
            var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
              "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
              "spark.akka.frameSize or using broadcast variables for large values."
            msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
              AkkaUtils.reservedSizeBytes)
            taskSet.abort(msg)
          } catch {
            case e: Exception => logError("Exception in error callback", e)
          }
        }
      }
      else {
        freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
        executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
      }
    }
  }

The code above sends a LaunchTask message to executorActor. executorActor is a HashMap whose keys are executor IDs and whose values are the actor references of the corresponding CoarseGrainedExecutorBackend instances.

Note: at this point the tasks have been dispatched to the executor machines, so the execution below may already be happening on a remote (or local) executor.

CoarseGrainedExecutorBackend.scala

package org.apache.spark.executorimport java.nio.ByteBufferimport scala.concurrent.Awaitimport akka.actor.{Actor, ActorSelection, Props}
import akka.pattern.Patterns
import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}private[spark] class CoarseGrainedExecutorBackend(driverUrl: String,executorId: String,hostPort: String,cores: Int,sparkProperties: Seq[(String, String)])extends Actor with ActorLogReceive with ExecutorBackend with Logging {Utils.checkHostPort(hostPort, "Expected hostport")var executor: Executor = nullvar driver: ActorSelection = nulloverride def preStart() {logInfo("Connecting to driver: " + driverUrl)driver = context.actorSelection(driverUrl)driver ! RegisterExecutor(executorId, hostPort, cores)context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])}override def receiveWithLogging = {case RegisteredExecutor =>logInfo("Successfully registered with driver")// Make this host instead of hostPort ?executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,false)case RegisterExecutorFailed(message) =>logError("Slave registration failed: " + message)System.exit(1)case LaunchTask(data) =>if (executor == null) {logError("Received LaunchTask command but executor was null")System.exit(1)} else {val ser = SparkEnv.get.closureSerializer.newInstance()val taskDesc = ser.deserialize[TaskDescription](data.value)logInfo("Got assigned task " + taskDesc.taskId)executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask)}

The code above calls executor.launchTask; the executor runs the task on a local thread pool, and at this point the task has actually started executing.

package org.apache.spark.executorimport java.io.File
import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent._import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.{AkkaUtils, Utils}/*** Spark executor used with Mesos, YARN, and the standalone scheduler.*/
private[spark] class Executor(executorId: String,slaveHostname: String,properties: Seq[(String, String)],isLocal: Boolean = false)extends Logging
{// Application dependencies (added through SparkContext) that we've fetched so far on this node.// Each map holds the master's timestamp for the version of that file or JAR we got.private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))@volatile private var isStopped = false// No ip or host:port - just hostnameUtils.checkHost(slaveHostname, "Expected executed slave to be a hostname")// must not have port specified.assert (0 == Utils.parseHostPort(slaveHostname)._2)// Make sure the local hostname we report matches the cluster scheduler's name for this hostUtils.setCustomHostname(slaveHostname)// Set spark.* properties from executor argval conf = new SparkConf(true)conf.setAll(properties)if (!isLocal) {// Setup an uncaught exception handler for non-local mode.// Make any thread terminations due to uncaught exceptions kill the entire// executor process to avoid surprising stalls.Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)}val executorSource = new ExecutorSource(this, executorId)// Initialize Spark environment (using system properties read above)private val env = {if (!isLocal) {val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,isDriver = false, isLocal = false)SparkEnv.set(_env)_env.metricsSystem.registerSource(executorSource)_env} else {SparkEnv.get}}// Create our ClassLoader// do this after SparkEnv creation so can access the SecurityManagerprivate val urlClassLoader = createClassLoader()private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)// Set the classloader for serializerenv.serializer.setDefaultClassLoader(urlClassLoader)// Akka's message frame size. If task result is bigger than this, we use the block manager// to send the result back.private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)// Start worker thread poolval threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")// Maintains the list of running tasks.private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]startDriverHeartbeater()def launchTask(context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {val tr = new TaskRunner(context, taskId, taskName, serializedTask)runningTasks.put(taskId, tr)threadPool.execute(tr)}

So how exactly are tasks distributed? How does the system know where each task should go?

When CoarseGrainedSchedulerBackend runs makeOffers, it calls scheduler.resourceOffers.

    // Make fake resource offers on all executors
    def makeOffers() {
      launchTasks(scheduler.resourceOffers(executorHost.toArray.map {
        case (id, host) => new WorkerOffer(id, host, freeCores(id))
      }))
    }

The scheduler above is actually TaskSchedulerImpl, and it hands tasks out in a round-robin fashion; this is the point where it is decided which task goes to which executor (a simplified sketch follows the snippet below).

/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
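Stripped of locality preferences, delay scheduling, and priority ordering, the round-robin idea behind resourceOffers can be sketched as follows. This is a simplified illustration under those assumptions, not the real implementation:

// Simplified sketch: hand pending tasks to executors in round-robin order.
// `freeCores` mirrors the per-executor bookkeeping kept by the DriverActor.
case class Offer(executorId: String, host: String, var freeCores: Int)

def roundRobinAssign(
    offers: IndexedSeq[Offer],
    tasks: Seq[String],
    cpusPerTask: Int = 1): Seq[(String, String)] = {
  val assignments = scala.collection.mutable.ArrayBuffer[(String, String)]() // (task, executorId)
  var i = 0
  for (task <- tasks) {
    var tried = 0
    // Walk the offers in a circle until one with enough free cores is found.
    while (tried < offers.size && offers(i % offers.size).freeCores < cpusPerTask) {
      i += 1
      tried += 1
    }
    if (tried < offers.size) {
      val offer = offers(i % offers.size)
      offer.freeCores -= cpusPerTask
      assignments += ((task, offer.executorId))
      i += 1 // move on to the next executor so tasks spread across the cluster
    }
  }
  assignments
}

For example, with two executors that each have two free cores and five pending tasks, the first four tasks alternate between the two executors and the fifth stays pending until a core frees up.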

Below I paste in the official documentation, which explains this quite clearly.

================================================================================================================================

Cluster Mode Overview

This document gives a short overview of how Spark runs on clusters, to make it easier to understand the components involved. Read through the application submission guide to submit applications to a cluster.

Components

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program). Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager or Mesos/YARN), which allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks for the executors to run.

There are several useful things to note about this architecture:

  1. Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs). However, it also means that data cannot be shared across different Spark applications (instances of SparkContext) without writing it to an external storage system.
  2. Spark is agnostic to the underlying cluster manager. As long as it can acquire executor processes, and these communicate with each other, it is relatively easy to run it even on a cluster manager that also supports other applications (e.g. Mesos/YARN).
  3. Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network. If you’d like to send requests to the cluster remotely, it’s better to open an RPC to the driver and have it submit operations from nearby than to run a driver far away from the worker nodes.

Cluster Manager Types

The system currently supports three cluster managers:

  • Standalone – a simple cluster manager included with Spark that makes it easy to set up a cluster.
  • Apache Mesos – a general cluster manager that can also run Hadoop MapReduce and service applications.
  • Hadoop YARN – the resource manager in Hadoop 2.

In addition, Spark’s EC2 launch scripts make it easy to launch a standalone cluster on Amazon EC2.

Submitting Applications

Applications can be submitted to a cluster of any type using the spark-submit script. The application submission guide describes how to do this.

Monitoring

Each driver program has a web UI, typically on port 4040, that displays information about running tasks, executors, and storage usage. Simply go to http://<driver-node>:4040 in a web browser to access this UI. The monitoring guide also describes other monitoring options.

Job Scheduling

Spark gives control over resource allocation both across applications (at the level of the cluster manager) and within applications (if multiple computations are happening on the same SparkContext). The job scheduling overview describes this in more detail.
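Within a single application, fair-scheduler pools are selected per thread through a SparkContext local property. A small, hedged example (the pool name is made up and would normally be defined in the fair scheduler allocation file):

// With spark.scheduler.mode=FAIR, jobs submitted from this thread go into the
// "reporting" pool (a hypothetical pool name).
sc.setLocalProperty("spark.scheduler.pool", "reporting")
val total = sc.textFile("hdfs:///tmp/events").count()   // illustrative path
sc.setLocalProperty("spark.scheduler.pool", null)       // revert to the default pool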
