概述

SortShuffleWriter使用ExternalSorter进行ShuffleMapTask数据内存以及落盘操作,ExternalSorter中使用内存进行数据的缓存过程中根据是否需要map-side聚合以及是否需要排序来选择不同的内存存储方式,分别为PartitionedPairBufferPartitionedAppendOnlyMap。我们先看下两种数据结构的异同点:

  1. PartitionedAppendOnlyMap中数据存储在父类AppendOnlyMap的data数组中,PartitionedPairBuffer数据就存在该类的data数组中;
  2. PartitionedAppendOnlyMap间接的继承SizeTrackerPartitionedPairBuffer是直接继承SizeTracker,用来进行要记录数据采样大小,以便上层进行适时的申请内存以及溢写磁盘操作;
  3. AppendOnlyMap会对元素在内存中进行更新或聚合,而PartitionedPairBuffer不支持map端聚合操作,只起到数据缓冲的作用;
  4. AppendOnlyMap的行为更像map,元素以散列的方式放入data数组,而PartitionedPairBuffer的行为更像collection,元素都是从data数组的起始索引0和1开始连续放入的;
  5. 两者都实现了WritablePartitionedPairCollection,可以根据partitonId排序,也可以根据partitionId+key进行排序操作返回排序后的迭代器数据。

下面我们来详细的分析两者的实现过程。

SizeTracker

Spark是一个内存计算框架,因此内存是重要的资源,Shuffle过程中大量的使用执行内存,所以精确地估计内存使用情况,意义重大。SizeTracker就是Spark中来抽样估计集合使用内存大小的特质。PartitionedAppendOnlyMapSizeTrackingAppendOnlyMap都使用它来进行内存估计,适时的为数据申请内存以及溢写到磁盘,但是它是近似估计,会有偏差,有可能会导致OOM的发生。

我们先来看下sizeTracker的属性:

// 采样增长的速率。例如,速率为2时,分别对在1,2,4,8...位置上的元素进行采样
private val SAMPLE_GROWTH_RATE = 1.1// 样本队列,最后两个样本将被用于估算大小
private val samples = new mutable.Queue[Sample]// 平均每次更新的字节数
private var bytesPerUpdate: Double = _// 更新操作(包括插入和更新)的总次数
private var numUpdates: Long = _// 下次采样时,numUpdates的值,即numUpdates的值增长到与nextSampleNum相同时,才会再次采样
private var nextSampleNum: Long = _
  1. SAMPLE_GROWTH_RATE是一个斜率,代表下次抽样时候更新的次数应该是这次抽样更新次数的1.1倍,比如上次是更新10000次时候抽样,下次抽样就得是更新11000次时候再抽样,可以避免每次更新都抽样<开销过大>,减少抽样花销,同样由于不是每次抽样所以会导致内存计算只是近似计算,有一定的概率导致OOM;
  2. samples是一个队列, 里面的类型是样例类Sample,包含了当前样本对应的内存大小以及对应的第几次更新,队列中只有两个元素,通过两个元素近似估计最终占用空间大小;
  3. bytesPerUpdate是抽样之后得到区间增长量/个数增长量,代表每次更新字节数速率,就是一个斜率;
  4. numUpdates就是代表抽样集合里面元素个数,当前总个数减去上一次采样的个数就是这段时间内采样总个数,它乘以3中的速率就是这段时间的近似内存增长量;
  5. nextSampleNum代表下次要抽样的时候集合的个数,就是此次抽样时候的个数*1.1。

那么什么时候抽样,如何抽样以及估算大小的呢?SizeTracker提供了afterUpdate在更新数据后进行更新抽样,提供了takeSample用来做具体的抽样,estimateSize来估算大小。

更新抽样

afterUpdate提供了向集合中更新了元素后的后续操作,主要是更新目前已经更新元素的个数,以及达到采样间隔时候进行采样。

// 用于向集合中更新了元素之后进行回调,以触发对集合的采样。
protected def afterUpdate(): Unit = {// 更新numUpdatesnumUpdates += 1if (nextSampleNum == numUpdates) { // 如果nextSampleNum与numUpdates相等// 调用takeSample()方法采样takeSample()}
}

采样

takeSample来实施真正的采样操作,先调用SizeEstimator的estimate()方法估算集合的大小<具体的估算方法我们后续在专门来分析,这个估算是耗时的,所以不能每添加一个就估算一次>,然后加入到sample队列里面,队列里面只保留最新的两次更新的<size, 更新次数>,然后队列翻转,通过公式

