Spark- 之不同Source产生RDD的分区数与数据分配

通常Spark的数据源可以分为很多中,这里主要是从源码剖析内存集合文件分区数的确定与数据分配。

1 集合RDD的分区与数据分配

具体看以下代码及注释。

package com.shufang.parallel_yuanliimport com.shufang.utils.ScUtil
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD/***/
object RddFromMemoryCollection {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[8]").setAppName("parellel").set("spark.default.parallelism", "5")val sc: SparkContext = new SparkContext(conf)/*** TODO makeRDD() 底层调用的就是 parallelize()* TODO 1 :如何确定分区的数量* 通常numSlices代表RDD的分区的个数,那么这个分区的个数呢可以手动指定,也可以使用默认值* 当手动指定时,RDD的分区个数:numSlices* 如果使用默认值,RDD的分区个数:numSlices => numSlices = defaultParallelism = defaultParallelism()* def defaultParallelism: Int = {* assertNotStopped()* taskScheduler.defaultParallelism* }** taskScheduler.defaultParallelism =* override def defaultParallelism(): Int =* backend.defaultParallelism()** override def defaultParallelism(): Int =* scheduler.conf.getInt("spark.default.parallelism", totalCores)* totalCores是当前环境master能够使用的最大核数,比如totalCores = local[*]或者* set spark.executor.cores = 5;* set executor.nums = 3;* =>>>>>> totalCores = 15* ==================================================================================* TODO 2 :如何给数据找准对应的分区进行分配* def parallelize[T: ClassTag](* seq: Seq[T],* numSlices: Int = defaultParallelism): RDD[T] = withScope {* assertNotStopped()* TODO 2.1 这是主要的数据分区分配的入口* new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())* }* TODO 2.2 找到每个分区的元素的下标范围:[start,end)* def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {* (0 until numSlices).iterator.map { i =>* val start = ((i * length) / numSlices).toInt* val end = (((i + 1) * length) / numSlices).toInt* (start, end)* }* }* TODO 2.3 seq是传入的集合,然后将范围内的元素进行切分给不同的分区* case _ =>* val array = seq.toArray // To prevent O(n^2) operations for List etc* positions(array.length, numSlices).map { case (start, end) =>* array.slice(start, end).toSeq* }.toSeq* TODO 当前有 5 个元素,分区数为2* 分区0  [0*5/2,1*5/2) => [0,2)  => (1,2)* 分区1  [1*5/2,2*5/2) => [2,5)  => (3,4,5)* 所以最终输出产生2个文件:* part-00000* 1* 2* part-00001* 3* 4* 5**/val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 2)rdd.saveAsTextFile("output")sc.stop()}
}

2 文件RDD的分区与数据分配

package com.shufang.parallel_yuanliimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddFromFile {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[8]").setAppName("parellel").set("spark.default.parallelism", "5")val sc: SparkContext = new SparkContext(conf)/*** TODO 1 分区数确定:文件的分区数与minPartition这个参数有关,通常可以通过textFile指定最小分区数,但这个并不是最终分区数量!!* TODO 1.1 如果没有指定,那么minPartitions = defaultMinPartitions = math.min(defaultParallelism,2)* def textFile(* path: String,* minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {* assertNotStopped()* hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],* minPartitions).map(pair => pair._2.toString).setName(path)* }* TODO 1.2 很显然 Spark读取数据按照Hadoop TextInputFormat的方式进行读取,所以是按照行读取,分区计算方式如下:* extends FileInputFormat<LongWritable, Text>,进入FileInputFormat的 getSplits()获取切片的方法*  - totalSize : 文件的总字节大小  <= totalSize = file.getLength();*  - goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits); numSplits就是minPartitions* TODO 1.3 goalSize就是最终的分区存储的字节数量(如果能整除),假如 totalSize = 7 ,minPartitions(numSplits) = 2* => goalSize = 7 / 2 =  3 ... 1 ,现在每个分区的字节数为3,余数为1,按照Hadoop的分区规则,1+3/3 > 1.1* => 最终RDD的分区个数为:2 + 1 = 3** TODO 1.4 files/wordcount.txt为75个字节,包括换行符等,此时使用默认的defaultMinPartitions = 2* => totalSize = 75* => goalSize = 75/2 = 37...1  1/37 < 0.1 所以最终的分区个数为2* =======================================================================================================* TODO 2 数据的分区分配* TODO 2.1 数据按照行读取,偏移量不会重复读取* TODO 2.2 数据分配按照偏移量分配* 分区  偏移量    最终读取的行号,第一行22字节,第二行22字节,第三行31字节* 分区0 [0,37]   N1、N2* 分区1 [38,75]  N3* TODO 所以最终的数据第1,2行被分配到第一个分区文件* part-00000* spark   spark   hello* flink   world   hello* TODO 第三行被分配到第二个分区文件* part-00001* good man    good    good good**/val rdd: RDD[String] = sc.textFile("files/wordcount.txt")rdd.saveAsTextFile("output")sc.stop()}
}

