一、前提

每一个过程的任务数,对应一个inputSplit1, Partition输入可能以多个文件的形式存储在HDFS上,,每个File都包含了很多块,(128M切分),称为Block。

当Spark读取这些文件作为输入时,会根据具体数据格式对应的InputFormat进行解析,一般是将若干个Block合并成一个输入分片,称为InputSplit,注意InputSplit不能跨越文件。

随后将为这些输入分片生成具体的TaskInputSplit与Task是一一对应的关系。

随后这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行。

  • 每个节点可以起一个或多个Executor。
  • 每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task。
  • 每个Task执行的结果就是生成了目标RDD的一个partiton。

注意: 这里的core是虚拟的core而不是机器的物理CPU核,可以理解为就是Executor的一个工作线程。Task被执行的并发度 = Executor数目 * 每个Executor核数(=core总个数)

1、partition的数目设置:

对于数据读入阶段,例如sc.textFile,输入文件被划分为多少InputSplit就会需要多少初始Task。

Map阶段partition数目保持不变。

Reduce阶段,RDD的聚合会触发shuffle操作,聚合后的RDD的partition数目跟具体操作有关,

例如repartition操作会聚合成指定分区数,还有一些算子是可配置的。

RDD在计算的时候,每个分区都会起一个task,所以rdd的分区数目决定了总的task数目

申请的计算节点(Executor)数目和每个计算节点核数,决定了你同一时刻可以并行执行的task。

比如:

RDD有100个分区,那么计算的时候就会生成100个task,设置task间并行的参数是conf spark.sql.shuffle.partitions=100

你的资源配置为10个计算节点,(执行器excutor)  --num-executors 10    默认为2一般设置在50-100之间,每个2个核,executor-cores 2   一般 2~4 为宜。同一时刻可以并行的task数目为20,计算这个RDD就需要5个轮次。Task被执行的并发度 = Executor数目 * 每个Executor核数(=core总个数)

如果计算资源不变,你有101个task的话,就需要6个轮次,在最后一轮中,只有一个task在执行,其余核都在空转。

如果资源不变,你的RDD只有2个分区,那么同一时刻只有2个task运行,其余18个核空转,造成资源浪费。

这就是在spark调优中,增大RDD分区数目,增大任务并行度的原因

2、spark分区 什么时候增加的,增加有什么用?

接下来的描述,是针对于sparksql(也就是把数据加载成Dataset之后再处理)来说的。

1.增加分区数,可以增加并行度,当spark申请的cpu核心足够的情况下,可以同时跑不同分区的数据(因为一个分区的数据,只能由一个核心来跑,不能多个)

2.手动增加,使用repartition来将所有数据打散

3.自动增加,spark有个参数:spark.sql.shuffle.partitions,默认值为200。也就是说当触发shuffle逻辑的时候,数据会自动分为200个分区运行,但是在数据量大的情况下,每个分区的数据量太大,而且假设spark申请到了300个核心,但是因为分区数只有200,会导致只有200个核心在运行,另外100个核心在空转(虽然占用资源但是却不干活)。所以可以将该参数设置为500甚至更大,来增加分区数和并行度。

二、repartition适用场景

使用repartition 使得任务能够并行执行的话,分配的core的数量一定要略微大于最大的分区数,才能使得所有的task能够并行执行。

--conf spark.sql.shuffle.partitions=100 #默认大小为200。
        这里举个例子,比如当前RDD有100个分区,如果我分配的是50个core(spark.executor.instances * spark.executor.cores),那么同时执行的任务最多只有50。同一个操作需分成两部分串行执行,这个时候如果有一个节点有问题(并行度是50 -1*2 = 48),那么就需要再次申请资源,如果资源紧张,需要等待,如果申请的时候申请的是54个core,虽然看起来有点浪费,但是对于任务的稳定性而言,却可能会有很大的提升。当然最好的视情况是分配102-110比较合理。

1、处理能力不够

RDD单个分区数据量比较大,或者单个分区处理比较慢,都可以通过repartition进行操作,这个时候numPartitions自然是要比当前的分区数要大一些。

单个分区数据量比较大,这种case在读取和处理hdfs文件的时候并不常见,因为默认spark读取hdfs文件的分区数和hdfs文件的block的数量是一致的。这个mr的mapper数量类似。但是对于Spark Streaming 任务,如果是window窗口较长,就比较容易出现单个分区数据量较大的情况。

