内存池MemoryPool是对存储内存的具体管理,内存管理器MemoryManager是提供给外界进行管理内存的接口,而MemoryStore是用来将数据块保存到申请的storage内存中,并提供了从内存获取保存的数据的方法。在storage内存不足时,负责将内存中保存的数据刷新到磁盘上并释放占用的内存。MemoryStore在保存数据之前,会调用MemoryManager的相关acquire方法,判断StorageMemoryPool中是否有足够的内存可以分配,如果可用内存不足则直接返回false,由调用者调用BlockEvictionHandler.dropFromMemory来移除内存中缓存的数据块,释放内存空间。如果可用内存充足则直接将数据块保存到内存中。本文先介绍与MemoryStore相关的MemoryEntry,然后详细分析MemoryStore的主要源码。

MemoryEntry

MemoryEntry是块在内存中的抽象表示,定义如下:

// 内存中的Block抽象为特质MemoryEntry
private sealed trait MemoryEntry[T] {def size: Long // 当前Block的大小def memoryMode: MemoryMode // 当前Block的存储的内存类型def classTag: ClassTag[T] // 当前Block的类型标记
}

size表示块大小,memoryMode表示块存储在堆内内存还是堆外内存,classTag则是该块所存储的对象的类型标记。MemoryEntry有序列化和反序列化的两种实现,如下所示:

// 表示反序列化后的MemoryEntry
private case class DeserializedMemoryEntry[T](value: Array[T],size: Long,classTag: ClassTag[T]) extends MemoryEntry[T] {val memoryMode: MemoryMode = MemoryMode.ON_HEAP
}// 表示序列化后的MemoryEntry
private case class SerializedMemoryEntry[T](buffer: ChunkedByteBuffer,memoryMode: MemoryMode,classTag: ClassTag[T]) extends MemoryEntry[T] {def size: Long = buffer.size
}

可以看到,反序列化的DeserializedMemoryEntry只能用堆内内存存储<ON_HEAP>,其数据是T类型的对象的数组。序列化的SerializedMemoryEntry能用堆内和堆外内存存储,数据用字节缓存ChunkedByteBuffer包装,并且其长度就是该SerializedMemoryEntry的大小。

ValueHolder

在以迭代器数据形式写入存储内存数据时候,插入数据最主要的工作是由ValuesHolder对象来完成的。ValuesHolder特质有两个实现类:DeserializedValuesHolder和SerializedValuesHolder,我们来简单分析下这两个类。

  1. DeserializedValuesHolder对象内部有两个成员:vector,是一个SizeTrackingVector,可以估算数组中元素的大小,同时可以自动扩容;arrayValues,是一个存放值的数组,用于在所有数据插入后,将vector中数据转移到一个数组中,然后包装成一个DeserializedMemoryEntry对象<数据是T类型的对象数组>,工作大部分是由SizeTrackingVector来做的。

  2. SerializedValuesHolder对象是对SerializedMemoryEntry对象构建的辅助类,使用包装的压缩流和序列化流,对数据进行序列化,压缩,然后写入到ChunkedByteBuffer中,最后包装成SerializedMemoryEntry,记录在MemoryStore中。

MemoryStore

MemoryStore依赖于MemoryManager,块写入时候,需要从MemoryManager中获取on-heap/off-heap的存储内存,分配给Block存储用;块删除时候,需要向MemoryManager归还相应占用的存储内存。

构造与属性成员

我们先来看一下MemoryStore构造方法和成员属性,如下所示:

private[spark] class MemoryStore(conf: SparkConf,blockInfoManager: BlockInfoManager, // 块元信息管理器serializerManager: SerializerManager, // 序列化memoryManager: MemoryManager, // 负责存储内存分配blockEvictionHandler: BlockEvictionHandler) // 负责从内存中删除占用执行内存的空间的块,将其存储到磁盘里面,释放空间extends Logging {// LRU map, 在删除时候可以根据访问时间进行删除最早未使用的Blockprivate val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)// TaskAttempt线程的标识TaskAttemptId与该TaskAttempt线程在堆内存展开的所有Block占用的内存大小之和之间的映射关系。private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()// TaskAttempt线程的标识TaskAttemptId与该TaskAttempt线程在堆外内存展开的所有Block占用的内存大小之和之间的映射关系private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
}