​ (倒数第1个样本记录的大小 - 倒数第2个样本记录的大小) / (倒数第1个样本的采样编号 - 倒数第2个的采样编号)

计算平均每次更新字节数速率,最后更新下一次抽象的位置。

private def takeSample(): Unit = {// 调用SizeEstimator的estimate()方法估算集合的大小,并将估算的大小和numUpdates作为样本放入队列samples中。samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))// 仅适用最后的两个样本进行推断if (samples.size > 2) { // 保留样本队列的最后两个样本// 队首出队,扔掉一个无用样本samples.dequeue()}// 将队列翻转后进行匹配val bytesDelta = samples.toList.reverse match {// 分别是 倒数第1个,倒数第2个和剩下的case latest :: previous :: tail =>// (倒数第1个样本记录的大小 - 倒数第2个样本记录的大小) / (倒数第1个样本的采样编号 - 倒数第2个的采样编号)(latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)case _ => 0}// 得到根据采样计算的每次更新字节数速率,最小为0bytesPerUpdate = math.max(0, bytesDelta)// 机选下次采样的采样号nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
}

大小估计

最后,我们来看下提供给使用方获取目前集合内存占用大小的方法,当前的numUpdates与上次采样的numUpdates之间的差值,乘以bytesPerUpdate作为估计要增加的大小,即采样的样本数目*样本的平均大小=该时间段内的大小,然后加上上次采样时集合的精确估算大小<通过SizeEstimator进行的计算>,即为最终的集合大小。

def estimateSize(): Long = {assert(samples.nonEmpty)// 使用当前的numUpdates与上次采样的numUpdates之间的差值,乘以bytesPerUpdate作为估计要增加的大小。val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)// 将上次采样时的集合大小与extrapolatedDelta相加作为估算的集合大小(samples.last.size + extrapolatedDelta).toLong
}

清空设置

当集合占用内存过大,无法在申请内存时候,需要进行spill到磁盘操作,这时候会回收内存,重新设置采样,将相关参数都清空,主要进行更新次数,下次采样次数以及采样队列。

protected def resetSamples(): Unit = {// 将numUpdates设置为1numUpdates = 1// 将nextSampleNum设置为1nextSampleNum = 1// 清空samples中的样本samples.clear()// 调用takeSample()方法采集样本takeSample()
}

WritablePartitionedPairCollection

正如其名字一样WritablePartitionedPairCollection是一个带有partitionId的key-value的集合特质,提供了高效的内存排序,并且可以按照keyComparator规定的排序方式,提供给写入磁盘 一个WritablePartitionedIterator接口,写入数据到磁盘。

抽象方法

insert方法需要具体实现方进行实现,一般是将partition-key作为key,value作为value进行插入数据,partitionedDestructiveSortedIterator则是需要给定一个key的比较器,然后返回对集合中的数据按照分区ID的顺序进行迭代的迭代器。这两个方法都交由是实现方进行实现。

// 将键值对与相关联的分区插入到集合中。
def insert(partition: Int, key: K, value: V): Unit//  根据给定的对key进行比较的比较器,返回对集合中的数据按照分区ID的顺序进行迭代的迭代器。此方法需要子类实现。
def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
: Iterator[((Int, K), V)]

默认排序器

WritablePartitionedPairCollection提供了两个默认的排序器,一个是按照patitionId进行排序,如下所示,只对分区排序,不对同一分区内的key进行排序。

// 生成对由partition id和key构成的两个对偶对象按照partition id进行排序的比较器。
def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {// 对由partition id和key构成的两个对偶对象按照partition id进行排序override def compare(a: (Int, K), b: (Int, K)): Int = {// 根据partition id进行比较a._1 - b._1}
}

另外提供了一个先按照分区id进行排序,再按照key进行排序的比较器,需要提供key的比较器keyComparator,如下所示:

// 生成对由partition id和key构成的两个对偶对象先按照partition id进行比较,再根据指定的键比较器按照key进行第二级比较的比较器。
def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {new Comparator[(Int, K)] {override def compare(a: (Int, K), b: (Int, K)): Int = {// 对partition id和key构成的两个对偶对象按照partition id比较val partitionDiff = a._1 - b._1if (partitionDiff != 0) { // 第一级比较已经区分出了胜负// 返回比较结果partitionDiff} else { // 第一级比较没有区分出胜负// 再根据指定的键比较器按照key进行第二级比较keyComparator.compare(a._2, b._2)}}}
}

排序迭代器

当内存存储不足时候,需要spill到磁盘操作,spill之前需要对内存中的数据进行排序,然后返回一个可以写入到WritablePartitionedIterator的匿名实现类的实例。

// 迭代每个元素,将每个元素通过DiskBlockObjectWriter写出到磁盘。迭代的每个元素都是按照分区ID进行排序的。这个操作可能会破坏底层集合。
def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
: WritablePartitionedIterator = {// 获得对集合中的数据按照分区ID的顺序进行迭代的迭代器,该方法由子类实现val it = partitionedDestructiveSortedIterator(keyComparator)// 创建并返回WritablePartitionedIterator的匿名实现类的实例new WritablePartitionedIterator {private[this] var cur = if (it.hasNext) it.next() else null// 使用DiskBlockObjectWriter将键值对写入磁盘def writeNext(writer: DiskBlockObjectWriter): Unit = {writer.write(cur._1._2, cur._2)cur = if (it.hasNext) it.next() else null}// 用于判断是否还有下一个元素def hasNext(): Boolean = cur != null// 用于获取下一个元素的分区的IDdef nextPartition(): Int = cur._1._1}
}

PartitionedPairBuffer

介绍完内存估计的SizeTracker和带有分区-key组合且提供排序写入的迭代器的WritablePartitionedPairCollection后,我们来看下ExternalSorter中具体的内存实现结构PartitionedPairBufferPartitionedAppendOnlyMap

PartitionedPairBuffer将键值对缓存在内存中,并支持对元素进行排序的数据结构,它继承于WritablePartitionedPairCollection,并且实现了SizeTracker接口,由于内部是由Array存储<最大长度Int.MaxValue>的,而且每个元素会占用相邻的两个位置,所以MAXIMUM_CAPACITYInt.MaxValue / 2

private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)extends WritablePartitionedPairCollection[K, V] with SizeTracker{val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1private var data = new Array[AnyRef](2 * initialCapacity)
}

PartitionedPairBuffer继承于WritablePartitionedPairCollection,实现了insertpartitionedDestructiveSortedIterator两个方法。

添加元素

由于PartitionedPairBuffer只是一个数据缓冲区,不需要对元素进行聚合操作等,所以添加元素直接将元素append到数组的back即可,不过需要先判断数据容量是否已经满了,满了则需要扩容。然后首先会将<partition, key>作为Tuple放在2*curSize位置上,然后相邻位置2*curSize+1放具体的value,添加完毕后需要进行重采样操作。

// 用于将key的分区ID、key及value添加到PartitionedPairBuffer底层的data数组中。
def insert(partition: Int, key: K, value: V): Unit = {if (curSize == capacity) { // 如果底层data数组已经满了,则对其进行扩容growArray()}// 从存放方式可以看出,它是顺序进行存放,不考虑键重复的问题// 将key及其分区ID作为元组放入data数组data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])// 将value放入data数组data(2 * curSize + 1) = value.asInstanceOf[AnyRef]// 增加已经放入data数组的key与value的数量curSize += 1// 对集合大小进行采样afterUpdate()
}

数组扩容

数组扩容时候将数组的size扩大为两倍,且不超过最大的容量,然后直接用System.arraycopy方法进行复制即可,最后由于数组容量变化还要将SizeTracker进行重置,以便后续估算更准确。

private def growArray(): Unit = {// 防止PartitionedPairBuffer的容量超过MAXIMUM_CAPACITY的限制if (capacity >= MAXIMUM_CAPACITY) {throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")}// 计算对PartitionedPairBuffer进行扩充后的容量大小val newCapacity =// 扩容的容量不能超过MAXIMUM_CAPACITYif (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // OverflowMAXIMUM_CAPACITY} else {capacity * 2}// 创建一个两倍于新容量的大小的新数组val newArray = new Array[AnyRef](2 * newCapacity)// 将底层data数组的每个元素都拷贝到新数组中System.arraycopy(data, 0, newArray, 0, 2 * capacity)// 将新数组设置为底层的data数组data = newArray// 将PartitionedPairBuffer的当前容量设置为新的容量大小capacity = newCapacity// 使用SizeTracker的resetSamples()方法对样本进行重置,以便估算准确resetSamples()
}

排序后的迭代器

根据是否执行keyComparator来选择WritablePartitionedPairCollection默认的两个排序器,如果指定了排序器,则会使用基于partitionId+key的二级比较方法,否则只按照partitionId进行排序即可,指定好排序器后使用TimSort进行排序,然后返回迭代器,迭代器只需要从数组开始往后遍历即可,每次遍历需要选取两个元素,第一个元素是(partitionId, key)的Tuple组合,第二个元素是具体的Value值。

// 根据给定的对key进行比较的比较器,返回对集合中的数据按照分区ID的顺序进行迭代的迭代器
override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
: Iterator[((Int, K), V)] = {// 生成比较器,如果没有指定,则使用partitionComparator生成比较器val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)// 执行内置排序。这其中用到了TimSort,也就是优化版的归并排序。new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)// 获得对data中的数据进行迭代的迭代器iterator
}private def iterator(): Iterator[((Int, K), V)] = new Iterator[((Int, K), V)] {var pos = 0override def hasNext: Boolean = pos < curSizeoverride def next(): ((Int, K), V) = {if (!hasNext) {throw new NoSuchElementException}// 依次返回data数组中的键值对val pair = (data(2 * pos).asInstanceOf[(Int, K)], data(2 * pos + 1).asInstanceOf[V])pos += 1pair}
}

SizeTrackingAppendOnlyMap

SizeTrackingAppendOnlyMap是继承自AppendOnlyMap并实现了SizeTracker特质,所以是一个只能增加不能删除而且可以预估Map大小的Map,实现比较简单,重写了AppendOnlyMap中的更新、聚合以及扩容的逻辑,将SizeTracker的采样以及置为默认逻辑添加到了具体的方法中,实现比较简单,源码如下:

// 更新或添加键值对
override def update(key: K, value: V): Unit = {// 使用AppendOnlyMap的update()方法更新super.update(key, value)// 调用SizeTracker的方法完成采样super.afterUpdate()
}// 聚合操作
override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {// 使用AppendOnlyMap的changeValue()方法进行聚合val newValue = super.changeValue(key, updateFunc)// 调用SizeTracker的方法完成采样super.afterUpdate()// 返回聚合得到的新的值newValue
}// 扩容操作
override protected def growTable(): Unit = {// 调用AppendOnlyMap的growTable()方法进行聚合super.growTable()// 调用SizeTracker的方法对样本进行重置,提高AppendOnlyMap的大小估算准确性resetSamples()
}

PartitionedAppendOnlyMap

PartitionedAppendOnlyMap继承了SizeTrackingAppendOnlyMap,同时实现了WritablePartitionedPairCollection特质。因此该类是一个带有采样估算的AppendOnlyMap,同时还可以存储分区ID并以分区ID进行排序。主要实现了WritablePartitionedPairCollection中未实现的方法。

添加元素

添加元素只需要把<partition, key>重新组合为key,然后交由SizeTrackingAppendOnlyMap处理即可,前面已经讲述过具体的插入逻辑,可以参考Spark Shuffle源码分析系列之AppendOnlyMap。

def insert(partition: Int, key: K, value: V): Unit = {// 使用AppendOnlyMap的update()方法进行更新update((partition, key), value)
}

排序后的迭代器

如果制定了给定的对key进行比较的比较器,则使用默认的对集合中的数据按照分区ID的顺序+key排序的比较器,否则使用只按照分区ID排序的比较器,然后交由AppendOnlyMapdestructiveSortedIterator进行排序获取相应的排序后的迭代器,具体的排序逻辑参考Spark Shuffle源码分析系列之AppendOnlyMap。

def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
: Iterator[((Int, K), V)] = {// 生成比较器val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)// 对AppendOnlyMap底层的data数组进行整理和排序后获得迭代器destructiveSortedIterator(comparator)
}

总结

至此我们分析好了ShuffleMapTask中基于内存缓存的数据结构,由于是基于内存的计算,所以要进行内存占用情况的估算防止OOM,另外根据map-side是否有聚合和排序操作选择不同的数据结构进行处理,下一节我们开始分析SortShuffleWriter的实现,看是如何进行内存和磁盘的写入和权衡的。

参考

  1. https://dataknocker.github.io/2014/07/23/spark-appendonlymap/
  2. http://reader.epubee.com/books/mobile/86/86ace180a75c902ade14ef11fccba342/text00209.html
  3. https://www.turbofei.wang/spark/2016/12/26/spark%E5%86%85%E5%AD%98%E9%A2%84%E6%B5%8B

Spark Shuffle源码分析系列之PartitionedPairBufferPartitionedAppendOnlyMap相关推荐

  1. Spark Shuffle源码分析系列之UnsafeShuffleWriter

    前面我们介绍了BypassMergeSortShuffleWriter和SortShuffleWriter,知道了它们的应用场景和实现方式,本节我们来看下UnsafeShuffleWriter,它使用 ...

  2. Spark DAGScheduler源码分析系列之一: 基础

    DAGScheduler DAGScheduler是Spark中比较重要的类,实现了面向DAG的高层次调度,DAGScheduler通过计算将DAG中的一系列RDD划分到不同的Stage,然后构建这些 ...

  3. k8s源码分析 pdf_Spark Kubernetes 的源码分析系列 - features

    1 Overview features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML ...

  4. Kylin源码分析系列三—rowKey编码

    Kylin源码分析系列三-rowKey编码 注:Kylin源码分析系列基于Kylin的2.5.0版本的源码,其他版本可以类比. 1. 相关概念 前面介绍了Kylin中Cube构建的流程,但Cube数据 ...

  5. jQuery源码分析系列

    声明:本文为原创文章,如需转载,请注明来源并保留原文链接Aaron,谢谢! 版本截止到2013.8.24 jQuery官方发布最新的的2.0.3为准 附上每一章的源码注释分析 :https://git ...

  6. MyBatis 源码分析系列文章合集

    1.简介 我从七月份开始阅读MyBatis源码,并在随后的40天内陆续更新了7篇文章.起初,我只是打算通过博客的形式进行分享.但在写作的过程中,发现要分析的代码太多,以至于文章篇幅特别大.在这7篇文章 ...

  7. MyBatis 源码分析系列文章导读

    1.本文速览 本篇文章是我为接下来的 MyBatis 源码分析系列文章写的一个导读文章.本篇文章从 MyBatis 是什么(what),为什么要使用(why),以及如何使用(how)等三个角度进行了说 ...

  8. Spring IOC 容器源码分析系列文章导读

    1. 简介 前一段时间,我学习了 Spring IOC 容器方面的源码,并写了数篇文章对此进行讲解.在写完 Spring IOC 容器源码分析系列文章中的最后一篇后,没敢懈怠,趁热打铁,花了3天时间阅 ...

  9. Spring IOC 容器源码分析系列文章导读 1

    1. 简介 Spring 是一个轻量级的企业级应用开发框架,于 2004 年由 Rod Johnson 发布了 1.0 版本.经过十几年的迭代,现在的 Spring 框架已经非常成熟了.Spring ...

最新文章

  1. Oracle字符集问题总结
  2. php使用5.2.,请问php5.2.5版本的$_FILES函数的用法?
  3. java 鼠标精灵_纯Java实现跨平台鼠标键盘模拟、找图找色,Java版按键精灵
  4. 批量创建Linux用户账号
  5. npm audit fix
  6. 软件测试从业 3 年+了,怎么兼顾 管理 与 自身成长?
  7. eclipse测试java程序_java-同一项目中的Eclipse junit测试
  8. 「三分钟系列07」3分钟看懂哈夫曼树与哈夫曼编码
  9. Linux进程间通信源码剖析,共享内存(shmget函数详解)
  10. Android微信/QQ红包自动抢(AccessibilityService)
  11. ios 滤镜处理(详细滤镜介绍)及处理方法
  12. POI生成Excel
  13. 概率论与统计推断(四) ------ 统计推断
  14. 【bug】Failed at the node-sass@4.14.1 postinstall script(终于圆满解决)
  15. Android:相对布局RelativeLayout常用属性
  16. “Shopee杯” 武汉大学(网络预选赛)A - A Monument For Heroes
  17. 鹏宇成Revit族库管理器RevitFamily2013
  18. 记一次国产压铸模拟软件的使用过程
  19. 华为无线学习笔记--WLAN基础调优
  20. gstreamer学习笔记---demux使用

热门文章

  1. axis1.4 java_Axis 1.4 使用指南
  2. Identity and Access Management - 介绍
  3. PC企业微信HOOK最新版
  4. 井通科技欧洲研发中心在柏林正式成立
  5. 如何借助 AI ,生成专属图标? #iconify AI
  6. 南大通用GBase8s 常用SQL语句(234)
  7. 【Electron】vue+electron代码签名(mac篇)
  8. MySQL字段类型汇总及用法(超详细)
  9. 造梦西游html5,造梦西游OL(新服开启)
  10. 民工网www.iamworker.com技术专题