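Spark SQL Source Code Analysis: The Detailed Implementation of Physical Plan to RDD
/** Spark SQL source code analysis series */
This article follows the previous one, "Spark SQL Catalyst Source Code Analysis: Physical Plan", and walks through how a Physical Plan's toRdd is implemented:
As we know, a SQL query is only really executed when you call collect() on it; that is when the Spark job runs and the resulting RDD is actually computed.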
lazy val toRdd: RDD[Row] = executedPlan.execute()
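The deferred evaluation behind `lazy val toRdd` can be illustrated with a small, Spark-free sketch. `LazyToRddSketch`, `QueryExecutionSketch` and `executeCalls` are hypothetical names used only for illustration; the sketch shows the `lazy val` mechanism, not Spark's actual classes.

```scala
// Minimal sketch, not Spark code: a lazy val defers the "execute" step
// until the value is first read, and caches the result afterwards.
object LazyToRddSketch {
  final class QueryExecutionSketch(rows: Seq[Int]) {
    var executeCalls = 0 // counts how many times the plan was "executed"

    private def execute(): Seq[Int] = {
      executeCalls += 1
      rows.map(_ * 2)
    }

    // Analogous in spirit to `lazy val toRdd = executedPlan.execute()`.
    lazy val toRdd: Seq[Int] = execute()
  }
}
```

In Spark itself the RDD returned by execute() is also lazy, so the job still only runs once an action such as collect() is called on it.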
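SparkPlan operators fall into roughly four groups: the basic operators (BasicOperator), plus the somewhat more complex Join, Aggregate and Sort operators.
As shown in the figure: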
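1. BasicOperator
1.1 Project
Project's execute() calls mapPartitions on the child RDD. The function passed to it creates one MutableProjection per partition and then applies it to every row of that partition.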
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)

  override def execute() = child.execute().mapPartitions { iter => // apply the function to each partition
    @transient val reusableProjection = new MutableProjection(projectList)
    iter.map(reusableProjection)
  }
}
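Looking at the definition of MutableProjection, it does two things: it binds references to a schema, and it evaluates the expressions:
If the input Row comes with a schema, the Seq[Expression] passed in is bound to that schema as well.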
case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
    this(expressions.map(BindReferences.bindReference(_, inputSchema))) // bind the expressions to the schema

  private[this] val exprArray = expressions.toArray
  private[this] val mutableRow = new GenericMutableRow(exprArray.size) // the output Row, allocated once and reused
  def currentValue: Row = mutableRow

  def apply(input: Row): Row = {
    var i = 0
    while (i < exprArray.length) {
      mutableRow(i) = exprArray(i).eval(input) // evaluate each expression against the input Row
      i += 1
    }
    mutableRow // return the (reused) output Row
  }
}
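The key point of MutableProjection is that a single output row is allocated once and overwritten on every call, which is why operators in this article call copy() before buffering rows. Here is a minimal, Spark-free sketch of that idea. It assumes rows are plain `Array[Any]` and expressions are plain functions; `MutableProjectionSketch` and `project` are hypothetical names, not Catalyst APIs.

```scala
// Minimal sketch, not the Catalyst classes: rows are plain Array[Any] and
// "expressions" are plain functions from an input row to a value.
object MutableProjectionSketch {
  type Row = Array[Any]

  final class MutableProjection(exprs: Seq[Row => Any]) extends (Row => Row) {
    private val exprArray = exprs.toArray
    private val mutableRow: Row = new Array[Any](exprArray.length) // allocated once

    def apply(input: Row): Row = {
      var i = 0
      while (i < exprArray.length) {
        mutableRow(i) = exprArray(i)(input) // overwrite slot i in place
        i += 1
      }
      mutableRow // the same array instance on every call
    }
  }

  // Roughly "SELECT col0 + 1, col1" over one partition of rows.
  def project(partition: Iterator[Row]): List[List[Any]] = {
    val projection = new MutableProjection(Seq(
      (row: Row) => row(0).asInstanceOf[Int] + 1,
      (row: Row) => row(1)))
    // Copy each result (toList) before buffering, because the output row is reused.
    partition.map(projection).map(_.toList).toList
  }
}
```

Without the copy step, every buffered element would point at the same array and show only the last row's values.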
1.2 Filter
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output

  override def execute() = child.execute().mapPartitions { iter =>
    iter.filter(condition.eval(_).asInstanceOf[Boolean]) // evaluate the predicate: eval(input row)
  }
}
1.3 Sample
case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)
  extends UnaryNode {
  override def output = child.output

  // TODO: How to pick seed?
  override def execute() = child.execute().sample(withReplacement, fraction, seed)
}
1.4 Union
case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan {
  // TODO: attributes output by union should be distinct for nullability purposes
  override def output = children.head.output
  override def execute() = sqlContext.sparkContext.union(children.map(_.execute())) // union the RDDs produced by the child plans

  override def otherCopyArgs = sqlContext :: Nil
}
1.5 Limit
case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext)
  extends UnaryNode {
  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
  // partition local limit -> exchange into one partition -> partition local limit again

  override def otherCopyArgs = sqlContext :: Nil

  override def output = child.output

  override def executeCollect() = child.execute().map(_.copy()).take(limit) // call take directly from the driver

  override def execute() = {
    val rdd = child.execute().mapPartitions { iter =>
      val mutablePair = new MutablePair[Boolean, Row]()
      iter.take(limit).map(row => mutablePair.update(false, row)) // apply the limit within each partition first
    }
    val part = new HashPartitioner(1)
    val shuffled = new ShuffledRDD[Boolean, Row, Row, MutablePair[Boolean, Row]](rdd, part) // shuffle everything into a single partition
    shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
    shuffled.mapPartitions(_.take(limit).map(_._2)) // finally apply the limit once more on that single partition
  }
}
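The execute() method of Limit follows a "local limit, shuffle into one partition, limit again" pattern. A minimal sketch of that pattern with plain collections follows. It assumes a dataset is modelled as a sequence of partitions; `TwoPhaseLimitSketch` is a hypothetical name, not part of Spark.

```scala
// Minimal sketch, not Spark code: a "distributed dataset" is modelled as a
// sequence of partitions, each partition being a sequence of rows.
object TwoPhaseLimitSketch {
  def limit[T](partitions: Seq[Seq[T]], n: Int): Seq[T] = {
    // Phase 1: each partition keeps at most n rows (partition-local limit).
    val localLimited = partitions.map(_.take(n))
    // "Shuffle": move all surviving rows into a single partition.
    val single = localLimited.flatten
    // Phase 2: apply the limit once more on that single partition.
    single.take(n)
  }
}
```

The first phase bounds the amount of data moved by the shuffle to at most n rows per partition.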
1.6 TakeOrdered
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
    (@transient sqlContext: SQLContext) extends UnaryNode {
  override def otherCopyArgs = sqlContext :: Nil

  override def output = child.output

  @transient
  lazy val ordering = new RowOrdering(sortOrder) // the ordering is implemented by RowOrdering

  override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ordering)

  // TODO: Terminal split should be implemented differently from non-terminal split.
  // TODO: Pick num splits based on |limit|.
  override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1)
}
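TakeOrdered delegates to the RDD's takeOrdered(limit)(ordering). The general idea of a top-n over partitioned data can be sketched as "top n per partition, then top n of the merged candidates". This is a simplified illustration under that assumption, using full sorts instead of bounded queues; `TakeOrderedSketch` is a hypothetical name.

```scala
// Minimal sketch, not Spark code: top-n over a sequence of partitions.
object TakeOrderedSketch {
  def takeOrdered[T](partitions: Seq[Seq[T]], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
    // Each partition contributes only its own n smallest elements.
    val perPartitionTopN = partitions.map(_.sorted(ord).take(n))
    // Merge the candidates and keep the n smallest overall.
    perPartitionTopN.flatten.sorted(ord).take(n)
  }
}
```

Passing a reversed ordering gives the n largest elements instead, which is how a descending sort order would be handled in this sketch.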
1.7 Sort
case class Sort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  @transient
  lazy val ordering = new RowOrdering(sortOrder) // the sort order

  override def execute() = attachTree(this, "sort") {
    // TODO: Optimize sorting operation?
    // Each partition is sorted by calling sorted with the ordering.
    child.execute()
      .mapPartitions(
        iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator,
        preservesPartitioning = true)
  }

  override def output = child.output
}
1.8 ExistingRdd
object ExistingRdd {
  def convertToCatalyst(a: Any): Any = a match {
    case o: Option[_] => o.orNull
    case s: Seq[Any] => s.map(convertToCatalyst)
    case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray)
    case other => other
  }

  def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
    data.mapPartitions { iterator =>
      if (iterator.isEmpty) {
        Iterator.empty
      } else {
        val bufferedIterator = iterator.buffered
        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)

        bufferedIterator.map { r =>
          var i = 0
          while (i < mutableRow.length) {
            mutableRow(i) = convertToCatalyst(r.productElement(i))
            i += 1
          }

          mutableRow
        }
      }
    }
  }

  def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
    ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
  }
}
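The pattern match in convertToCatalyst relies on case order: Option and Seq are matched before Product, because Some, None and non-empty Lists are themselves Products. A minimal, Spark-free sketch follows, assuming a `Vector[Any]` stands in for GenericRow; `ConvertToCatalystSketch`, `convert` and `Person` are hypothetical names.

```scala
// Minimal sketch, not Spark code: Vector[Any] stands in for GenericRow.
object ConvertToCatalystSketch {
  final case class Person(name: String, age: Option[Int], tags: Seq[String])

  def convert(a: Any): Any = a match {
    case Some(v) => v                 // Option is unwrapped ...
    case None => null                 // ... or becomes null
    case s: Seq[_] => s.map(convert)  // must come before Product: a List cell is a Product too
    case p: Product => p.productIterator.map(convert).toVector
    case other => other
  }
}
```

For example, converting `Person("Ann", None, Seq("x", "y"))` yields a three-element vector whose second element is null.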
2. Join Related Operators
HashJoin:
trait HashJoin {
  val leftKeys: Seq[Expression]
  val rightKeys: Seq[Expression]
  val buildSide: BuildSide
  val left: SparkPlan
  val right: SparkPlan

  // Pattern match on buildSide to pair up the physical plans as a Tuple2:
  // (left, right) for BuildLeft, otherwise (right, left).
  lazy val (buildPlan, streamedPlan) = buildSide match {
    case BuildLeft => (left, right)
    case BuildRight => (right, left)
  }

  // Pair up the key expressions as a Tuple2 in the same way.
  lazy val (buildKeys, streamedKeys) = buildSide match {
    case BuildLeft => (leftKeys, rightKeys)
    case BuildRight => (rightKeys, leftKeys)
  }

  def output = left.output ++ right.output

  // Projection that evaluates the key expressions against a build-side Row.
  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
  // Factory for a MutableProjection that evaluates the key expressions against a streamed Row.
  @transient lazy val streamSideKeyGenerator =
    () => new MutableProjection(streamedKeys, streamedPlan.output)

  // Joins the build-side Iterator[Row] with the streamed Iterator[Row] and returns the joined Iterator[Row].
  def joinIterators(buildIter: Iterator[Row], streamIter: Iterator[Row]): Iterator[Row] = {
    // TODO: Use Spark's HashMap implementation.
    val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]() // matching is done with a HashMap
    var currentRow: Row = null

    // Create a mapping of buildKeys -> rows
    // Only the build iterator is consumed here, producing (rowKey, rows) entries. This resembles
    // word count, except that rows are collected per key instead of values being summed.
    while (buildIter.hasNext) {
      currentRow = buildIter.next()
      val rowKey = buildSideKeyGenerator(currentRow) // compute rowKey, used as the HashMap key
      if (!rowKey.anyNull) {
        val existingMatchList = hashTable.get(rowKey)
        val matchList = if (existingMatchList == null) {
          val newMatchList = new ArrayBuffer[Row]()
          hashTable.put(rowKey, newMatchList) // (rowKey, matchedRowList)
          newMatchList
        } else {
          existingMatchList
        }
        matchList += currentRow.copy() // append a copy of the row to the match list
      }
    }

    // Finally, probe the build-side HashMap with the key of each streamed row.
    new Iterator[Row] {
      private[this] var currentStreamedRow: Row = _
      private[this] var currentHashMatches: ArrayBuffer[Row] = _
      private[this] var currentMatchPosition: Int = -1

      // Mutable per row objects.
      private[this] val joinRow = new JoinedRow

      private[this] val joinKeys = streamSideKeyGenerator()

      override final def hasNext: Boolean =
        (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) ||
          (streamIter.hasNext && fetchNext())

      override final def next() = {
        val ret = buildSide match {
          // The right table is the build side: streamed row on the left, matched build row on the right.
          case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition))
          // The left table is the build side: the other way round.
          case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow)
        }
        currentMatchPosition += 1
        ret
      }

      /**
       * Searches the streamed iterator for the next row that has at least one match in hashtable.
       *
       * @return true if the search is successful, and false if the streamed iterator runs out of
       *         tuples.
       */
      private final def fetchNext(): Boolean = {
        currentHashMatches = null
        currentMatchPosition = -1

        while (currentHashMatches == null && streamIter.hasNext) {
          currentStreamedRow = streamIter.next()
          if (!joinKeys(currentStreamedRow).anyNull) {
            // Look up the streamed row's key in the build-side hash table.
            currentHashMatches = hashTable.get(joinKeys.currentValue)
          }
        }

        if (currentHashMatches == null) {
          false
        } else {
          currentMatchPosition = 0
          true
        }
      }
    }
  }
}
class JoinedRow extends Row {
  private[this] var row1: Row = _
  private[this] var row2: Row = _
  .........
  def copy() = {
    val totalSize = row1.size + row2.size
    val copiedValues = new Array[Any](totalSize)
    var i = 0
    while (i < totalSize) {
      copiedValues(i) = apply(i)
      i += 1
    }
    new GenericRow(copiedValues) // return a new Row holding the merged values of both rows
  }
}
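The build-then-probe structure of joinIterators can be sketched without Spark. This is a simplified illustration, assuming rows are arbitrary values and a key of `None` stands in for a NULL join key; `HashJoinSketch` and its parameter names are hypothetical, and the probe side uses flatMap instead of the hand-written Iterator in the source.

```scala
import scala.collection.mutable.ArrayBuffer

// Minimal sketch, not Spark code: a hash join over two iterators.
object HashJoinSketch {
  // A key of None plays the role of a NULL join key and never matches.
  def joinIterators[B, S, K](
      buildIter: Iterator[B],
      streamIter: Iterator[S])(
      buildKey: B => Option[K],
      streamKey: S => Option[K]): Iterator[(S, B)] = {
    // Build phase: consume the whole build side into key -> rows.
    val hashTable = new java.util.HashMap[K, ArrayBuffer[B]]()
    buildIter.foreach { row =>
      buildKey(row).foreach { key =>
        var matches = hashTable.get(key)
        if (matches == null) {
          matches = new ArrayBuffer[B]()
          hashTable.put(key, matches)
        }
        matches += row
      }
    }
    // Probe phase: lazily look up each streamed row's key in the hash table.
    streamIter.flatMap { s =>
      streamKey(s) match {
        case Some(key) =>
          val matches = hashTable.get(key)
          if (matches == null) Iterator.empty else matches.iterator.map(b => (s, b))
        case None => Iterator.empty
      }
    }
  }
}
```

Only the build side is materialized in memory; the streamed side is consumed one row at a time, which is why the smaller input is normally chosen as the build side.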
2.1 LeftSemiJoinHash
case class LeftSemiJoinHash(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  val buildSide = BuildRight // the right table is the build side

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  override def output = left.output

  def execute() = {
    // Execute the right (build) plan to get its RDD, then use zipPartitions to pair its
    // partitions with those of the streamed plan. The matching follows the approach above.
    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
      val hashSet = new java.util.HashSet[Row]()
      var currentRow: Row = null

      // Create a Hash set of buildKeys
      while (buildIter.hasNext) {
        currentRow = buildIter.next()
        val rowKey = buildSideKeyGenerator(currentRow)
        if (!rowKey.anyNull) {
          val keyExists = hashSet.contains(rowKey)
          if (!keyExists) {
            hashSet.add(rowKey)
          }
        }
      }

      val joinKeys = streamSideKeyGenerator()
      streamIter.filter(current => {
        !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
      })
    }
  }
}
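A left semi join only needs to know whether a key exists on the right side, so a hash set of keys is enough and each left row is emitted at most once. A minimal sketch under the same assumptions as before (`None` stands in for a NULL key; `LeftSemiJoinSketch` is a hypothetical name):

```scala
// Minimal sketch, not Spark code: left semi join with a hash set of right-side keys.
object LeftSemiJoinSketch {
  def leftSemiJoin[L, R, K](
      left: Iterator[L],
      right: Iterator[R])(
      leftKey: L => Option[K],
      rightKey: R => Option[K]): Iterator[L] = {
    // Build phase: remember every non-null key seen on the right side.
    val keys = new java.util.HashSet[K]()
    right.foreach(r => rightKey(r).foreach(k => keys.add(k)))
    // Probe phase: keep a left row only if its key exists in the set.
    left.filter(l => leftKey(l).exists(k => keys.contains(k)))
  }
}
```

Duplicate keys on the right side do not duplicate left rows, which is the difference from the inner hash join shown earlier.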
2.2 BroadcastHashJoin
case class BroadcastHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan)(@transient sqlContext: SQLContext) extends BinaryNode with HashJoin {

  override def otherCopyArgs = sqlContext :: Nil

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

  @transient
  lazy val broadcastFuture = future { // collect the build-side table and broadcast it through SparkContext
    sqlContext.sparkContext.broadcast(buildPlan.executeCollect())
  }

  def execute() = {
    val broadcastRelation = Await.result(broadcastFuture, 5.minute)

    streamedPlan.execute().mapPartitions { streamedIter =>
      joinIterators(broadcastRelation.value.iterator, streamedIter) // call joinIterators on each partition
    }
  }
}
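The control flow of BroadcastHashJoin (start collecting the build side asynchronously, wait for it, then join every streamed partition against the same full copy) can be sketched with `scala.concurrent`. This is an illustration only: the "broadcast" is just a local value, keys are assumed to be `Int`, and `BroadcastJoinSketch` is a hypothetical name.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Minimal sketch, not Spark code: the small side is shared with every partition.
object BroadcastJoinSketch {
  def broadcastJoin[A, B](
      small: Seq[(Int, A)],
      largePartitions: Seq[Seq[(Int, B)]]): Seq[Seq[(Int, B, A)]] = {
    // Start "collecting and broadcasting" the build side asynchronously.
    val broadcastFuture: Future[Seq[(Int, A)]] = Future { small }
    // Block until the value is available (the source waits up to 5 minutes).
    val broadcastRelation = Await.result(broadcastFuture, 5.minutes)
    // Every partition of the streamed side joins against the same full copy.
    largePartitions.map { partition =>
      val table = broadcastRelation.groupBy(_._1) // hash table built per partition
      partition.flatMap { case (k, b) =>
        table.getOrElse(k, Seq.empty).map { case (_, a) => (k, b, a) }
      }
    }
  }
}
```

No shuffle of the large side is needed in this scheme, which matches the UnspecifiedDistribution requirement in the source.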
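2.3 ShuffledHashJoin
ShuffledHashJoin requires both children to be clustered by their join keys (ClusteredDistribution). It then calls zipPartitions on the build-side RDD to pair each of its partitions with the corresponding partition of the streamed side, and runs joinIterators on every pair.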
case class ShuffledHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    }
  }
}
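Zipping partitions only produces a correct join when both sides were partitioned by the join key with the same scheme, so that matching keys always land in the same partition index. A minimal sketch of that idea with plain collections follows, assuming `Int` keys and a hash-modulo partitioner; `ShuffledJoinSketch` and `partitionByKey` are hypothetical names.

```scala
// Minimal sketch, not Spark code: co-partition both sides, then join partition i with partition i.
object ShuffledJoinSketch {
  // "Shuffle": distribute rows into numPartitions buckets by key hash.
  def partitionByKey[V](rows: Seq[(Int, V)], numPartitions: Int): Vector[Seq[(Int, V)]] = {
    val grouped = rows.groupBy { case (k, _) => Math.floorMod(k.hashCode, numPartitions) }
    Vector.tabulate(numPartitions)(i => grouped.getOrElse(i, Seq.empty))
  }

  def shuffledJoin[A, B](
      left: Seq[(Int, A)],
      right: Seq[(Int, B)],
      numPartitions: Int): Seq[(Int, A, B)] = {
    val leftParts = partitionByKey(left, numPartitions)
    val rightParts = partitionByKey(right, numPartitions)
    // Like zipPartitions: partition i of the left meets only partition i of the right.
    leftParts.zip(rightParts).flatMap { case (lp, rp) =>
      val table = rp.groupBy(_._1) // build side of this partition pair
      lp.flatMap { case (k, a) =>
        table.getOrElse(k, Seq.empty).map { case (_, b) => (k, a, b) }
      }
    }
  }
}
```

The result is the same for any number of partitions; only the order of the output rows depends on how keys are distributed.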
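To be continued :)
Original article; please credit the source when reposting:
Reposted from: OopsOutOfMemory (Sheng Li)'s Blog. Author: OopsOutOfMemory
Link to this article: http://blog.csdn.net/oopsoom/article/details/38274621
Reposted from: https://www.cnblogs.com/cxchanpin/p/6869232.html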