还有一种case,比如每个需要有外部的一些读写操作,这个时候大量数据在一个partition中,会因为外部存储、单机处理能力,网络等的性能瓶颈导致数据每个partition的数据处理变慢,举个例子,往redis里写数据,如果每个partition数据量特别大,如果使用的是redis的非批量的,基本每个数据需要一次数据请求,假设采用的是pipleline的形式,依然会因为数据量很大导致这个分区的写入很慢,从而导致任务执行时间较长。这种问题在Spark Streaming的任务中影响比较大。这个时候通过repartition增加分区数量效果也会很明显。

2、数据倾斜

数据倾斜,这个自然不不用说了,使用repartition可以将数据进行打散,避免倾斜导致的执行耗时不均匀的问题。虽然这种情况是完全可以减少任务执行的时间,但是对于数据量比价小的分区而言,很快就能处理完,此时如果executor空闲的话,其实是比较浪费资源的。所以如果是通过这个处理数据倾斜的话,建议开启动态资源分配。另外使用reapartition处理数据倾斜的时候,最好提前统计一下key的分布,这个比较常见的方法就是:
1 如果是hive的数据

select  count(1)  from table group by key

2 如果是rdd中统计

val keyCount =rdd.map((k,v) =>(k,1)).reduceByKey((a,b) =>(a+b)).sortBy(_._2,false).take(100)
print keyCount

需要进行合并场景

本身需要进行分区合并的时候使用coalesce是最合适的,原因就在于coalesce默认shuffle是false,也就是说,如果100个分区 想要变成50个分区,这个时候coalesce并不会进行shuffle操作。进行coalesce的场景,数据分区很多,但是每个分区数较少,这个时候其实并不太影响执行效率,但是对于一些需要外部链接,或者写入文件的场景来说,这是很浪费资源的。
需要使用shuffle进行分区数减少的场景,原始的rdd分区数据分散不是很均匀,比如 当前RDD是100个分区,想要合并成50个分区,但是100个分区总数据分布从10K-200M分配不均,这个时候为了方便下游数据处理,我们可以将数据进行shuffle形式的合并。

repartition是spark开发中使用非常多的一个操作,而且使用的是否合理直接影响到任务执行的快慢以及成功与否。是否需要进行repartition,以及repartition的数量是一个需要开发的同学多方面综合考虑的结果,并没有一成不变的经验。但是我这儿依然给出一个具体的思路以供参考:确定原始数据分区数量,以及分区大小,key的分布式,参考集群资源的是否充足以及资源使用分布(cpu和内存使用比例),然后综合考虑。

spark.default.parallelism    只有在处理RDD时才会起作用,对Spark SQL的无效。
spark.sql.shuffle.partitions 则是对sparks SQL专用的设置

RDD分区的一个分区原则:尽可能是得分区的个数等于集群核心数目

无论是本地模式、Standalone模式、YARN模式或Mesos模式,我们都可以通过spark.default.parallelism来配置其默认分区个数,若没有设置该值,则根据不同的集群环境确定该值

三、Spark数据分区方式

在Spark中,RDD(Resilient Distributed Dataset)是其最基本的抽象数据集,其中每个RDD是由若干个Partition组成。在Job运行期间,参与运算的Partition数据分布在多台机器的内存当中。这里可将RDD看成一个非常大的数组其中Partition是数组中的每个元素,并且这些元素分布在多台机器中。

图一中,RDD1包含了5个Partition,RDD2包含了3个Partition,这些Partition分布在4个节点中。

Spark包含两种数据分区方式:HashPartitioner(哈希分区)和RangePartitioner(范围分区)。一般而言,对于初始读入的数据是不具有任何的数据分区方式的。数据分区方式只作用于<Key,Value>形式的数据。因此,当一个Job包含Shuffle操作类型的算子时,如groupByKey,reduceByKey etc,此时就会使用数据分区方式来对数据进行分区,即确定某一个Key对应的键值对数据分配到哪一个Partition中。在Spark Shuffle阶段中,共分为Shuffle Write阶段和Shuffle Read阶段,其中在Shuffle Write阶段中,Shuffle Map Task对数据进行处理产生中间数据,然后再根据数据分区方式对中间数据进行分区。最终Shffle Read阶段中的Shuffle Read Task会拉取Shuffle Write阶段中产生的并已经分好区的中间数据。图2中描述了Shuffle阶段与Partition关系。下面则分别介绍Spark中存在的两种数据分区方式。