可以看出来,MemoryStore一共有8个属性:

  1. conf: spark配置信息。
  2. blockInfoManager: 负责块元信息管理。
  3. serializerManager: 负责序列化处理。
  4. memoryManager: 负责存储内存的分配和回收。
  5. blockEvictionHandler: 驱逐块的特质,只有BlockManager里面实现了。
  6. Entries:使用LinkedHashMap存储的BlockId->MemoryEntry的Map,这个数据结构内部实现了LRU,在删除时候会先删除最早未被访问过的块。
  7. onHeapUnrollMemoryMap: 记录了TaskAttemptId与该TaskAttempt线程在堆内存展开的所有Block占用的内存大小之和之间的映射关系。
  8. offHeapUnrollMemoryMap: 记录了TaskAttemptId与该TaskAttempt线程在堆外内存展开的所有Block占用的内存大小之和之间的映射关系。

直接写入字节

直接写入字节方法比较简单,首先通过MemoryManager方法申请所需的内存,然后调用参数中传入的偏函数_bytes,获取已经转化为ChunkedByteBuffer的数据,再创建出对应的SerializedMemoryEntry,并将该MemoryEntry放入entries映射。注意LinkedHashMap本身不是线程安全的,因此对其并发访问都要加锁。

def putBytes[T: ClassTag](blockId: BlockId,size: Long,memoryMode: MemoryMode,_bytes: () => ChunkedByteBuffer): Boolean = {require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) { // 申请空间,能申请到空间,可以从执行内存租借内存// We acquired enough memory for the block, so go ahead and put itval bytes = _bytes()  // 获取Block的数据,函数产出ChunkedByteBuffer<大多数情况下是将其他类型的数据变为ChunkedByteBuffer类型>assert(bytes.size == size)// 序列化val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])entries.synchronized { // 将Block数据写入entries,即写入内存entries.put(blockId, entry)}logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))true} else { // 空间不足false}
}

写入迭代器化的数据

迭代器化的数据,就是指用Iterator[T]形式表示的块数据。之所以会这样表示,是因为有时单个块对应的数据可能过大,不能一次性存入内存。为了避免造成OOM,就可以一边遍历迭代器,一边周期性地写内存,并检查内存是否够用,就像翻书一样。“展开”(Unroll)这个词形象地说明了该过程。不过unroll memory和storage memory本质上是同一份内存,只是在任务执行的不同阶段的不同逻辑表述形式。在从hdfs文件的partition数据的读取到存储内存过程中,这份内存叫做unroll memory,而当成功读取存储了所有record到内存中后,这份内存就改了个名字叫storage memory了。注意,unroll memory的概念只存在于spark的存储模块中,在执行模块中是不存在unroll memory的。 我们先来看一下写入操作调用全过程:

ShuffleMapTask/ResultTask.runTask -> RDD.iterator -> RDD.getOrCompute -> BlockManager.getOrElseUpdate-> BlockManager.doPutIterator -> MemoryStore.putIteratorAsBytes -> MemoryStore.putIterator -> MemoryStore.reserveUnrollMemoryForThisTask ->  MemoryManager.acquireUnrollMemory

可以看到,task[shuffle map task和result task]执行时调用RDD.iterator获取指定partition的数据迭代器,这个过程中的MemoryStore.putIterator会遍历指定partition的所有records,获取每个value并将其存放在连续内存中,下面我们来分析具体的写入过程。

非序列化方式写入

putIteratorAsValues这个方法主要是用于存储级别是非序列化的情况,即直接以java对象的形式将数据存放在jvm堆内存上。在jvm堆内存上存放大量的对象并不是什么好事,gc压力大,挤占内存,可能引起频繁的gc,但是也有明显的好处,就是省去了序列化和反序列化耗时,而且直接从堆内存取数据显然比任何其他方式(磁盘和直接内存)都要快很多,所以对于内存充足且要缓存的数据量不是很大的情况,是一种不错的选择。该方法使用DeserializedValuesHolder然后调用putIterator方法来进行具体的写入,这个后面在分析。该方法成功时候返回写入内存的数据大小,失败时候返回PartiallyUnrolledIterator供DiskStore写入磁盘操作。