Spark- 之不同Source产生RDD的分区数与数据分配相关推荐

  1. 学习笔记Spark(四)—— Spark编程基础(创建RDD、RDD算子、文件读取与存储)

    文章目录 一.创建RDD 1.1.启动Spark shell 1.2.创建RDD 1.2.1.从集合中创建RDD 1.2.2.从外部存储中创建RDD 任务1: 二.RDD算子 2.1.map与flat ...

  2. python spark进行大数据分析_第2天Python实战Spark大数据分析及调度-RDD编程

    Spark提供的主要抽象是resilient distributed dataset(RDD)弹性分布式数据集,它是跨集群节点划分的元素的集合,可以并行操作.通过从Hadoop文件系统(或任何其他Ha ...

  3. [Spark]PySpark入门学习教程---RDD介绍(2)

    一 RDD pyspark.RDD        SparkRDD RDD指的是弹性分布式数据集(Resilient Distributed Dataset),它是spark计算的核心.尽管现在都使用 ...

  4. Spark核心编程系列(一)——RDD详解

    目录 Spark核心编程系列--RDD详解(一) RDD概念 RDD与IO之间的关系 RDD的核心属性 RDD执行原理 基础编程 RDD创建 RDD的并行度与分区 参考 Spark核心编程系列--RD ...

  5. Spark论文思想之-基于RDD构建的模型(Shark的来龙去脉)

    3.1 介绍 首先RDD提供以下功能: 跨集群的不可变存储(在Spark中,记录是指Java Object) 使用键对数据进行分区控制 考虑分区的粗粒度运算符 由于是内存计算,所以低延迟 3.2 在R ...

  6. 【大数据开发】SparkCore——Spark作业执行流程、RDD编程的两种方式、简单算子

    文章目录 一.Spark作业执行流程(重点) 二.RDD编程 2.1创建RDD的⼆种⽅式: 2.2Transformation算⼦ 2.3Action算子 三.简单算子(必须掌握) 3.1 map.m ...

  7. Hive数据分析——Spark是一种基于rdd(弹性数据集)的内存分布式并行处理框架,比于Hadoop将大量的中间结果写入HDFS,Spark避免了中间结果的持久化...

    转自:http://blog.csdn.net/wh_springer/article/details/51842496 近十年来,随着Hadoop生态系统的不断完善,Hadoop早已成为大数据事实上 ...

  8. spark将rdd转为string_大数据技术之SparkCore(三)RDD依赖关系

    2.6.1 Lineage RDD只支持粗粒度转换,即在大量记录上执行的单个操作.将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区.RDD的Lineage会记录RDD的元数据信息 ...

  9. dataframe记录数_大数据系列之Spark SQL、DataFrame和RDD数据统计与可视化

    Spark大数据分析中涉及到RDD.Data Frame和SparkSQL的操作,本文简要介绍三种方式在数据统计中的算子使用. 1.在IPython Notebook运行Python Spark程序 ...

最新文章

  1. python迭代器生成器 学会再缩短一半开发效率 看看大牛是怎么写的
  2. 车辆动力学及控制pdf_Simulink软件仿真平台之车辆模型
  3. ios-UIButton-常用方法
  4. linux服务器防病毒,Linux系统中你不需要防病毒?_服务器评论-中关村在线
  5. VC2010“添加资源-引入JPG图片”会改变图片大小
  6. java 线程 释放_java线程似乎不会被释放
  7. WebRTC架构和协议栈-zz
  8. jms是java平台中面向_面向Web的JMS应用系统
  9. matlab时变函数,MATLAB在《复变函数》教学中的应用(图文)
  10. 幼儿园计算机课程心得,幼儿主题式课程教学心得体会
  11. html表格宽度设置没效果,html表格宽度设置失效
  12. 你离大牛就差这10家国内知名的慕课网站。
  13. Android Canvas rotate 和translate 两个方法的研究
  14. 杨卫华:新浪微博的架构发展历程(转)
  15. 树莓派WIFI配置遇到的坑 之 连接不上WIFI
  16. java英语介绍_java自我介绍英语口语
  17. android百度人脸采集免费,Android 调用百度人脸采集
  18. 将idea设置为中文
  19. 应试教育的困惑,如今得到了解答。
  20. 东社村计算机学校,苍南县义务教育阶段部分学校施教区范围

热门文章

  1. 二进制、八进制、十进制、十六进制之间的转换(图文解释)
  2. 【CentOS】CentOS安装yum
  3. 多个pdf怎么合并在一起?
  4. PPa-GO/NPs/PEG/DSPE焦脱镁叶绿酸-a修饰氧化石墨烯/纳米粒子/聚乙二醇/磷脂/细胞膜合成
  5. 如何在 macOS 12 Monterey 中使用快速笔记
  6. 移动端有哪些常见布局方式?
  7. Java 懒汉式代码
  8. 湖南本地购物网站——路在何方
  9. 。。。。AC米兰夺冠
  10. 平价降噪耳机有哪些?学生党降噪耳机推荐