一、HashPartitioner(哈希分区)

HashPartitioner采用哈希的方式对<Key,Value>键值对数据进行分区。其数据分区规则为 partitionId = Key.hashCode % numPartitions,其中partitionId代表该Key对应的键值对数据应当分配到的Partition标识,Key.hashCode表示该Key的哈希值,numPartitions表示包含的Partition个数。图3简单描述了HashPartitioner的数据分区过程。

2、HashPartitioner源码详解

  • HashPartitioner源码较为简单,这里不再进行详细解释。
class HashPartitioner(partitions: Int) extends Partitioner {require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")/*** 包含的分区个数*/def numPartitions: Int = partitions/*** 获得Key对应的partitionId*/def getPartition(key: Any): Int = key match {case null => 0case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)}override def equals(other: Any): Boolean = other match {case h: HashPartitioner =>h.numPartitions == numPartitionscase _ =>false}override def hashCode: Int = numPartitions
}def nonNegativeMod(x: Int, mod: Int): Int = {val rawMod = x % modrawMod + (if (rawMod < 0) mod else 0)
}

二、RangePartitioner(范围分区)

Spark引入RangePartitioner的目的是为了解决HashPartitioner所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题。HashPartitioner采用哈希的方式将同一类型的Key分配到同一个Partition中,因此当某一或某几种类型数据量较多时,就会造成若干Partition中包含的数据过大问题,而在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢。RangePartitioner基于抽样的思想来对数据进行分区。图4简单描述了RangePartitioner的数据分区过程。

2、RangePartitioner源码详解
① 确定采样数据的规模:RangePartitioner默认对生成的子RDD中的每个Partition采集20条数据,样本数据最大为1e6条。

// 总共需要采集的样本数据个数,其中partitions代表最终子RDD中包含的Partition个数
val sampleSize = math.min(20.0 * partitions, 1e6)

② 确定父RDD中每个Partition中应当采集的数据量:这里注意的是,对父RDD中每个Partition采集的数据量会在平均值上乘以3,这里是为了后继在进行判断一个Partition是否发生了倾斜,当一个Partition包含的数据量超过了平均值的三倍,此时会认为该Partition发生了数据倾斜,会对该Partition调用sample算子进行重新采样。

// 被采样的RDD中每个partition应该被采集的数据,这里将平均采集每个partition中数据的3倍
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt

③ 调用sketch方法进行数据采样:sketch方法返回的结果为<采样RDD的数据量,<partitionId, 分区数据量,分区采样的数据量>>。在sketch方法中会使用水塘抽样算法对待采样的各个分区进行数据采样,这里采用水塘抽样算法是由于实现无法知道每个Partition中包含的数据量,而水塘抽样算法可以保证在不知道整体的数据量下仍然可以等概率地抽取出每条数据。图4简单描述了水塘抽样过程。

// 使用sketch方法进行数据抽样
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)/*** @param rdd 需要采集数据的RDD* @param sampleSizePerPartition 每个partition采集的数据量* @return <采样RDD数据总量,<partitionId, 当前分区的数据量,当前分区采集的数据量>>*/
def sketch[K : ClassTag](rdd: RDD[K],sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {val shift = rdd.idval sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>val seed = byteswap32(idx ^ (shift << 16))// 使用水塘抽样算法进行抽样,抽样结果是个二元组<Partition中抽取的样本量,Partition中包含的数据量>val (sample, n) = SamplingUtils.reservoirSampleAndCount(iter, sampleSizePerPartition, seed)Iterator((idx, n, sample))}.collect()val numItems = sketched.map(_._2).sum(numItems, sketched)
}

④ 数据抽样完成后,需要对不均衡的Partition重新进行抽样,默认当Partition中包含的数据量大于平均值的三倍时,该Partition是不均衡的。当采样完成后,利用样本容量和RDD中包含的数据总量,可以得到整体的一个数据采样率fraction。利用此采样率对不均衡的Partition调用sample算子重新进行抽样。

// 计算数据采样率
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
// 存放采样Key以及采样权重
val candidates = ArrayBuffer.empty[(K, Float)]
// 存放不均衡的Partition
val imbalancedPartitions = mutable.Set.empty[Int]
//(idx, n, sample)=> (partition id, 当前分区数据个数,当前partition的采样数据)
sketched.foreach { case (idx, n, sample) =>// 当一个分区中的数据量大于平均分区数据量的3倍时,认为该分区是倾斜的if (fraction * n > sampleSizePerPartition) {imbalancedPartitions += idx}// 在三倍之内的认为没有发生数据倾斜else {// 每条数据的采样间隔 = 1/采样率 = 1/(sample.size/n.toDouble) = n.toDouble/sample.sizeval weight = (n.toDouble / sample.length).toFloat// 对当前分区中的采样数据,对每个key形成一个二元组<key, weight>for (key <- sample) {candidates += ((key, weight))}}
}
// 对于非均衡的partition,重新采用sample算子进行抽样
if (imbalancedPartitions.nonEmpty) {val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)val seed = byteswap32(-rdd.id - 1)val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()val weight = (1.0 / fraction).toFloatcandidates ++= reSampled.map(x => (x, weight))
}

