I. Writing and Reading Shuffle Results

From the earlier article Spark源码解读之Shuffle原理剖析与源码分析 we know that a shuffle operation is split by the DAGScheduler into two stages: the first stage runs ShuffleMapTasks and the second runs ResultTasks. The ShuffleMapTasks produce intermediate results, which the ResultTasks then read as their input.

So how does a ResultTask obtain the results computed by the ShuffleMapTasks? The process is as follows:

  1. Each ShuffleMapTask wraps its computation status (not the actual computed values) in a MapStatus and returns it to the DAGScheduler.
  2. The DAGScheduler saves the MapStatus into the MapOutputTrackerMaster.
  3. When a ResultTask evaluates the ShuffledRDD, it uses BlockStoreShuffleFetcher's fetch method to obtain the data: it first asks the MapOutputTracker for the locations of the data it needs, and then, based on the returned results, calls BlockManager.getMultiple to fetch the actual data.

Every ShuffleMapTask records its result in a MapStatus. A MapStatus consists of a BlockManagerId and the block sizes: the BlockManagerId identifies which BlockManager actually holds the intermediate result data, and the sizes indicate how much data each reduceId will need to read.

private[spark] sealed trait MapStatus {
  /** Location where this task was run. */
  def location: BlockManagerId

  /**
   * Estimated size for the reduce block, in bytes.
   *
   * If a block is non-empty, then this method MUST return a non-zero size.  This invariant is
   * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
   */
  // Size of the data that each reduceId will read
  def getSizeForBlock(reduceId: Int): Long
}
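To make the role of MapStatus concrete, here is a minimal, purely illustrative sketch (not Spark source; MapStatus and BlockManagerId are Spark-internal types, and locateBlocksFor is a hypothetical helper) of how a reduce task could derive, from the MapStatus of every upstream map task, where its blocks live and how large they are:

// Hypothetical illustration: turn the MapStatus of every upstream map task into
// (location, estimated size) pairs for one reduce partition.
def locateBlocksFor(statuses: Seq[MapStatus], reduceId: Int): Seq[(BlockManagerId, Long)] = {
  statuses.map { status =>
    // location: which BlockManager holds this map task's output
    // getSizeForBlock: estimated bytes this reduce partition will read from that output
    (status.location, status.getSizeForBlock(reduceId))
  }
}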

1. Writing Shuffle Results

The shuffle write path is as follows:

ShuffleMapTask.runTask ----> HashShuffleWriter.write ----> BlockObjectWriter.write

The runTask method of ShuffleMapTask is shown below:

 override def runTask(context: TaskContext): MapStatus = {
   // Deserialize the RDD using the broadcast variable.
   val ser = SparkEnv.get.closureSerializer.newInstance()
   val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
     ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
   metrics = Some(context.taskMetrics)
   var writer: ShuffleWriter[Any, Any] = null
   try {
     // Get the ShuffleManager and obtain a ShuffleWriter from it
     val manager = SparkEnv.get.shuffleManager
     writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
     // Call rdd.iterator for the partition this task is responsible for and apply our function.
     // The returned records go through the ShuffleWriter: they are partitioned
     // (HashPartitioner by default) and each record is written into its bucket.
     writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
     // Finally return the MapStatus, which records where the output of this ShuffleMapTask
     // is stored -- essentially the information of the BlockManager, the component that
     // manages Spark's in-memory and on-disk data at the lowest level.
     return writer.stop(success = true).get
   } catch {
     case e: Exception =>
       try {
         if (writer != null) {
           writer.stop(success = false)
         }
       } catch {
         case e: Exception =>
           log.debug("Could not stop writer", e)
       }
       throw e
   }
 }

HashShuffleWriter.write mainly does two things:

  1. It decides whether map-side aggregation is needed; for example, if <hello,1> and <hello,1> both have to be written, they are first combined into <hello,2> before any further processing.
  2. It uses the partitioner to decide which file each <key,value> pair is written to.

The write method of HashShuffleWriter is shown below:

 /** Write a bunch of records to this task's output */
 /**
  * Write the partition data of the new RDD computed by this ShuffleMapTask to local disk.
  * @param records
  */
 override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
   // Decide whether map-side aggregation is needed. For operations like reduceByKey,
   // dep.aggregator.isDefined is true and dep.mapSideCombine is also true.
   val iter = if (dep.aggregator.isDefined) {
     if (dep.mapSideCombine) {
       // Local (map-side) aggregation: e.g. (hello,1), (hello,1) can be combined into (hello,2)
       dep.aggregator.get.combineValuesByKey(records, context)
     } else {
       records
     }
   } else {
     require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
     records
   }

   // After optional local aggregation, iterate over the records; for each record, apply
   // the partitioner (HashPartitioner by default) to get the bucketId, i.e. which bucket
   // this record should be written into.
   for (elem <- iter) {
     // Compute the bucketId
     val bucketId = dep.partitioner.getPartition(elem._1)
     // shuffle.writers(bucketId) is the writer created by ShuffleBlockManager.forMapTask()
     // for this bucket; DiskBlockObjectWriter is responsible for actually writing to disk.
     shuffle.writers(bucketId).write(elem)
   }
 }
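As a quick illustration of the bucket selection above, the default HashPartitioner maps a key to a bucket roughly as follows (a simplified sketch, not the exact Spark implementation):

// Simplified sketch of hash partitioning: map a key to one of numBuckets buckets.
def bucketFor(key: Any, numBuckets: Int): Int = {
  val rawMod = key.hashCode % numBuckets
  // Keep the result non-negative even when hashCode is negative
  if (rawMod < 0) rawMod + numBuckets else rawMod
}

// Every record with the same key, e.g. ("hello", 1), always lands in the same bucket,
// so a reduce task can later read all values for that key from one bucket per map task.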

The ShuffleWriterGroup (the shuffle value) used in the write method above is created by the forMapTask function of ShuffleBlockManager, whose source is shown below:

/**
 * Get a ShuffleWriterGroup for the given map task, which will register it as complete
 * when the writers are closed successfully
 */
/**
 * Create a ShuffleWriterGroup for each map task.
 */
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
    writeMetrics: ShuffleWriteMetrics) = {
  new ShuffleWriterGroup {
    shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
    private val shuffleState = shuffleStates(shuffleId)
    private var fileGroup: ShuffleFileGroup = null

    val openStartTime = System.nanoTime
    // Check whether the consolidation optimization is enabled. If it is, we do not obtain
    // a separate output file per bucket; instead each bucket gets a writer from a
    // ShuffleFileGroup.
    val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
      fileGroup = getUnusedFileGroup()
      Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
        // First generate a unique blockId, then use the bucketId to pick the file from the
        // ShuffleFileGroup and obtain a writer for it.
        val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
        // blockManager.getDiskWriter() returns the writer. With consolidation enabled, a
        // bucketId no longer gets an independent ShuffleBlockFile writer as before, but a
        // writer into the ShuffleFileGroup. This is how the outputs of multiple
        // ShuffleMapTasks are merged into the same set of files.
        blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
          writeMetrics)
      }
    } else {
      // Without the consolidation optimization, a separate ShuffleBlockFile is created
      // per bucket for every ShuffleMapTask.
      Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
        val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
        val blockFile = blockManager.diskBlockManager.getFile(blockId)
        // Because of previous failures, the shuffle file may already exist on this machine.
        // If so, remove it.
        if (blockFile.exists) {
          if (blockFile.delete()) {
            logInfo(s"Removed existing shuffle file $blockFile")
          } else {
            logWarning(s"Failed to remove existing shuffle file $blockFile")
          }
        }
        // Write to disk
        blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
      }
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, so should be included in the shuffle write time.
    writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)

    override def releaseWriters(success: Boolean) {
      if (consolidateShuffleFiles) {
        if (success) {
          val offsets = writers.map(_.fileSegment().offset)
          val lengths = writers.map(_.fileSegment().length)
          fileGroup.recordMapOutput(mapId, offsets, lengths)
        }
        recycleFileGroup(fileGroup)
      } else {
        shuffleState.completedMapTasks.add(mapId)
      }
    }

    private def getUnusedFileGroup(): ShuffleFileGroup = {
      val fileGroup = shuffleState.unusedFileGroups.poll()
      if (fileGroup != null) fileGroup else newFileGroup()
    }

    private def newFileGroup(): ShuffleFileGroup = {
      val fileId = shuffleState.nextFileId.getAndIncrement()
      val files = Array.tabulate[File](numBuckets) { bucketId =>
        val filename = physicalFileName(shuffleId, bucketId, fileId)
        blockManager.diskBlockManager.getFile(filename)
      }
      val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
      shuffleState.allFileGroups.add(fileGroup)
      fileGroup
    }

    private def recycleFileGroup(group: ShuffleFileGroup) {
      shuffleState.unusedFileGroups.add(group)
    }
  }
}

The source above touches on the shuffle consolidation optimization; for the details see the previous article Spark源码解读之Shuffle原理剖析与源码分析.
The getFile method is responsible for mapping a block that the shuffle needs to write to a concrete file on disk.

/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
// Maps the triple (shuffle_id, map_id, reduce_id) to a file name.
def getFile(filename: String): File = {
  // Figure out which local directory it hashes to, and which subdirectory in that
  val hash = Utils.nonNegativeHash(filename)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

  // Create the subdirectory if it doesn't already exist
  var subDir = subDirs(dirId)(subDirId)
  if (subDir == null) {
    subDir = subDirs(dirId).synchronized {
      val old = subDirs(dirId)(subDirId)
      if (old != null) {
        old
      } else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        if (!newDir.exists() && !newDir.mkdir()) {
          throw new IOException(s"Failed to create local dir in $newDir.")
        }
        subDirs(dirId)(subDirId) = newDir
        newDir
      }
    }
  }

  new File(subDir, filename)
}
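A quick walk-through of the hashing above, with made-up numbers (the hash value and directory counts below are assumptions for illustration only):

// Suppose Utils.nonNegativeHash("shuffle_0_5_2") == 1234567,
// localDirs.length == 2 and subDirsPerLocalDir == 64.
val hash = 1234567
val dirId = hash % 2               // 1  -> the second configured local directory
val subDirId = (hash / 2) % 64     // 3  -> subdirectory "03" under that directory
// The block is therefore stored as <localDirs(1)>/03/shuffle_0_5_2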

Finally, DiskBlockObjectWriter.write is responsible for actually writing the data to disk.

 override def write(value: Any) {
   if (!initialized) {
     open()
   }

   objOut.writeObject(value)
   numRecordsWritten += 1
   writeMetrics.incShuffleRecordsWritten(1)

   if (numRecordsWritten % 32 == 0) {
     updateBytesWritten()
   }
 }

2. Reading Shuffle Results

The shuffle read path is as follows:

ShuffledRDD.compute ---> HashShuffleReader.read ---> BlockStoreShuffleFetcher.fetch ---> BlockManager.getMultiple

The compute function of ShuffledRDD is the starting point for reading the results of the ShuffleMapTasks. Its source is shown below:

 /**
  * Entry point of the shuffle read.
  */
 override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
   // shuffleManager.getReader() returns a HashShuffleReader; its read() method then pulls
   // the data that this ResultTask needs to aggregate.
   val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
   SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
     .read()
     .asInstanceOf[Iterator[(K, C)]]
 }

Here the read method of HashShuffleReader is called to obtain the combined data; its source is shown below:

/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
  val ser = Serializer.getSerializer(dep.serializer)
  // BlockStoreShuffleFetcher.fetch asks the MapOutputTrackerMaster on the driver for the
  // locations of the map outputs this task needs, and then pulls the actual data from the
  // corresponding BlockManagers underneath.
  val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    if (dep.mapSideCombine) {
      new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
    } else {
      new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
    }
  } else {
    require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    // Convert the Product2s to pairs since this is what downstream RDDs currently expect
    iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
  }

  // Sort the output if there is a sort ordering defined.
  dep.keyOrdering match {
    case Some(keyOrd: Ordering[K]) =>
      // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
      // the ExternalSorter won't spill to disk.
      val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
      sorter.insertAll(aggregatedIter)
      context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
      context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
      sorter.iterator
    case None =>
      aggregatedIter
  }
}

The read method calls BlockStoreShuffleFetcher.fetch to obtain the MapStatuses and finally fetches the actual data through the BlockManager. The source is as follows:

private[hash] object BlockStoreShuffleFetcher extends Logging {
  def fetch[T](
      shuffleId: Int,
      reduceId: Int,
      context: TaskContext,
      serializer: Serializer)
    : Iterator[T] = {
    logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
    val blockManager = SparkEnv.get.blockManager

    val startTime = System.currentTimeMillis
    // Get the global MapOutputTracker and call its getServerStatuses method, passing in
    // two parameters: shuffleId and reduceId.
    // A shuffle involves two stages; shuffleId identifies the upstream stage and is used to
    // obtain the MapStatus information written out by its ShuffleMapTasks during shuffle write.
    // Once the MapStatuses are obtained, reduceId selects the output files from the previous
    // stage's ShuffleMapTasks that the current stage needs.
    // getServerStatuses goes over the network, because it must contact the driver-side
    // MapOutputTracker to retrieve this information.
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
    logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
      shuffleId, reduceId, System.currentTimeMillis - startTime))

    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
    for (((address, size), index) <- statuses.zipWithIndex) {
      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
    }

    val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
      case (address, splits) =>
        (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
    }

    def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
      val blockId = blockPair._1
      val blockOption = blockPair._2
      blockOption match {
        case Success(block) => {
          block.asInstanceOf[Iterator[T]]
        }
        case Failure(e) => {
          blockId match {
            case ShuffleBlockId(shufId, mapId, _) =>
              val address = statuses(mapId.toInt)._1
              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
            case _ =>
              throw new SparkException(
                "Failed to get block " + blockId + ", which is not a shuffle block", e)
          }
        }
      }
    }

    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      SparkEnv.get.blockManager.shuffleClient,
      blockManager,
      blocksByAddress,
      serializer,
      SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
    val itr = blockFetcherItr.flatMap(unpackBlock)

    val completionIter = CompletionIterator[T, Iterator[T]](itr, {
      context.taskMetrics.updateShuffleReadMetrics()
    })

    new InterruptibleIterator[T](context, completionIter) {
      val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
      override def next(): T = {
        readMetrics.incRecordsRead(1)
        delegate.next()
      }
    }
  }
}

MapOutputTracker.getServerStatuses is called from the executors to obtain the server URIs and output sizes of the ShuffleMapTask results. Its source is as follows:

 /**
  * Called from executors to get the server URIs and output sizes of the map outputs of
  * a given shuffle.
  */
 def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
   val statuses = mapStatuses.get(shuffleId).orNull
   if (statuses == null) {
     logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
     var fetchedStatuses: Array[MapStatus] = null
     fetching.synchronized {
       // Someone else is fetching it; wait for them to be done
       while (fetching.contains(shuffleId)) {
         try {
           fetching.wait()
         } catch {
           case e: InterruptedException =>
         }
       }

       // Either while we waited the fetch happened successfully, or
       // someone fetched it in between the get and the fetching.synchronized.
       fetchedStatuses = mapStatuses.get(shuffleId).orNull
       if (fetchedStatuses == null) {
         // We have to do the fetch, get others to wait for us.
         fetching += shuffleId
       }
     }

     if (fetchedStatuses == null) {
       // We won the race to fetch the output locs; do so
       logInfo("Doing the fetch; tracker actor = " + trackerActor)
       // This try-finally prevents hangs due to timeouts:
       try {
         val fetchedBytes =
           askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]
         fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
         logInfo("Got the output locations")
         mapStatuses.put(shuffleId, fetchedStatuses)
       } finally {
         fetching.synchronized {
           fetching -= shuffleId
           fetching.notifyAll()
         }
       }
     }
     if (fetchedStatuses != null) {
       fetchedStatuses.synchronized {
         return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
       }
     } else {
       logError("Missing all output locations for shuffle " + shuffleId)
       throw new MetadataFetchFailedException(
         shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
     }
   } else {
     statuses.synchronized {
       return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
     }
   }
 }

Each ShuffleMapTask produces one MapStatus, which records how much data that task wrote for each reduce partition; a size of 0 means no data was produced for that partition. The size of each partition is encoded in a single byte, but a byte can only hold values up to 255, so how are larger sizes expressed? The trick is a change of base: compressSize stores ⌈log₁.₁(size)⌉ instead of the size itself, and decompressSize restores it as 1.1 raised to that value. This stretches the representable range from 0–255 bytes to roughly 0–35,903,328,256 bytes (1.1^255), so a single byte can describe a block of up to about 35 GB, with at most about 10% error.

The source is as follows:

/**
 * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
 * We do this by encoding the log base 1.1 of the size as an integer, which can support
 * sizes up to 35 GB with at most 10% error.
 */
def compressSize(size: Long): Byte = {
  if (size == 0) {
    0
  } else if (size <= 1L) {
    1
  } else {
    math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
  }
}

/**
 * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
 */
def decompressSize(compressedSize: Byte): Long = {
  if (compressedSize == 0) {
    0
  } else {
    math.pow(LOG_BASE, compressedSize & 0xFF).toLong
  }
}
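A quick worked example of this encoding, following the formulas above with LOG_BASE = 1.1 (the block size of 1024 bytes is just an example):

// Compressing a 1 KB block:
val compressed = math.ceil(math.log(1024) / math.log(1.1)).toInt   // 73, fits in one byte
val restored   = math.pow(1.1, compressed).toLong                  // ≈ 1051, within 10% of 1024
// The largest representable size uses the maximum byte value 255:
val maxSize    = math.pow(1.1, 255).toLong                         // ≈ 35,903,328,256 bytes ≈ 35 GB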

A shuffleId uniquely identifies one shuffle within a job; the stage it belongs to is the direct upstream of the stage in which the reduce-side tasks run. The MapStatus produced by every task of that upstream stage has to be examined to determine whether it holds data that the current ResultTask needs to read.

The BlockManager first calls its initialize function, which initializes the BlockTransferService and the ShuffleClient, registers with the BlockManagerMaster, starts the BlockManagerWorker actor, and registers with a local external shuffle service if one is configured. If the requested block is local, it is read via getLocal; otherwise it is fetched remotely via getRemote. The source of initialize is as follows:

/**
 * Initializes the BlockManager with the given appId. This is not performed in the constructor as
 * the appId may not be known at BlockManager instantiation time (in particular for the driver,
 * where it is only learned after registration with the TaskScheduler).
 *
 * This method initializes the BlockTransferService and ShuffleClient, registers with the
 * BlockManagerMaster, starts the BlockManagerWorker actor, and registers with a local shuffle
 * service if configured.
 */
def initialize(appId: String): Unit = {
  blockTransferService.init(this)
  shuffleClient.init(appId)

  blockManagerId = BlockManagerId(
    executorId, blockTransferService.hostName, blockTransferService.port)

  shuffleServerId = if (externalShuffleServiceEnabled) {
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }

  master.registerBlockManager(blockManagerId, maxMemory, slaveActor)

  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
}

A shuffle operation consumes a large amount of memory, specifically in the following respects (a configuration sketch follows the list):

  • Each writer opens a buffer of about 100 KB.
  • The records themselves can occupy a large amount of memory.
  • During the combine phase on the ResultTask side, a HashMap is used to buffer the data; if the amount of data read is large, or there are many partitions, memory can run out.
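Two relevant knobs for this Spark version appear in the sources quoted above: the spill switch used by ExternalSorter and the in-flight fetch limit used by the shuffle fetcher. A configuration sketch (the values shown are illustrative, not recommendations):

import org.apache.spark.SparkConf

// Illustrative settings only; both keys appear in the source excerpts above.
val conf = new SparkConf()
  .set("spark.shuffle.spill", "true")          // allow shuffle aggregation/sorting to spill to disk
  .set("spark.reducer.maxMbInFlight", "48")    // cap shuffle data fetched concurrently, in MB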

II. Memory Store

Above we analyzed how shuffle data is stored. For Spark in general, RDDs are cached in memory first and fall back to disk and other stores after that. So what does the read/write path look like? Let's look at the framework of Spark's storage system.

The framework mainly consists of the following modules:

  • CacheManager: when an RDD is computed, data is obtained through the CacheManager, and computation results are stored through it as well.
  • BlockManager: the CacheManager relies on the BlockManager interface for reading and storing data; the BlockManager decides whether data comes from memory or from disk.
  • MemoryStore: responsible for storing data in, and reading data from, memory.
  • DiskStore: responsible for writing data to, and reading data from, disk.
  • BlockManagerWorker: writing data to the local MemoryStore or DiskStore is a synchronous operation; for fault tolerance the data may also be replicated to other nodes so that it can be recovered if lost. This replication is asynchronous and is handled by the BlockManagerWorker.
  • ConnectionManager: responsible for establishing connections to other nodes and for sending and receiving data.
  • BlockManagerMaster: this module runs only on the driver; its main job is to record which slave worker each BlockId is stored on. If a block needed by an RDD task is not on the local machine, the worker asks the master for the block's location and then fetches it through the ConnectionManager.

1. Startup Process

The initialization in SparkEnv is as follows:

// Create the sub-modules
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
  "BlockManagerMaster",
  new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
  serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
  numUsableCores)

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

val cacheManager = new CacheManager(blockManager)

In the registerOrLookup function, if the current node is the driver, the actor is created; otherwise a connection to the driver is established to obtain a reference to the BlockManagerMaster actor.

 def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
   // If the current node is the driver, create the actor
   if (isDriver) {
     logInfo("Registering " + name)
     actorSystem.actorOf(Props(newActor), name = name)
   } else {
     // Otherwise connect to the driver and look up the BlockManagerMaster actor
     AkkaUtils.makeDriverRef(name, conf, actorSystem)
   }
 }

2. Data Write Process

The data write process, in brief (a usage-level sketch follows the list):

  1. RDD.iterator is the entry point for interacting with the storage subsystem.
  2. CacheManager.getOrCompute calls the doPut method of BlockManager to write the data.
  3. Data is written to memory first; if memory is full, it is flushed to disk.
  4. The BlockManagerMaster is notified that new data has been written, and the block's metadata is recorded there.
  5. If the replication factor is greater than 1, the written data is also replicated to other workers.
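From the user's perspective this whole path is triggered simply by persisting an RDD and running an action. A minimal sketch (the app name, master, and data are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Persisting an RDD makes RDD.iterator go through CacheManager.getOrCompute,
// which stores each computed partition via BlockManager.doPut.
val sc = new SparkContext(new SparkConf().setAppName("cache-demo").setMaster("local[2]"))
val data = sc.parallelize(1 to 1000).map(_ * 2)
data.persist(StorageLevel.MEMORY_AND_DISK)   // memory first, spill to disk when memory is full
data.count()                                 // first action: computes and stores the blocks
data.count()                                 // second action: served from the block store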

The source of RDD.iterator is as follows:

 /**
  * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
  * This should ''not'' be called by users directly, but is available for implementors of custom
  * subclasses of RDD.
  */
 // Entry point for interacting with the storage subsystem
 final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
   if (storageLevel != StorageLevel.NONE) {
     SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
   } else {
     computeOrReadCheckpoint(split, context)
   }
 }

The source of CacheManager.getOrCompute is as follows:

 def getOrCompute[T](
     rdd: RDD[T],
     partition: Partition,
     context: TaskContext,
     storageLevel: StorageLevel): Iterator[T] = {

   val key = RDDBlockId(rdd.id, partition.index)
   logDebug(s"Looking for partition $key")
   blockManager.get(key) match {
     case Some(blockResult) =>
       // Partition is already materialized, so just return its values
       val inputMetrics = blockResult.inputMetrics
       val existingMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(inputMetrics.readMethod)
       existingMetrics.incBytesRead(inputMetrics.bytesRead)

       val iter = blockResult.data.asInstanceOf[Iterator[T]]
       new InterruptibleIterator[T](context, iter) {
         override def next(): T = {
           existingMetrics.incRecordsRead(1)
           delegate.next()
         }
       }
     case None =>
       // Acquire a lock for loading this partition
       // If another thread already holds the lock, wait for it to finish return its results
       val storedValues = acquireLockForPartition[T](key)
       if (storedValues.isDefined) {
         return new InterruptibleIterator[T](context, storedValues.get)
       }

       // Otherwise, we have to load the partition ourselves
       try {
         logInfo(s"Partition $key not found, computing it")
         // If no checkpoint is available, compute the partition
         val computedValues = rdd.computeOrReadCheckpoint(partition, context)

         // If the task is running locally, do not persist the result
         if (context.isRunningLocally) {
           return computedValues
         }

         // Otherwise, cache the values and keep track of any updates in block statuses
         val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
         // Cache the computed values according to the requested storage level
         val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
         val metrics = context.taskMetrics
         val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
         metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
         new InterruptibleIterator(context, cachedValues)

       } finally {
         loading.synchronized {
           loading.remove(key)
           loading.notifyAll()
         }
       }
   }
 }

The source of putInBlockManager is as follows:

 /**
  * Cache the values of a partition, keeping track of any updates in the storage statuses of
  * other blocks along the way.
  *
  * The effective storage level refers to the level that actually specifies BlockManager put
  * behavior, not the level originally specified by the user. This is mainly for forcing a
  * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
  * while preserving the original semantics of the RDD as specified by the application.
  */
 private def putInBlockManager[T](
     key: BlockId,
     values: Iterator[T],
     level: StorageLevel,
     updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
     effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

   val putLevel = effectiveStorageLevel.getOrElse(level)
   // If the block is not to be cached in memory, the computed values are handed to the
   // BlockManager as an iterator instead of being unrolled in memory first.
   if (!putLevel.useMemory) {
     /*
      * This RDD is not to be cached in memory, so we can just pass the computed values as an
      * iterator directly to the BlockManager rather than first fully unrolling it in memory.
      */
     updatedBlocks ++=
       blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
     blockManager.get(key) match {
       case Some(v) => v.data.asInstanceOf[Iterator[T]]
       case None =>
         logInfo(s"Failure to store $key")
         throw new BlockException(key, s"Block manager failed to return cached value for $key!")
     }
   } else {
     /*
      * This RDD is to be cached in memory. In this case we cannot pass the computed values
      * to the BlockManager as an iterator and expect to read it back later. This is because
      * we may end up dropping a partition from memory store before getting it back.
      *
      * In addition, we must be careful to not unroll the entire partition in memory at once.
      * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
      * single partition. Instead, we unroll the values cautiously, potentially aborting and
      * dropping the partition to disk if applicable.
      */
     // When caching in memory, the values are read back from the cache before returning;
     // otherwise the partition could be dropped before it is read, and the RDD data lost.
     // The whole partition also must not be unrolled in memory at once (risk of OOM), so
     // values are unrolled cautiously and may be flushed to disk.
     blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
       case Left(arr) =>
         // We have successfully unrolled the entire partition, so cache it in memory
         updatedBlocks ++=
           blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
         arr.iterator.asInstanceOf[Iterator[T]]
       case Right(it) =>
         // There is not enough space to cache this partition in memory
         val returnValues = it.asInstanceOf[Iterator[T]]
         if (putLevel.useDisk) {
           logWarning(s"Persisting partition $key to disk instead.")
           val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
             useOffHeap = false, deserialized = false, putLevel.replication)
           putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
         } else {
           returnValues
         }
     }
   }
 }

Execution then enters the BlockManager, where putArray calls the doPut method:

 /**
  * Put a new block of values to the block manager.
  * Return a list of blocks updated as a result of this put.
  */
 def putArray(
     blockId: BlockId,
     values: Array[Any],
     level: StorageLevel,
     tellMaster: Boolean = true,
     effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
   require(values != null, "Values is null")
   // Delegate to doPut
   doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
 }

In doPut, if the replication factor is greater than 1, the replicate method is called to copy the data to other nodes; the data is then cached in memory, Tachyon, or on disk, and finally the status of each block is reported to the master.

 // If we're storing bytes, then initiate the replication before storing them locally.
 // This is faster as data is already serialized and ready to send.
 val replicationFuture = data match {
   // If the replication factor is greater than 1, call replicate() to copy the data
   // to other nodes.
   case b: ByteBufferValues if putLevel.replication > 1 =>
     // Duplicate doesn't copy the bytes, but just creates a wrapper
     val bufferView = b.buffer.duplicate()
     Future { replicate(blockId, bufferView, putLevel) }
   case _ => null
 }

 // Keep track of which blocks are dropped from memory
 if (putLevel.useMemory) {
   result.droppedBlocks.foreach { updatedBlocks += _ }
 }

 val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
 if (putBlockStatus.storageLevel != StorageLevel.NONE) {
   // Now that the block is in either the memory, tachyon, or disk store,
   // let other threads read it, and tell the master about it.
   marked = true
   putBlockInfo.markReady(size)
   if (tellMaster) {
     // After the data has been cached in memory, tachyon or disk, report the status
     // of this block to the master.
     reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
   }
   updatedBlocks += ((blockId, putBlockStatus))
 }
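Whether replicate() runs at all is controlled by the replication factor of the storage level chosen by the user. A small illustrative sketch (persistWithReplication is a hypothetical helper, not Spark API):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Storage levels with the "_2" suffix carry replication = 2, so doPut will also trigger
// replicate() and copy each block to one additional node.
def persistWithReplication[T](rdd: RDD[T]): RDD[T] =
  rdd.persist(StorageLevel.MEMORY_AND_DISK_2)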

The source of reportBlockStatus is as follows:

/**
 * Tell the master about the current storage status of a block. This will send a block update
 * message reflecting the current status, *not* the desired storage level in its block info.
 * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
 *
 * droppedMemorySize exists to account for when the block is dropped from memory to disk (so
 * it is still valid). This ensures that update in master will compensate for the increase in
 * memory on slave.
 */
private def reportBlockStatus(
    blockId: BlockId,
    info: BlockInfo,
    status: BlockStatus,
    droppedMemorySize: Long = 0L): Unit = {
  val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
  if (needReregister) {
    logInfo(s"Got told to re-register updating block $blockId")
    // Re-registering will report our new block for free.
    asyncReregister()
  }
  logDebug(s"Told master about block $blockId")
}

3. Data Read Process

The entry point for reading data is the get function, which first tries to fetch the data locally and, if it is not available locally, fetches it from a remote node:

 /**
  * Get a block from the block manager (either local or remote).
  */
 def get(blockId: BlockId): Option[BlockResult] = {
   // Try the local block manager first; if the block is not found locally, fetch it remotely.
   val local = getLocal(blockId)
   if (local.isDefined) {
     logInfo(s"Found block $blockId locally")
     return local
   }
   val remote = getRemote(blockId)
   if (remote.isDefined) {
     logInfo(s"Found block $blockId remotely")
     return remote
   }
   None
 }

When reading locally, the memory store is tried first, then off-heap (Tachyon) storage, and finally the disk.
The remote read path is getRemote ---> doGetRemote ---> BlockTransferService.fetchBlockSync.
The source of fetchBlockSync is shown below; a BlockFetchingListener is used to learn whether the fetch succeeded:

/**
 * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
 *
 * It is also only available after [[init]] is invoked.
 */
def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
  // A monitor for the thread to wait on.
  val result = Promise[ManagedBuffer]()
  fetchBlocks(host, port, execId, Array(blockId),
    new BlockFetchingListener {
      // Called when the fetch fails
      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
        result.failure(exception)
      }
      // Called when the fetch succeeds
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
        val ret = ByteBuffer.allocate(data.size.toInt)
        ret.put(data.nioByteBuffer())
        ret.flip()
        result.success(new NioManagedBuffer(ret))
      }
    })

  Await.result(result.future, Duration.Inf)
}
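The blocking behaviour comes from the Promise/Await pair: the callback completes the promise on another thread while the caller waits on the corresponding future. The same pattern in isolation, as a generic sketch unrelated to Spark's classes (blockingFetch and its callback shape are assumptions for illustration):

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Generic sketch of the pattern used by fetchBlockSync: an asynchronous callback completes
// a Promise, and the caller blocks on the matching Future until it is done.
def blockingFetch(asyncFetch: (String => Unit, Throwable => Unit) => Unit): String = {
  val result = Promise[String]()
  asyncFetch(data => result.success(data), err => result.failure(err))
  Await.result(result.future, Duration.Inf)
}

// Usage: simulate an asynchronous source that replies from another thread.
val answer = blockingFetch { (onSuccess, _) =>
  Future { Thread.sleep(100); onSuccess("block-bytes") }
}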

This concludes the source-level walk-through of Spark's storage mechanism. If you have any questions, feel free to leave a comment.
