Overview

Spark has two memory management schemes; the new and legacy schemes are implemented by the classes UnifiedMemoryManager and StaticMemoryManager respectively.

The legacy scheme is static: storageMemory (storage memory) and executionMemory (execution memory) each own their share exclusively and cannot borrow from each other, so when one side has plenty of memory and the other is short but cannot borrow, resources are wasted. The new scheme manages the two in a unified way: initially each side takes half, but when one side runs short it can borrow from the other, making more effective use of memory and improving overall resource utilization.

Overall, memory is divided into three parts: storageMemory, executionMemory, and the system reserve. storageMemory caches RDDs, unrolls partitions, holds direct task results and broadcast variables, and in Spark Streaming receiver mode stores each batch's blocks. executionMemory serves as buffer space for shuffle, join, sort, and aggregation. Everything outside these two is reserved for the system.

The legacy scheme: StaticMemoryManager

The memoryManager is created in SparkEnv:

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }

The unified scheme, UnifiedMemoryManager, is the default; here let's first take a brief look at the legacy StaticMemoryManager.

The memory storageMemory receives is:

systemMaxMemory * memoryFraction * safetyFraction

where:

  • systemMaxMemory: Runtime.getRuntime.maxMemory, the maximum memory the JVM can obtain.
  • memoryFraction: controlled by spark.storage.memoryFraction, default 0.6.
  • safetyFraction: controlled by spark.storage.safetyFraction, default 0.9; cached block sizes are only estimates, so a safety factor is needed.

The memory executionMemory receives is:

systemMaxMemory * memoryFraction * safetyFraction

where:

  • systemMaxMemory: Runtime.getRuntime.maxMemory, the maximum memory the JVM can obtain.
  • memoryFraction: controlled by spark.shuffle.memoryFraction, default 0.2.
  • safetyFraction: controlled by spark.shuffle.safetyFraction, default 0.8.

The memory outside the memoryFraction shares and their safety factors is reserved for the system.
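To make the split concrete, here is a worked example with an illustrative 10 GB executor heap (the numbers are mine, not from the source) under the default fractions:

storage:   10 GB * 0.6 * 0.9 = 5.4 GB
execution: 10 GB * 0.2 * 0.8 = 1.6 GB
reserved:  10 GB - 5.4 GB - 1.6 GB = 3.0 GB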

The memory executionMemory receives directly affects how often shuffle spills: increasing executionMemory reduces the number of spills, but the amount storageMemory can cache shrinks accordingly.

Once execution and storage are assigned their memory, the sizes never change; every allocation can only come from a side's own share with no borrowing, which wastes resources. In addition, only execution memory supports off-heap; storage memory does not.

The new scheme: UnifiedMemoryManager

Since the new scheme manages storageMemory and executionMemory jointly, let's see how much memory the two get in total.

private def getMaxMemory(conf: SparkConf): Long = {
  val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  val reservedMemory = conf.getLong("spark.testing.reservedMemory",
    if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(s"System memory $systemMemory must " +
      s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }
  // SPARK-12759 Check executor memory to fail fast if memory is insufficient
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    if (executorMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$minSystemMemory. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }
  val usableMemory = systemMemory - reservedMemory
  val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
  (usableMemory * memoryFraction).toLong
}

First, reservedMemory sets aside 300 MB for the system. If the maximum memory the JVM can obtain, or the configured executor memory, is below 1.5 × reservedMemory (i.e. 450 MB), an exception is thrown. The memory storage and execution finally get is:

(heap space - 300 MB) * spark.memory.fraction (0.6 by default)

storage and execution each start with 50% of this memory.
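A quick worked example (illustrative numbers): with a 4 GB executor heap,

usable  = 4096 MB - 300 MB = 3796 MB
unified = 3796 MB * 0.6 ≈ 2278 MB
storage = execution = 2278 MB / 2 ≈ 1139 MB initially (either side may later borrow from the other)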

Acquiring storage memory

Requesting numBytes of memory for a given blockId:

override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapMemory)
  }
  // The request exceeds the total of storage and execution memory
  if (numBytes > maxMemory) {
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    return false
  }
  // The request exceeds the free storage memory
  if (numBytes > storagePool.memoryFree) {
    // There is not enough free memory in the storage pool, so try to borrow free memory from
    // the execution pool.
    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
    executionPool.decrementPoolSize(memoryBorrowedFromExecution)
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)
  }
  storagePool.acquireMemory(blockId, numBytes)
}
  • If the requested numBytes exceeds the combined memory of the two pools, return false immediately: the request fails.
  • If numBytes exceeds storage's free memory, borrow from the executionPool:
    • The amount borrowed is the smaller of execution's current free memory and numBytes (in my view it should be the smaller of execution's free memory and (numBytes - storage's free memory))
    • Decrease execution's poolSize
    • Increase storage's poolSize

Even after borrowing from the executionPool, there may still not be enough for numBytes, since execution memory that is in use cannot be taken over. storagePool.acquireMemory is then called, which, when numBytes still cannot be met, evicts RDDs cached in storage to increase storagePool.memoryFree:

def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
  val numBytesToFree = math.max(0, numBytes - memoryFree)
  acquireMemory(blockId, numBytes, numBytesToFree)
}

This computes how much memory is still missing after borrowing from execution, i.e. the amount to free, numBytesToFree (for example, with 100 MB free and numBytes = 150 MB, numBytesToFree is 50 MB). It then calls the three-argument acquireMemory:

def acquireMemory(
    blockId: BlockId,
    numBytesToAcquire: Long,
    numBytesToFree: Long): Boolean = lock.synchronized {
  assert(numBytesToAcquire >= 0)
  assert(numBytesToFree >= 0)
  assert(memoryUsed <= poolSize)
  if (numBytesToFree > 0) {
    memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
  }
  // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
  // back into this StorageMemoryPool in order to free memory. Therefore, these variables
  // should have been updated.
  val enoughMemory = numBytesToAcquire <= memoryFree
  if (enoughMemory) {
    _memoryUsed += numBytesToAcquire
  }
  enoughMemory
}

When numBytesToFree is greater than 0, blocks cached in memory really are evicted; afterwards, if the free memory satisfies numBytes, that amount is added to the used counter.

Let's see how blocks are evicted when memory needs to be freed from storage:

private[spark] def evictBlocksToFreeSpace(
    blockId: Option[BlockId],
    space: Long,
    memoryMode: MemoryMode): Long = {
  assert(space > 0)
  memoryManager.synchronized {
    var freedMemory = 0L
    val rddToAdd = blockId.flatMap(getRddId)
    val selectedBlocks = new ArrayBuffer[BlockId]
    def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
      entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
    }
    // This is synchronized to ensure that the set of entries is not changed
    // (because of getValue or getBytes) while traversing the iterator, as that
    // can lead to exceptions.
    entries.synchronized {
      val iterator = entries.entrySet().iterator()
      while (freedMemory < space && iterator.hasNext) {
        val pair = iterator.next()
        val blockId = pair.getKey
        val entry = pair.getValue
        if (blockIsEvictable(blockId, entry)) {
          // We don't want to evict blocks which are currently being read, so we need to obtain
          // an exclusive write lock on blocks which are candidates for eviction. We perform a
          // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
          if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
            selectedBlocks += blockId
            freedMemory += pair.getValue.size
          }
        }
      }
    }
    def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
      val data = entry match {
        case DeserializedMemoryEntry(values, _, _) => Left(values)
        case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
      }
      val newEffectiveStorageLevel =
        blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
      if (newEffectiveStorageLevel.isValid) {
        // The block is still present in at least one store, so release the lock
        // but don't delete the block info
        blockInfoManager.unlock(blockId)
      } else {
        // The block isn't present in any store, so delete the block info so that the
        // block can be stored again
        blockInfoManager.removeBlock(blockId)
      }
    }
    if (freedMemory >= space) {
      logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
        s"(${Utils.bytesToString(freedMemory)} bytes)")
      for (blockId <- selectedBlocks) {
        val entry = entries.synchronized { entries.get(blockId) }
        // This should never be null as only one task should be dropping
        // blocks and removing entries. However the check is still here for
        // future safety.
        if (entry != null) {
          dropBlock(blockId, entry)
        }
      }
      logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
        s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
      freedMemory
    } else {
      blockId.foreach { id =>
        logInfo(s"Will not store $id")
      }
      selectedBlocks.foreach { id =>
        blockInfoManager.unlock(id)
      }
      0L
    }
  }
}

In Spark, in-memory blocks are all stored by the memoryStore, which uses

private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

to maintain the mapping from blockId to MemoryEntry (a wrapper around the value). The method also defines two helpers: blockIsEvictable checks whether a candidate blockId belongs to the same RDD as the block being stored, because other blocks of that same RDD must not be evicted; dropBlock performs the actual removal from memory, writing the block to a disk file if its StorageLevel includes disk.

In short, the logic is: iterate over the blocks held by the memoryStore (skipping blocks of the same RDD as the requesting block) until the selected blocks' combined size reaches the amount to free; the iteration may also end without reaching it. If the reclaimable memory meets the requirement, the eviction is actually carried out; otherwise nothing is evicted, since a block cannot live partly in memory and partly on disk. The method returns the memory actually freed by evicting blocks.

To summarize the process of requesting storage memory (in MemoryMode.ON_HEAP mode; a standalone sketch of the flow follows the list):

  • If numBytes exceeds the sum of storage and execution memory, return false.
  • If numBytes exceeds storage's free memory, borrow min(executionFree, numBytes) from execution and update both poolSizes.
  • If that is still not enough, evict blocks from storage to make up the difference.
    • If the blocks cached in the memoryStore can cover the shortfall, the eviction is actually performed (iterating over blocks until the selected blocks cover the need); otherwise nothing is evicted.
  • Finally, return true if the free memory satisfies numBytes, false otherwise.
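The following self-contained sketch (hypothetical names and numbers, not Spark's API) simulates the on-heap flow just summarized; for simplicity it treats all used storage memory as evictable:

// Hypothetical, simplified model of UnifiedMemoryManager.acquireStorageMemory.
object StorageBorrowSketch {
  var storagePoolSize = 500L    // MB
  var executionPoolSize = 500L  // MB
  var storageUsed = 450L        // MB; all evictable in this toy model
  var executionUsed = 300L      // MB

  def acquireStorage(numBytes: Long): Boolean = {
    if (numBytes > storagePoolSize + executionPoolSize) return false // can never fit
    var storageFree = storagePoolSize - storageUsed
    if (numBytes > storageFree) {
      // Borrow free execution memory and shift the pool sizes, as the real code does.
      val borrowed = math.min(executionPoolSize - executionUsed, numBytes)
      executionPoolSize -= borrowed
      storagePoolSize += borrowed
      storageFree += borrowed
    }
    // Evict cached blocks if still short (the real code walks memoryStore entries).
    val evicted = math.min(math.max(0, numBytes - storageFree), storageUsed)
    storageUsed -= evicted
    storageFree += evicted
    val enough = numBytes <= storageFree
    if (enough) storageUsed += numBytes
    enough
  }

  def main(args: Array[String]): Unit = {
    println(acquireStorage(300L)) // borrows 200 MB, evicts 50 MB -> true
  }
}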

Acquiring execution memory

When execution memory is short, it borrows from storage; if the request still cannot be fully met, it takes as much as it can get. Let's see how a request for execution memory is handled (in MemoryMode.ON_HEAP mode):

override private[memory] def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      onHeapStorageRegionSize,
      maxHeapMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      offHeapStorageMemory,
      maxOffHeapMemory)
  }

  /**
   * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
   *
   * When acquiring memory for a task, the execution pool may need to make multiple
   * attempts. Each attempt must be able to evict storage in case another task jumps in
   * and caches a large block between the attempts. This is called once per attempt.
   */
  def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
    if (extraMemoryNeeded > 0) {
      // There is not enough free memory in the execution pool, so try to reclaim memory from
      // storage. We can reclaim any free memory from the storage pool. If the storage pool
      // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
      // the memory that storage has borrowed from execution.
      val memoryReclaimableFromStorage = math.max(
        storagePool.memoryFree,
        storagePool.poolSize - storageRegionSize)
      if (memoryReclaimableFromStorage > 0) {
        // Only reclaim as much space as is necessary and available:
        val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
          math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
        storagePool.decrementPoolSize(spaceToReclaim)
        executionPool.incrementPoolSize(spaceToReclaim)
      }
    }
  }

  /**
   * The size the execution pool would have after evicting storage memory.
   *
   * The execution memory pool divides this quantity among the active tasks evenly to cap
   * the execution memory allocation for each task. It is important to keep this greater
   * than the execution pool size, which doesn't take into account potential memory that
   * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
   *
   * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
   * in execution memory allocation across tasks, Otherwise, a task may occupy more than
   * its fair share of execution memory, mistakenly thinking that other tasks can acquire
   * the portion of storage memory that cannot be evicted.
   */
  def computeMaxExecutionPoolSize(): Long = {
    maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
  }

  executionPool.acquireMemory(
    numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}

Let's first walk through the two methods defined inside:

maybeGrowExecutionPool is the method that borrows memory from storage. The maximum it can reclaim, memoryReclaimableFromStorage, is the larger of storage's free memory and the memory storage has borrowed from execution (which must be returned even if it is in use and has to be freed). If memoryReclaimableFromStorage is 0, storage has not previously borrowed from execution and currently has no free memory to lend.
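A worked example (illustrative numbers): say storageRegionSize is 500 MB, the storage pool has grown to 700 MB by borrowing from execution, and 50 MB of it is free; then

memoryReclaimableFromStorage = max(memoryFree, poolSize - storageRegionSize)
                             = max(50 MB, 700 MB - 500 MB) = 200 MB

so execution can take back up to the full 200 MB that storage borrowed.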

What is finally reclaimed is the smaller of the memory needed and memoryReclaimableFromStorage (borrow only as much as is missing). Stepping into storagePool.freeSpaceToShrinkPool:

def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
  val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
  val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
  if (remainingSpaceToFree > 0) {
    // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
    val spaceFreedByEviction =
      memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
    // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
    // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
    spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
  } else {
    spaceFreedByReleasingUnusedMemory
  }
}

If storage's free memory is not enough for the requested amount, the shortfall is made up by evicting blocks cached in storage.

computeMaxExecutionPoolSize computes the maximum memory execution could own: the total memory minus the smaller of storage's used memory and storageRegionSize.

These two functions are then passed as arguments to executionPool.acquireMemory:

private[memory] def acquireMemory(
    numBytes: Long,
    taskAttemptId: Long,
    maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
    computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
  assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

  // TODO: clean up this clunky method signature

  // Add this task to the taskMemory map just so we can keep an accurate count of the number
  // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
  if (!memoryForTask.contains(taskAttemptId)) {
    memoryForTask(taskAttemptId) = 0L
    // This will later cause waiting tasks to wake up and check numTasks again
    lock.notifyAll()
  }

  // Keep looping until we're either sure that we don't want to grant this request (because this
  // task would have more than 1 / numActiveTasks of the memory) or we have enough free
  // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
  // TODO: simplify this to limit each task to its own slot
  while (true) {
    val numActiveTasks = memoryForTask.keys.size
    val curMem = memoryForTask(taskAttemptId)

    // In every iteration of this loop, we should first try to reclaim any borrowed execution
    // space from storage. This is necessary because of the potential race condition where new
    // storage blocks may steal the free execution memory that this task was waiting for.
    maybeGrowPool(numBytes - memoryFree)

    // Maximum size the pool would have after potentially growing the pool.
    // This is used to compute the upper bound of how much memory each task can occupy. This
    // must take into account potential free memory as well as the amount this pool currently
    // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
    // we did not take into account space that could have been freed by evicting cached blocks.
    val maxPoolSize = computeMaxPoolSize()
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    val minMemoryPerTask = poolSize / (2 * numActiveTasks)

    // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
    val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
    // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
    val toGrant = math.min(maxToGrant, memoryFree)

    // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
    // if we can't give it this much now, wait for other tasks to free up memory
    // (this happens if older tasks allocated lots of memory before N grew)
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
      logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
      lock.wait()
    } else {
      memoryForTask(taskAttemptId) += toGrant
      return toGrant
    }
  }
  0L  // Never reached
}

It defines the bounds on the execution memory a single task can use:

val maxPoolSize = computeMaxPoolSize()
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)

Here maxPoolSize is the maximum size the executionMemoryPool can reach after borrowing from storage. This keeps the memory available to one task within the range 1/(2 * numActiveTasks) to 1/numActiveTasks of the pool, balancing resource usage across tasks.
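For example (illustrative numbers, and assuming the current poolSize already equals maxPoolSize): with maxPoolSize = 1000 MB and 4 active tasks,

maxMemoryPerTask = 1000 MB / 4       = 250 MB
minMemoryPerTask = 1000 MB / (2 * 4) = 125 MB

so a task is capped at 250 MB and will block rather than proceed with less than 125 MB.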

The flow of requesting execution memory:

  1. Get the memory already allocated to the current task.

  2. If numBytes exceeds execution's free memory, borrow from storage via the maybeGrowPool method.

  3. The maximum that can be granted, maxToGrant, is the smaller of numBytes and (maxMemoryPerTask - curMem).

  4. The memory actually granted this iteration, toGrant, is the smaller of maxToGrant and execution's free memory (after borrowing from storage).

  5. If the grantable memory is less than numBytes and, together with the task's existing memory, is still below the per-task minimum minMemoryPerTask, the call blocks until enough memory is free or new tasks arrive and lower minMemoryPerTask.
    Otherwise, the memory granted this round is returned directly.

That completes the discussion of requesting storage and execution memory and of how the two borrow from each other. Storage and execution memory are used in many places (see the overview): caching an RDD requests storage memory, and running a task requests execution memory. Next let's look at when each request happens.

Caching RDDs
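Caching kicks in when an RDD is persisted. A minimal usage sketch (hypothetical demo code, runnable in local mode):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CacheDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-demo").setMaster("local[*]"))
    val rdd = sc.parallelize(1 to 1000000)
    rdd.persist(StorageLevel.MEMORY_ONLY) // only marks the RDD; nothing is cached yet
    rdd.count() // first action: partitions are computed, then cached via the memoryStore
    rdd.count() // second action: blocks are read back from memory instead of recomputed
    sc.stop()
  }
}

Under the hood, every read of a partition goes through RDD.iterator: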

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

Each RDD partition's data is obtained through its iterator. When the storage level is not NONE, Spark first tries to fetch the data from a storage medium (memory, disk files, etc.); the first access finds nothing, so the partition is computed and cached so later computations can fetch it directly. Serialized and deserialized data are cached differently; for deserialized data the code path is:

memoryStore.putIteratorAsValues(blockId, iterator(), classTag)

private[storage] def putIteratorAsValues[T](
    blockId: BlockId,
    values: Iterator[T],
    classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
  require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
  // Number of elements unrolled so far
  var elementsUnrolled = 0
  // Whether there is still enough memory for us to continue unrolling this block
  var keepUnrolling = true
  // Initial per-task memory to request for unrolling blocks (bytes).
  val initialMemoryThreshold = unrollMemoryThreshold
  // How often to check whether we need to request more memory
  val memoryCheckPeriod = 16
  // Memory currently reserved by this task for this particular unrolling operation
  var memoryThreshold = initialMemoryThreshold
  // Memory to request as a multiple of current vector size
  val memoryGrowthFactor = 1.5
  // Keep track of unroll memory used by this particular block / putIterator() operation
  var unrollMemoryUsedByThisBlock = 0L
  // Underlying vector for unrolling the block
  var vector = new SizeTrackingVector[T]()(classTag)

  // Request enough memory to begin unrolling
  keepUnrolling =
    reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)

  if (!keepUnrolling) {
    logWarning(s"Failed to reserve initial memory threshold of " +
      s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
  } else {
    unrollMemoryUsedByThisBlock += initialMemoryThreshold
  }

  // Unroll this block safely, checking whether we have exceeded our threshold periodically
  while (values.hasNext && keepUnrolling) {
    vector += values.next()
    if (elementsUnrolled % memoryCheckPeriod == 0) {
      // If our vector's size has exceeded the threshold, request more memory
      val currentSize = vector.estimateSize()
      if (currentSize >= memoryThreshold) {
        val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
        keepUnrolling =
          reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
        if (keepUnrolling) {
          unrollMemoryUsedByThisBlock += amountToRequest
        }
        // New threshold is currentSize * memoryGrowthFactor
        memoryThreshold += amountToRequest
      }
    }
    elementsUnrolled += 1
  }

  if (keepUnrolling) {
    // We successfully unrolled the entirety of this block
    val arrayValues = vector.toArray
    vector = null
    val entry =
      new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
    val size = entry.size
    def transferUnrollToStorage(amount: Long): Unit = {
      // Synchronize so that transfer is atomic
      memoryManager.synchronized {
        releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
        val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
        assert(success, "transferring unroll memory to storage memory failed")
      }
    }
    // Acquire storage memory if necessary to store this block in memory.
    val enoughStorageMemory = {
      if (unrollMemoryUsedByThisBlock <= size) {
        val acquiredExtra =
          memoryManager.acquireStorageMemory(
            blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
        if (acquiredExtra) {
          transferUnrollToStorage(unrollMemoryUsedByThisBlock)
        }
        acquiredExtra
      } else { // unrollMemoryUsedByThisBlock > size
        // If this task attempt already owns more unroll memory than is necessary to store the
        // block, then release the extra memory that will not be used.
        val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
        releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
        transferUnrollToStorage(size)
        true
      }
    }
    if (enoughStorageMemory) {
      entries.synchronized {
        entries.put(blockId, entry)
      }
      logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
        blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
      Right(size)
    } else {
      assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
        "released too much unroll memory")
      Left(new PartiallyUnrolledIterator(
        this,
        MemoryMode.ON_HEAP,
        unrollMemoryUsedByThisBlock,
        unrolled = arrayValues.toIterator,
        rest = Iterator.empty))
    }
  } else {
    // We ran out of space while unrolling the values for this block
    logUnrollFailureMessage(blockId, vector.estimateSize())
    Left(new PartiallyUnrolledIterator(
      this,
      MemoryMode.ON_HEAP,
      unrollMemoryUsedByThisBlock,
      unrolled = vector.iterator,
      rest = values))
  }
}

That's a long one; it made even my head spin. No worries, let's go through it bit by bit~

The blockId parameter uniquely identifies a block, in the format "rdd_" + rddId + "_" + splitIndex (e.g. rdd_2_5 for partition 5 of RDD 2); values is the iterator over the partition's data.

  1. reserveUnrollMemoryForThisTask requests initialMemoryThreshold of storage memory (configurable via spark.storage.unrollMemoryThreshold, default 1 MB) to start unrolling the iterator:

    def reserveUnrollMemoryForThisTask(
        blockId: BlockId,
        memory: Long,
        memoryMode: MemoryMode): Boolean = {
      memoryManager.synchronized {
        val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
        if (success) {
          val taskAttemptId = currentTaskAttemptId()
          val unrollMemoryMap = memoryMode match {
            case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
            case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
          }
          unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
        }
        success
      }
    }

    Stepping into acquireUnrollMemory shows that underneath it calls acquireStorageMemory, the storage request method described earlier. On success, the task's entry in onHeapUnrollMemoryMap is increased by the granted amount, i.e. the memory used for unrolling.

  2. On success, update unrollMemoryUsedByThisBlock, the unroll memory used for this block.
  3. Then iterate; iteration stops either when the iterator is exhausted or when a memory request fails.
    • Each record is appended to vector, a SizeTrackingVector (backed by an array); every 16 records the vector's estimated size is checked against memoryThreshold (the memory reserved so far).
    • If the size exceeds memoryThreshold, the next request is computed as 1.5 × the current vector size minus the memory already reserved (see the worked example after this list).
    • Storage memory is requested again; on success unrollMemoryUsedByThisBlock is updated and the loop continues, otherwise iteration stops.
  4. When the loop ends, keepUnrolling == true means values was fully unrolled; false means it was not, i.e. not enough memory could be acquired to unroll it, so caching this partition in memory failed.
  5. If values was fully unrolled, vector is turned into a DeserializedMemoryEntry that records the data's size, and the unrolled size is compared with the memory reserved:
    • If the reserved memory is smaller than the data, the difference is requested from storage; on success the unroll memory is converted into storage memory. The conversion releases all unroll memory this task holds for the block and then requests the same amount of storage memory. Unroll memory really is storage memory, so this subtracts and adds the same value with no net change, but the flow is still necessary to keep MemoryStore decoupled from MemoryManager.
    • If the reserved memory is larger than the data, the excess unroll memory is released, and then the unroll memory actually used is converted into storage memory.
    • Finally, blockId and its entry are put into the entries map managed by the memoryStore.
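A worked example of the growth rule in step 3 (illustrative numbers): with the default initialMemoryThreshold of 1 MB, suppose a size check finds the vector has grown to 4 MB:

amountToRequest = currentSize * 1.5 - memoryThreshold = 4 MB * 1.5 - 1 MB = 5 MB
new memoryThreshold = 1 MB + 5 MB = 6 MB   (i.e. 1.5 * currentSize)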

Caching serialized RDDs supports both ON_HEAP and OFF_HEAP and works much like the deserialized case, except the data is written as a stream into a ByteBuffer: with MemoryMode.ON_HEAP the ByteBuffer is a HeapByteBuffer (on-heap memory), while with OFF_HEAP it is a DirectByteBuffer (pointing at off-heap memory). The data is finally wrapped in a SerializedMemoryEntry and kept in the memoryStore's entries.
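For reference, the two ByteBuffer flavors come straight from java.nio (a minimal illustration, not Spark code):

import java.nio.ByteBuffer

// On-heap: backed by a byte[] inside the JVM heap (a HeapByteBuffer).
val onHeap: ByteBuffer = ByteBuffer.allocate(1024)
// Off-heap: native memory outside the heap, invisible to the GC (a DirectByteBuffer).
val offHeap: ByteBuffer = ByteBuffer.allocateDirect(1024)

println(onHeap.isDirect)  // false
println(offHeap.isDirect) // true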

Execution memory usage in shuffle

During shuffle write, data is not written straight to disk (see the shuffle write analysis for details) but first into a collection, and the memory this collection occupies is execution memory. The initial grant is 5 MB, configurable via spark.shuffle.spill.initialMemoryThreshold. After each write, Spark checks whether the collection should spill to disk; before spilling it first tries to request more execution memory to avoid the spill:

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}

When the number of inserts/updates is a multiple of 32 and the collection's current size has reached the memory already granted, more execution memory is requested to avoid a spill; the request is twice the current collection size minus the memory already granted (a worked example follows below). Stepping into acquireMemory:

public long acquireMemory(long size) {
  long granted = taskMemoryManager.acquireExecutionMemory(size, this);
  used += granted;
  return granted;
}

This is exactly the execution memory request path we walked through earlier, so there is no need to go through it again.
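To make the threshold arithmetic concrete (made-up numbers): suppose myMemoryThreshold is 5 MB and a check finds the collection at 6 MB:

amountToRequest = 2 * 6 MB - 5 MB = 7 MB
granted = 7 MB: myMemoryThreshold = 12 MB, currentMemory (6 MB) < 12 MB -> no spill
granted = 0 MB: myMemoryThreshold = 5 MB,  currentMemory (6 MB) >= 5 MB -> spill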