⑤ 确定各个Partition的Key范围:使用determineBounds方法来确定每个Partition中包含的Key范围,先对采样的Key进行排序,然后计算每个Partition平均包含的Key权重,然后采用平均分配原则来确定各个Partition包含的Key范围。如当前采样Key以及权重为:

<1, 0.2>, 
<2, 0.1>, 
<3, 0.1>, 
<4, 0.3>,
<5, 0.1>,
<6, 0.3>,

现在将其分配到3个Partition中,则每个Partition的平均权重为:(0.2 + 0.1 + 0.1 + 0.3 + 0.1 + 0.3) / 3 = 0.36。

此时Partition1 ~ 3分配的Key以及总权重为

<Partition1, {1, 2, 3}, 0.4> 
<Partition2, {4, 5}, 0.4> 
<Partition1, {6}, 0.3>。

/*** @param candidates 未按采样间隔排序的抽样数据* @param partitions 最终生成的RDD包含的分区个数* @return 分区边界*/
def determineBounds[K : Ordering : ClassTag](candidates: ArrayBuffer[(K, Float)],partitions: Int): Array[K] = {val ordering = implicitly[Ordering[K]]// 对样本按照key进行排序val ordered = candidates.sortBy(_._1)// 抽取的样本容量val numCandidates = ordered.size// 抽取的样本对应的采样间隔之和val sumWeights = ordered.map(_._2.toDouble).sum// 平均每个分区的步长val step = sumWeights / partitionsvar cumWeight = 0.0var target = step// 分区边界值val bounds = ArrayBuffer.empty[K]var i = 0var j = 0var previousBound = Option.empty[K]while ((i < numCandidates) && (j < partitions - 1)) {val (key, weight) = ordered(i)cumWeight += weight// 当前的采样间隔小于target,继续迭代,也即这些key应该放在同一个partition中if (cumWeight >= target) {// Skip duplicate values.if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {bounds += keytarget += stepj += 1previousBound = Some(key)}}i += 1}bounds.toArray
}

⑥ 计算每个Key所在Partition:当分区范围长度在128以内,使用顺序搜索来确定Key所在的Partition,否则使用二分查找算法来确定Key所在的Partition。

/*** 获得每个Key所在的partitionId*/
def getPartition(key: Any): Int = {val k = key.asInstanceOf[K]var partition = 0// 如果得到的范围不大于128,则进行顺序搜索if (rangeBounds.length <= 128) {// If we have less than 128 partitions naive searchwhile (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {partition += 1}}// 范围大于128,则进行二分搜索该key所在范围,即可得到该key所在的partitionIdelse {// Determine which binary search method to use only once.partition = binarySearch(rangeBounds, k)// binarySearch either returns the match location or -[insertion point]-1if (partition < 0) {partition = -partition-1}if (partition > rangeBounds.length) {partition = rangeBounds.length}}if (ascending) {partition} else {rangeBounds.length - partition}
}

四、自定义分区(定义partitioner个数)

体案例:对List里面的单词进行wordcount,并且输出按照每个单词的长度分区输出到不同文件里面

//只需要继承Partitioner,重写两个方法
class MyPartitioner(val num:Int) extends Partitioner {//这里定义partitioner个数override def numPartitions: Int = num//这里定义分区规则override def getPartition(key: Any): Int = {val len = key.toString.length //根据单词长度对分区个数取模len % num}
}

App的使用:

bject testMyPartitioner {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test").setMaster("local[*]")val sc = new SparkContext(conf)val rdd1 = sc.parallelize(List("lijie hello lisi", "zhangsan wangwu mazi", "hehe haha nihaoa heihei lure hehe hello word"))val rdd2=rdd1.flatMap(_.split(" ")).map(x=>{(x,1)})//这里指定自定义分区,然后输出val rdd3 =rdd2.sortBy(_._2).partitionBy(new MyPartitioner(4)).mapPartitions(x=>x).saveAsTextFile("file:///f:/out")println(rdd2.collect().toBuffer)sc.stop()}
}
  • 结果:

因为这里定义的是4个partition 所以最后产生4个文件

其中part-00000 和 part-00001如下:

其中part-00002 和 part-00003如下:

其中part-00000中zhangsan的长度对4取模为0和这个文件中其他较短的单词一样,所以在一个分区, part-00003没有内容,说明上面的单词的长度对4取模结果没有为3的

参考:spark的自定义partitioner_allen的博客-CSDN博客

Spark分区 partition 详解相关推荐

  1. Spark RDD 论文详解(四)表达 RDDs

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  2. Spark 内存管理详解(下):内存管理

    本文转自:Spark内存管理详解(下)--内存管理 本文最初由IBM developerWorks中国网站发表,其链接为Apache Spark内存管理详解 在这里,正文内容分为上下两篇来阐述,这是下 ...

  3. Spark RDD 论文详解(五)实现

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  4. MS SQL Server:分区表、分区索引详解

    MS SQL Server:分区表.分区索引 详解 1. 分区表简介 使用分区表的主要目的,是为了改善大型表以及具有各种访问模式的表的可伸缩性和可管理性.  大型表:数据量巨大的表.  访问模式: ...

  5. Spark RDD 论文详解(三)Spark 编程接口

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  6. Spark RDD 论文详解(一)摘要和介绍

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  7. Spark RDD 论文详解(二)RDDs

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  8. Spark RDD 论文详解(七)讨论

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  9. Linux磁盘分区论文3000字,磁盘分区对齐详解与配置 – Linux篇

    磁盘分区对齐详解与配置 – Linux篇 介绍 许多系统管理员可能不曾听过磁盘分区对齐之说,甚至一些有经验的存储管理员对分区对齐也不甚了解.磁盘分区不对齐现象是什么,为什么会造成比较严重的性能下降?相 ...

最新文章

  1. 1小时学会:最简单的iOS直播推流(一)介绍
  2. ​kdevelop用法_weixin_44594953的博客-CSDN博客_kdevelop​
  3. python获取当前目录路径和文件
  4. Axure RP pro 6.5 密钥
  5. python mk趋势检验_【C语言】MK趋势检验C语言代码
  6. iconpath 微信小程序_【报Bug】微信小程序 map 标记点iconPath图标 苹果手机 不能单个设置了。以前没有问题。现在不知道为啥不行了...
  7. 【android】进程优先级(Framework设置优先级,LowMemoryKiller查杀)
  8. 前端样板资源概览及总评
  9. JavaScript 面向对象编程(三) —— 函数进阶 / 严格模式 / 高阶函数 / 闭包 / 浅拷贝和深拷贝
  10. OpenCV-美食—鲜美滤镜
  11. gpg加密命令 linux_Ubuntu下加密命令GPG和KEY
  12. count是java关键字吗_countinue关键字和break关键字与java基本格式
  13. docker的macvlan网络
  14. 小龙秋招【面试笔记】正式发布,速来围观!(已有40+同学斩获大厂offer)
  15. FileZilla下载及基本用法
  16. 小米一直显示在android,小米三刷机失败!开机一直显示power by android
  17. 基于JPEG压缩编码的数据压缩算法的研究与实现(转)
  18. dellr740服务器智能风扇开启,Dell PowerEdge R740 机架式服务器,配置磁盘阵列的方法...
  19. 三、Python学习(五)海龟模块turtle使用案列-西瓜切图
  20. java 获取两颜色值的中间值

热门文章

  1. 用Hutool工具包制作图形验证码
  2. VisualVM安装插件报错 总结
  3. java VisualVm远程连接
  4. 在arcgis常见的叠加分析情况汇总 (转)
  5. IEEE浮点数 换算方法【超易懂详解】
  6. ubuntu 16.04 启用root用户方法
  7. YTU-CST冬训大本营
  8. 视频伪原创教程 修改短视频md5
  9. vue简单明了的中文转英文
  10. 网络编程学习,项目er图