private[storage] def putIteratorAsValues[T](blockId: BlockId,values: Iterator[T],classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { // 非序列化数据只能写入到堆内内存val valuesHolder = new DeserializedValuesHolder[T](classTag) // 使用sizeTracker来采样计算数据的size,Vector存储unroll的数据putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {case Right(storedSize) => Right(storedSize)case Left(unrollMemoryUsedByThisBlock) =>// 已经unroll的数据val unrolledIterator = if (valuesHolder.vector != null) {valuesHolder.vector.iterator} else {valuesHolder.arrayValues.toIterator}Left(new PartiallyUnrolledIterator(this,MemoryMode.ON_HEAP,unrollMemoryUsedByThisBlock, // 未读取的unrolled = unrolledIterator, // 已经读取到内存的iteratorrest = values))}
}

序列化方式写入

putIteratorAsBytes的实现结构基本和putIteratorAsValues是一样的。只不过这里的序列化形式存储使用的是SerializedMemoryEntry,valueHolder也选择了SerializedValuesHolder来进行,指定chunk的大小和存储内存类型,进行序列化写入,成功时候返回写入的大小,失败时候返回的是PartiallySerializedBlock,供DiskStore写入磁盘操作。

private[storage] def putIteratorAsBytes[T](blockId: BlockId,values: Iterator[T],classTag: ClassTag[T],memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")// Initial per-task memory to request for unrolling blocks (bytes).val initialMemoryThreshold = unrollMemoryThresholdval chunkSize = if (initialMemoryThreshold > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +s"is too large to be set as chunk size. Chunk size has been capped to " +s"${Utils.bytesToString(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)}")ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH} else {initialMemoryThreshold.toInt}// 使用非序列化valueHolder,可以在堆外/堆内内存申请空间val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,memoryMode, serializerManager)putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {case Right(storedSize) => Right(storedSize)case Left(unrollMemoryUsedByThisBlock) =>Left(new PartiallySerializedBlock(this,serializerManager,blockId,valuesHolder.serializationStream,valuesHolder.redirectableStream,unrollMemoryUsedByThisBlock,memoryMode,valuesHolder.bbos,values,classTag))}
}

putIterator分析

从上面看到序列化方式以及非序列化方式写入到内存中都调用了putIterator来进行具体的写入操作,这个方法很长,但是逻辑相对简单,主要做的事情就是把数据一条一条往ValuesHolder中写,并周期性地检查内存,如果内存不够就通过内存管理器MemoryManager申请内存,每次申请当前内存量的1.5倍。最后,将ValuesHolder中的数据转移到一个数组中。最后还有关键的一步,就是释放展开内存,重新申请存储内存,主要步骤如下:

  1. 调用reserveUnrollMemoryForThisTask(),申请初始的展开内存,并记录该块使用了多少展开内存。
  2. 循环迭代块的数据,将其放入一个valueHolder中。
  3. 每当到了检查的时机<16个元素一检查>,如果已经展开的数据大小超过了当前的展开内存阈值,就再次调用reserveUnrollMemoryForThisTask()方法,试图申请新的展开内存,申请到之后,同时更新阈值。
  4. 所有数据都展开之后,标志keepUnrolling为真,表示展开成功。将valueHolder中的数据封装为MemoryEntry。
  5. 如果检查申请到的展开内存是否比实际大小还大,就释放掉多余的展开内存,并将它们返还给存储内存。
  6. 上面一切成功,将块BlockId与MemoryEntry的映射放入entries,并返回Right。注意这个方法返回值的类型是Either类型,它在Scala中表示不相交的两个结果的集合,即可能返回错误的结果(Left),或者正确的结果(Right)。
  7. 如果没有足够的展开内存,或者展开所有数据后keepUnrolling标志为假,都表示这次写入不成功,返回Left,其中又包含PartiallyUnrolledIterator,表示一个没有完全展开的迭代器。
 private def putIterator[T](blockId: BlockId,values: Iterator[T], // 需要写入的数据classTag: ClassTag[T],memoryMode: MemoryMode,valuesHolder: ValuesHolder[T]): Either[Long, Long] = {require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")// 已经展开的元素数量。var elementsUnrolled = 0 // MemoryStore是否仍然有足够的内存,以便于继续展开Block。var keepUnrolling = true // unrollMemoryThreshold 用来展开任何Block之前,初始请求的内存大小,可以修改属性spark.storage.unrollMemoryThreshold(默认为1MB)改变大小val initialMemoryThreshold = unrollMemoryThreshold// 检查内存是否足够的阀值,此值默认为16。即每展开16个元素就检查一次。val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)// 当前任务用于展开Block所保留的内存。var memoryThreshold = initialMemoryThreshold// 展开内存不充足时,请求增长的因子。此值默认为1.5。val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)// Block已经使用的展开内存大小计数器var unrollMemoryUsedByThisBlock = 0L// 请求足够的内存开始展开操作,默认为unrollMemoryThreshold,即1MkeepUnrolling =reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)if (!keepUnrolling) {// 无法请求到足够的初始内存,记录日志logWarning(s"Failed to reserve initial memory threshold of " +s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")} else { // 将申请到的内存添加到已使用的展开内存计数器中unrollMemoryUsedByThisBlock += initialMemoryThreshold}while (values.hasNext && keepUnrolling) {  // 如果还有元素,且申请到了足够的初始内存valuesHolder.storeValue(values.next())  // 将下一个元素添加到vector进行记录if (elementsUnrolled % memoryCheckPeriod == 0) { // 判断是否需要检查内存是否足够val currentSize = valuesHolder.estimatedSize() // 所有已经分配的内存 sizeTracker估算大小// If our vector's size has exceeded the threshold, request more memoryif (currentSize >= memoryThreshold) { // 所有已经分配的内存大于为当前展开保留的内存// 计算还需要请求的内存大小val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong// 尝试申请更多内存keepUnrolling =reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)if (keepUnrolling) { // 申请成功,将申请到的内存添加到已使用的展开内存计数器中,申请不成功时候下次循环就没办法了unrollMemoryUsedByThisBlock += amountToRequest}// New threshold is currentSize * memoryGrowthFactor// 更新为当前展开保留的内存大小memoryThreshold += amountToRequest}}// 完成了一次元素展开,展开个数加1elementsUnrolled += 1}// unroll是将不连续的内存<比方从文件中读取的数据iterator>存储到连续内存中<存储内存>if (keepUnrolling) { // 走到这里,说明计算的申请内存是足够的val entryBuilder = valuesHolder.getBuilder() // 构造器Block MemoryEntryval size = entryBuilder.preciseSize // sizeTracker估算出来的大小if (size > unrollMemoryUsedByThisBlock) { // 如果不足需要再次申请val amountToRequest = size - unrollMemoryUsedByThisBlockkeepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)if (keepUnrolling) { // 申请成功unrollMemoryUsedByThisBlock += amountToRequest}}if (keepUnrolling) {val entry = entryBuilder.build()// Synchronize so that transfer is atomicmemoryManager.synchronized { // 将展开Block的内存转换为存储Block的内存的方法releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock) // 先释放val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode) // 在申请真正size大小的存储内存assert(success, "transferring unroll memory to storage memory failed")}entries.synchronized {  // 将对应的映射关系添加到entries字典entries.put(blockId, entry)}logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))Right(entry.size)} else { // 已经完全unroll了,size是预估的,可能比实际的要少,需要再次向存储内存申请,但是存储内存不足,导致无法最终保存logUnrollFailureMessage(blockId, entryBuilder.preciseSize)Left(unrollMemoryUsedByThisBlock)}} else { // 存储内存不足,导致无法unroll成功,只有部分unrolllogUnrollFailureMessage(blockId, valuesHolder.estimatedSize())Left(unrollMemoryUsedByThisBlock)}}

读取块数据

getBytes对应的是读取SerializedMemoryEntry数据。getValues对应的是读取DeserializedMemoryEntry数据。

/** 获取数据 */
def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {val entry = entries.synchronized { entries.get(blockId) }entry match {case null => Nonecase e: DeserializedMemoryEntry[_] =>throw new IllegalArgumentException("should only call getBytes on serialized blocks")case SerializedMemoryEntry(bytes, _, _) => Some(bytes)}
}// 用于从内存中读取BlockId对应的Block(已经封装为Iterator)。
def getValues(blockId: BlockId): Option[Iterator[_]] = {val entry = entries.synchronized { entries.get(blockId) }entry match {case null => Nonecase e: SerializedMemoryEntry[_] =>throw new IllegalArgumentException("should only call getValues on deserialized blocks")case DeserializedMemoryEntry(values, _, _) =>val x = Some(values)x.map(_.iterator)}
}

淘汰缓存块

当存储内存不足或者执行内存不足时候,都可能需要淘汰缓存块。其执行流程如下:

  1. 循环遍历entries映射中的块,找出其中能够被淘汰的块。 能够淘汰的Block需要满足: 1.该Block使用的内存模式与申请的相同;2. BlockId对应的Block不是RDD,或者BlockId与blockId不是同一个RDD。
  2. 为这些块加写锁,保证当前正在被读取的块不会被淘汰掉,记录将要被淘汰的块ID。
  3. 如果腾出的空间已经达到了目标值,就调用嵌套定义的dropBlock()方法真正地移除这些块,最终仍然调用了BlockManager.dropFromMemory()方法。该方法会产生两种结果:一是块仍然存在,只是StorageLevel发生变化(比如转存到了磁盘),就只需解开它的写锁;二是块被彻底地移除,就得调用BlockInfoManager.remove()方法删掉它。最后将剩余未处理的块解锁。
  4. 如果腾出的空间最终仍然不能达到目标值,就不会执行淘汰动作,新的块也不会被存入。
  private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId],space: Long,memoryMode: MemoryMode): Long = {assert(space > 0)memoryManager.synchronized {var freedMemory = 0Lval rddToAdd = blockId.flatMap(getRddId)val selectedBlocks = new ArrayBuffer[BlockId]// 能够驱逐的Block需要满足: 1.该Block使用的内存模式与申请的相同。2. BlockId对应的Block不是RDD,或者BlockId与blockId不是同一个RDD。def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))} entries.synchronized {val iterator = entries.entrySet().iterator() // 遍历所有的,由于该数据结构底层使用的LRU,所以遍历顺序是从最远未使用的开始while (freedMemory < space && iterator.hasNext) {val pair = iterator.next()val blockId = pair.getKeyval entry = pair.getValueif (blockIsEvictable(blockId, entry)) { // 判断是否满足驱逐条件 if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) { // blockInfo需要获取写锁selectedBlocks += blockId // 需要驱逐的BlockIdfreedMemory += pair.getValue.size // 该Block占用的内存}}}}// 删除一个块def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {val data = entry match {case DeserializedMemoryEntry(values, _, _) => Left(values)case SerializedMemoryEntry(buffer, _, _) => Right(buffer)}// 该handler在BlockManager中实现val newEffectiveStorageLevel =blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)if (newEffectiveStorageLevel.isValid) { blockInfoManager.unlock(blockId) // 不能删除,释放写锁} else { blockInfoManager.removeBlock(blockId) // 删除}}if (freedMemory >= space) { // 能够释放的空间大于需要的空间var lastSuccessfulBlock = -1try {logInfo(s"${selectedBlocks.size} blocks selected for dropping " +s"(${Utils.bytesToString(freedMemory)} bytes)")(0 until selectedBlocks.size).foreach { idx => // 遍历删除val blockId = selectedBlocks(idx)val entry = entries.synchronized {entries.get(blockId)} if (entry != null) {dropBlock(blockId, entry)afterDropAction(blockId)}lastSuccessfulBlock = idx}logInfo(s"After dropping ${selectedBlocks.size} blocks, " +s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")freedMemory} finally { if (lastSuccessfulBlock != selectedBlocks.size - 1) { (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>val blockId = selectedBlocks(idx) // 没删除的需要释放写锁blockInfoManager.unlock(blockId)}}}} else { // 不满足需要的内存blockId.foreach { id =>logInfo(s"Will not store $id")}// 释放写锁selectedBlocks.foreach { id =>blockInfoManager.unlock(id)}0L}}}

预留&归还内存

预留和归还内存比较简单,主要是申请内存,然后对onHeapUnrollMemoryMap/offHeapUnrollMemoryMap进行操作,记录TaskId和使用的展开内存的对应关系。

def reserveUnrollMemoryForThisTask(blockId: BlockId,memory: Long,memoryMode: MemoryMode): Boolean = {memoryManager.synchronized {// 获取memoryMode的内存val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)if (success) {val taskAttemptId = currentTaskAttemptId()val unrollMemoryMap = memoryMode match {case MemoryMode.ON_HEAP => onHeapUnrollMemoryMapcase MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap}// 记录当前TaskId占用的内存大小unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory}success}
}def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {val taskAttemptId = currentTaskAttemptId()memoryManager.synchronized { // 释放空间val unrollMemoryMap = memoryMode match {case MemoryMode.ON_HEAP => onHeapUnrollMemoryMapcase MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap}if (unrollMemoryMap.contains(taskAttemptId)) {val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))if (memoryToRelease > 0) {unrollMemoryMap(taskAttemptId) -= memoryToReleasememoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)}if (unrollMemoryMap(taskAttemptId) == 0) {unrollMemoryMap.remove(taskAttemptId)}}}
}

参考

  1. https://blog.csdn.net/nazeniwaresakini/article/details/104220307
  2. https://www.cnblogs.com/zhuge134/archive/2004/01/13/11006860.html
  3. https://www.jianshu.com/p/87a36488993a
  4. https://juejin.im/post/6844903544152129550
  5. https://www.jianshu.com/p/8f4f58b4b8ab

spark存储管理源码分析系列之MemoryStore相关推荐

  1. Spark Shuffle源码分析系列之PartitionedPairBufferPartitionedAppendOnlyMap

    概述 SortShuffleWriter使用ExternalSorter进行ShuffleMapTask数据内存以及落盘操作,ExternalSorter中使用内存进行数据的缓存过程中根据是否需要ma ...

  2. Spark Shuffle源码分析系列之UnsafeShuffleWriter

    前面我们介绍了BypassMergeSortShuffleWriter和SortShuffleWriter,知道了它们的应用场景和实现方式,本节我们来看下UnsafeShuffleWriter,它使用 ...

  3. Spark DAGScheduler源码分析系列之一: 基础

    DAGScheduler DAGScheduler是Spark中比较重要的类,实现了面向DAG的高层次调度,DAGScheduler通过计算将DAG中的一系列RDD划分到不同的Stage,然后构建这些 ...

  4. k8s源码分析 pdf_Spark Kubernetes 的源码分析系列 - features

    1 Overview features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML ...

  5. Kylin源码分析系列三—rowKey编码

    Kylin源码分析系列三-rowKey编码 注:Kylin源码分析系列基于Kylin的2.5.0版本的源码,其他版本可以类比. 1. 相关概念 前面介绍了Kylin中Cube构建的流程,但Cube数据 ...

  6. jQuery源码分析系列

    声明:本文为原创文章,如需转载,请注明来源并保留原文链接Aaron,谢谢! 版本截止到2013.8.24 jQuery官方发布最新的的2.0.3为准 附上每一章的源码注释分析 :https://git ...

  7. MyBatis 源码分析系列文章合集

    1.简介 我从七月份开始阅读MyBatis源码,并在随后的40天内陆续更新了7篇文章.起初,我只是打算通过博客的形式进行分享.但在写作的过程中,发现要分析的代码太多,以至于文章篇幅特别大.在这7篇文章 ...

  8. MyBatis 源码分析系列文章导读

    1.本文速览 本篇文章是我为接下来的 MyBatis 源码分析系列文章写的一个导读文章.本篇文章从 MyBatis 是什么(what),为什么要使用(why),以及如何使用(how)等三个角度进行了说 ...

  9. Spring IOC 容器源码分析系列文章导读

    1. 简介 前一段时间,我学习了 Spring IOC 容器方面的源码,并写了数篇文章对此进行讲解.在写完 Spring IOC 容器源码分析系列文章中的最后一篇后,没敢懈怠,趁热打铁,花了3天时间阅 ...

最新文章

  1. 9款超赞的AI开源项目!| 本周Github精选
  2. 热点面试题目—Java异常
  3. android ui秘笈,看图说话 – Android UI 设计秘笈 :Part I
  4. springboot 修改了端口不生效_Spring Boot 项目 Docker 化快速上手
  5. mod php是什么意思,mod_php模式原理探析
  6. C# WebService发布与调用方法(转)
  7. 移动直播连麦实现思路:整体篇
  8. 小练习——过滤掉出现次数最多的数据
  9. 数据结构与算法-- 八皇后问题(多种实现方案)
  10. js中字符串操作函数
  11. php 各种排序算法,PHP四种常见排序算法
  12. linux crontab 每隔一段时间执行一次
  13. FreeBSD--网络配置
  14. 英伟达显卡控制面板没有显示设置的三种解决方法
  15. linux连接苹果鼠标,Linux 5.13添加对苹果Magic Mouse 2和微软SAM的支持
  16. 华为销售专家LTC专家许浩明老师:流程是数字化转型的基础,以华为营销LTC,华为铁三角为例
  17. 远程桌面提示“用户帐户限制(例如,时间限制)会阻止你登录。请与系统管理员或技术支持联系以获取帮助。”
  18. 37. Hard Disk Drives
  19. 第十届蓝桥杯省赛原题及参考答案
  20. 计算机一级office软件,计算机一级office

热门文章

  1. 神仙级编程神器,吹爆!
  2. 为什么我会选择做软件测试
  3. 计算机网络之FTP、HTTP、DNS、P2P
  4. 云计算的SaaS、PaaS和IaaS三种服务模式之间,主要是什么关系?
  5. jquery省市区三级联动插件,无ajax
  6. Qt功能优化:Qt语音助手
  7. 详解python中的面向对象(下)
  8. JAVAjavawebJSP医药管理系统源码(jsp医药进销存管理系统jsp医院药品管理系统(药品进销存系统)
  9. 牵了手就不要随便分手
  10. Qt designer-窗口布局