一、SparkSQL的发展

1.1 概述

SparkSQL是Spark⽣态体系中的构建在SparkCore基础之上的⼀个基于SQL的计算模块。 SparkSQL的前身不叫SparkSQL,⽽叫Shark,最开始的时候底层代码优化,sql的解析、执⾏引擎等等完全基于 Hive,总是Shark的执⾏速度要⽐hive⾼出⼀个数量级,但是hive的发展制约了Shark,所以在15年中旬的时候, shark负责⼈,将shark项⽬结束掉,重新独⽴出来的⼀个项⽬,就是sparksql,不再依赖hive,做了独⽴的发展, 逐渐的形成两条互相独⽴的业务:SparkSQL和Hive-On-Spark。在SparkSQL发展过程中,同时也吸收了Shark的一些特点:基于内存的列存储,动态字节码优化技术。

1.2 特点

官网的原话:**Spark SQL is Apache Spark’s module for working with structured data. **
即Spark SQL是Apache Spark处理结构化数据的模块。

  • 集成

无缝地将SQL查询与Spark程序混合。
Spark SQL允许您使用SQL或熟悉的DataFrame API在Spark程序中查询结构化数据。适用于Java、Scala、Python和R语言。

  • 统一的数据访问

以相同的方式连接到任何数据源。
DataFrames和SQL提供了一种访问各种数据源的通用方法,包括Hive、Avro、Parquet、ORC、JSON和JDBC。您甚至可以通过这些源连接数据。

  • 蜂巢集成

在现有仓库上运行SQL或HiveQL查询。
Spark SQL支持HiveQL语法以及Hive SerDes和udf,允许您访问现有的Hive仓库。

  • 标准的连接

通过JDBC或ODBC连接。
服务器模式为业务智能工具提供了行业标准JDBC和ODBC连接。

1.3 总结

SparkSQL就是Spark生态体系中用于处理结构化数据的⼀个模块。结构化数据是什么?存储在关系型数据库中的数据,就是结构化数据;半结构化数据是什么?类似xml、json等的格式的数据被称之为半结构化数据;非结构化数据是什么?音频、视频、图片等为非结构化数据。 换句话说,SparkSQL处理的就是⼆维表数据。

二、SparkSQL的编程入口和模型

2.1 SparkSQL编程入口

SparkSQL中的编程模型,不再是SparkContext,但是创建需要依赖SparkContext。SparkSQL中的编程模型,在spark2.0以前的版本为SQLContext和HiveContext,HiveContext是SQLContext的一个子类,提供Hive中特有的一些功能,比如row_number开窗函数等等,这是SQLContext所不具备的,Spark2.0之后将这两个模型进行了合并——SparkSession。SparkSession的构建需要依赖SparkConf或者SparkContext。使用工厂构建器 (Builder方式)模式创建SparkSession。

import org.apache.spark.sql.SparkSession;SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate();

2.2 编程模型简介

主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。

  • SQL

SQL不用多说,就和Hive操作⼀样,但需要清楚一点,SQL操作的是表,所以如果用SQL进行操作,就需要将SparkSQL对应的编程模型转化成为一张表。

  • DataFrame和Dataset

DataFrame和Dataset是SparkSQL中的编程模型。DataFrame和Dataset我们都可以理解为是一张mysql中的二维表,表有什么?表头,表名,字段,字段类型。RDD其实说⽩了也是一张二维表,但是这张二维表相比较于DataFrame和Dataset却少了很多东⻄,⽐如表头,表名,字段,字段类型,只有数据。 Dataset是在spark1.6.2开始出现出现的api,DataFrame是1.3的时候出现的,早期的时候DataFrame叫 SchemaRDD,SchemaRDD和SparkCore中的RDD相比较,就多了Schema,所谓约束信息,元数据信息。 ⼀般的,将RDD称之为Spark体系中的第⼀代编程模型;DataFrame⽐RDD多了⼀个Schema元数据信息, 被称之为Spark体系中的第二代编程模型;Dataset吸收了RDD的优点(强类型推断和强⼤的函数式编程)和 DataFrame中的优化(SQL优化引擎,内存列存储),成为Spark的最新⼀代的编程模型。

2.3 RDD V.S. DataFrame V.S. Dataset

创建DataFrame

1.spark.read.format("xxx")

# 这种方式对应与文件/外部输入 例如csv对应了CSVFileFormat

2.spark.createDataFrame(RDD[A])

[A<:Product] : 元组和样例类都是Product子类型

3.import spark.implicits._    // 导入sparkSession中的隐式转换操作

Seq(("a",1),("b",2),("c",3)).toDF("k","v").show()

Dataset的创建注意事项:

# 不建议使用普通类
class User(val id:Int,val name:String)val list3 = List(new User(100,"xx"))spark.createDataset(list3).show  // list3中的类型应该存在Encoder# 对于普通类型,要么转成样例类,否则需要手动提供Encoder隐式值implicit val e:Encoder[User] = Encoders.javaSerialization(classOf[User])class User(val id:Int,val name:String) extends Serializable

2.4 RDD、DataFrame、Dataset之间的转换

  • RDD=>DataFrame             rdd.toDF
  • RDD=>Dataset                   rdd.toDS
  • DataFrame=>RDD             df.rdd
  • DataFrame=>Dataset        df.as[User]
  • Dataset=>RDD                  ds.rdd
  • Dataset=>DataFrame        ds.toDF

2.5 DataFrame & Dataset API

网址:http://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset

分类:

  • Action算子:collect show
  • Typed transformations 强类型转换 返回Dataset[U]
  • Untyped transformations  返回DataFrame \Column
  • 其它

三、SparkSQL的数据加载和落地

3.1 数据的加载

SparkSQL中加载外部的数据,使用统一的API入口

spark.read.format(数据⽂件格式).load(path)

这个方式有更加清晰的简写方式,比如要加载json格式的文件

spark.read.json(path)

注:默认加载的文件格式为parquet

def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[*]").appName("SparkSQLLoadAndSave").getOrCreate()//加载数据 默认的加载⽂件的格式为parquetvar pdf = spark.read.format("json").load("file:///E:/data/spark/sql/people.json")//简写⽅式pdf = spark.read.json("file:///E:/data/spark/sql/people.json")//parquetpdf = spark.read.parquet("file:///E:/data/spark/sql/users.parquet")//text 加载普通的⽂本⽂件,只能解析成⼀列pdf = spark.read.text("file:///E:/data/spark/sql/dailykey.txt")//csv 普通的⽂本⽂件,列之间以","作为分隔符pdf = spark.read.csv("file:///E:/data/spark/sql/province.csv").toDF("pid", "province", "code", "cid")// 根据需要重新命名列名 数据类型均为字符串//orc 是rc⽂件格式的升级版本pdf = spark.read.orc("file:///E:/data/spark/sql/student.orc")//jdbcval url = "jdbc:mysql://localhost:3306/test"val table = "wordcount"val properties = new Properties()properties.put("user", "bigdata")properties.put("password", "sorry")pdf = spark.read.jdbc(url, table, properties)pdf.printSchema()pdf.show()spark.stop()
}

3.2 数据的落地

SparkSQL对数据的落地保存使用的api为:spark.write.save(),需要指定数据的落地格式。

和read的默认格式⼀样,save的默认格式也是parquet,需要在write和save之间指定具体的格式

format(format) ,同样也有简写方式:spark.write.json/parquet等

def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[*]").appName("SparkSQLLoadAndSave").getOrCreate()val df = spark.read.orc("file:///E:/data/spark/sql/student.orc")/*数据的落地默认的存储格式为parquet,同时基于snappy压缩⽅式存储# 落地的保存⽅式SaveModeErrorIfExists:⽬录存在报错,默认的格式Append:在原有的基础之上追加Ignore:忽略,如果⽬录存在则忽略,不存在则创建Overwrite:覆盖(删除并重建)*/df.write.format("json").mode(SaveMode.Overwrite).save("file:///E:/data/spark/sql/stu")val url = "jdbc:mysql://localhost:3306/test"val table = "student"val properties = new Properties()properties.put("user", "bigdata")properties.put("password", "sorry")df.write.mode(SaveMode.Append).jdbc(url, table, properties)spark.stop()}

四、SparkSQL与Hive的整合

SparkSQL和Hive的整合,是⼀种比较常见的关联处理方式,SparkSQL加载Hive中的数据进行业务处理,同时将计 算结果落地回Hive中。 ⾸先将服务器中Hadoop安装路径下的hdfs-site.xml、core-site.xml以及hive中hive-site.xml三个⽂件拿出来,放到⼯程中的resource⽂件夹下。

重点:注意修改每个文件中的路径,一定要改成虚拟机的服务ip地址或者映射名称

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 使⽤SparkSQL来操作Hive
*/
object _04SparkSQLOnHiveOps {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir","E:\\hadoop-common-2.2.0-bin-master")val spark = SparkSession.builder().appName("SparkSQLOnHive").master("local").enableHiveSupport() // 开启hive机制.getOrCreate()/*** 如果我们开起了Hive的机制后,并且导⼊了相关的xml⽂件,可能会有⼀个⼩⼩的问题之前读取的本地磁盘⽂件,就突然找不到了,为什么?因为我们使⽤了hive,那么它默认会搜索Hdfs上的数据⽂件,所以本地磁盘会找不到⽂件,这个问题很 好解决,在路径前⾯加上file:///D:\BaiduNetdiskDownload\sql-data如果无效,那么请把连接Hive的配置⽂件删掉,因为现在不需要⽤它*/val df = spark.read.text("E:\\test.txt")df.createTempView("stu")spark.sql(s"""|select| tmp.word,| count(tmp.word) as counts| from(| select| explode(split(value," ")) word| from stu| ) tmp| group by tmp.word| order by counts desc|""".stripMargin).show()// 获取hive中的表或者数据
// spark.sql("select * from test.fact_access_log")
// val url = "jdbc:mysql://localhost:3306/spark"
// val tab = "stu"
// val prop = new Properties()
// prop.setProperty("user","root")
// prop.setProperty("password","123456")// 读取MySQL数据
// val df: DataFrame = spark.read.jdbc(url, tab, prop)// 接下来将数据存⼊hive
/**
* hive建表语句
* CREATE TABLE `stu` (
* `id` bigint,
* `name` string,
* `age` bigint
* )
*/
// df.write.insertInto("test.stu")}
}

注:执行如上代码时,首先要在Hive中先建表

五、SparkSQL中的函数操作

5.1 函数的定义

SQL中函数,其实就是各大编程语言中的函数,或者方法,就是对某⼀特定功能的封装,通过它可以完成较为复杂的统计。这里函数的学习,就基于Hive中的函数来学习。

5.2 函数的分类

1)功能上划分

数值

  • round(x,[d]):对x保留d位小数,同时会对结果四舍五入
  • floor(x):获取不大于x的最大整数
  • ceil(x):获取不小于x的最小整数
  • rand():
  • 获取0到1之间的随机数   /  获取表中随机的两条记录
hive> select * , rand() rand from teacher order by rand limit 2;
or
hive> select * from teacher order by rand() limit 2;

数学

  • abs(x):取绝对值

条件

  1. if(expr1, expr2, expr3):如果expr1为true,返回expr2,反之返回expr3
  2. case when 多条件表达式

日期

  1. current_date(),获取当前的日期,日期格式为标准格式:yyyy-MM-dd
  2. current_timestamp():获取当前日期的时间戳,格式:yyyy-MM-dd HH:mm:ss.SSS
  3. add_months(start_date, num_months):返回start_date之后num_months月的日期
  4. date_add(start_date, num_days):返回start_date之后num_days天的日期
  5. date_sub(start_date, num_days):返回start_date之前num_days天的日期
  6. next_day(start_date, day_of_week),返回start_date之后最接近的day_of_week对应的日期
  7. dayofmonth(date) 返回date对应月份中的第几天
  8. weekofyear(date) 返回date对应年份中的第几周
  9. minute hour day month year 获取日期中对应的年月日时分
  10. date_format(date,format),返回指定格式化时间
  11. datediff(date1, date2),返回date1和date2之间的差值(单位是天),换句话说就是date1-date2
  12. from_unixtime(unix_time, format)将unix_time转化为格式化时间
  13. to_date(datetime)返回datetime中的日期部分

字符串

注意:数据库中的字符串索引从1开始,而不是0

  1. length(str) 返回字符串str的长度
  2. instr(str, substr),作⽤等同于str.indexOf(substr)
  3. substr substring(str, pos[, len]):从str的pos位置开始,截取子字符串,截取len的长度,如果不传len,截取余下所有
  4. substring_index(str, delim, count):将字符串str使⽤delim进⾏分割,返回强count个使⽤delim拼接的子字符串
  5. concat(str1, str2)拼接字串
  6. concat_ws(separator, str1, str2):使⽤指定分隔符来拼接字符串

统计函数

  1. index(arr, n),就是arr(n)获取索弓|n对应的元素
  2. sum、count、max、avg、 min等

特殊

  • array:返回数组
  • collect_set:返回⼀个元素不重复的set集合
  • collect_list:返回⼀个元素可重复的list集合
  • split(str, regex):使⽤regex分隔符将str进⾏切割,返回⼀个字符串数组
  • explode(array):将⼀个数组,转化为多⾏
  •  cast(type1 as type2):将数据类型type1的数据转化为数据类型type2

Demo:使用SQL方式统计WordCount

selecttmp.word,count(1) counts
from (selectexplode(split(line, "\\s+")) wordfrom test_wc
) tmp
group by tmp.word
order by counts desc, tmp.word;

2)实现方式上划分

  • UDF(User Defined function)用户自定义函数

一路输入,一路输出,比如year,date_add, instr

  • UDAF(User Defined aggregation function)⽤户⾃定义聚合函数

多路输入,⼀路输出,常见的聚合函数,count、sum、collect_list

  • UDTF(User Defined table function)⽤户⾃定义表函数

一路输入,多路输出,explode

  • 开窗函数

row_number() ——>分组topN的求解

selecttmp.*
from (selectname,age,married,height,row_number() over(partition by married order by height) rankfrom teacher
) tmp
where tmp.rank < 3;

5.3 自定义函数

5.3.1 概述

当系统提供的这些函数,满足不了实际需要时,就需要进行自定义相关的函数,一般自定义的函数分为两种, UDF和UDAF。

5.3.2 UDF

一路输⼊,一路输出,完成就是基于scala函数。

通过模拟获取字符串长度的udf来学习自定义udf操作。

object _01SparkSQLUDFOps {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[*]").appName("SparkSQLUDF").getOrCreate()import spark.implicits._val rdd = spark.sparkContext.parallelize(List("songhaining","yukailu","liuxiangyuan","maningna"))//使⽤sparksession进⾏udf和udaf的注册
// spark.udf.register[Int, String]("myLen", (str:String) => myStrLength(str))
// spark.udf.register[Int, String]("myLen", str => myStrLength(str))spark.udf.register[Int, String]("myLen", myStrLength)/* private val myLen: UserDefinedFunction = udf((s: String) => {s.length
})df.select(myLen($"name")).show()s*/val df = rdd.toDF("name")df.createOrReplaceTempView("test")//求取每个字符串的⻓度val sql ="""|select| name,| length(name) nameLen,| myLen(name) myNameLen|from test""".stripMarginspark.sql(sql).show()spark.stop()}//⾃定义udfdef myStrLength(str:String):Int = str.length
}

5.3.3 UDAF

多路输入,一路输出,类似combineByKey

通过模拟avg函数,来学习如何自定义UDAF操作。

object _02SparkSQLUDAFOps {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[*]").appName("SparkSQLUDAF").getOrCreate()import spark.implicits._val rdd = spark.sparkContext.parallelize(List(Student(1, "宋海宁", 168.5, 105.5, 18),Student(2, "麻宁娜", 165.0, 101.0, 19),Student(3, "刘⾹媛", 170.5, 108.5, 17),Student(4, "蔚凯璐", 172.5, 115, 16)))spark.udf.register("myAvg", new MyUDAFAVG)val df = rdd.toDS()df.createOrReplaceTempView("student")
val sql ="""|select| round(avg(height), 1) avg_height,| avg(weight) avg_weight,| avg(age) avg_age,| myAvg(weight) myAvg_wight|from student""".stripMarginspark.sql(sql).show()spark.stop()}
}
case class Student(id:Int, name:String, height:Double, weight:Double, age:Int)

自定义UDAF

class MyUDAFAVG extends UserDefinedAggregateFunction {/*指定⽤户⾃定义udaf输⼊参数的元数据datediff(date1, date2)*/override def inputSchema: StructType = {StructType(List(StructField("weight", DataTypes.DoubleType, false)))}//udaf返回值的数据类型override def dataType: DataType = DataTypes.DoubleType//udaf⾃定义函数求解过程中的临时变量的数据类型override def bufferSchema: StructType = {StructType(List(StructField("sum", DataTypes.DoubleType, false),StructField("count", DataTypes.IntegerType, false)))}//聚合函数是否幂等,相同输⼊是否总是得到相同输出override def deterministic: Boolean = true/*分区内的初始化操作其实就是给sum和count赋初始值*/
override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer.update(0, 0.0)buffer.update(1, 0)}/*** 分区内的更新操作* @param buffer 临时变量* @param input ⾃定义函数调⽤时传⼊的值*/override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer.update(0, buffer.getDouble(0) + input.getDouble(0))buffer.update(1, buffer.getInt(1) + 1)}//分区间的合并操作override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1))}//udaf聚合结果的返回值override def evaluate(buffer: Row): Double = {buffer.getDouble(0) / buffer.getInt(1)}
}

5.4 多维立方体分析函数

grouping sets 、rollup 、cube 是用来处理多维分析的函数:

  • grouping sets:对分组集中指定的组表达式的每个⼦集执⾏group by, group by A,B grouping sets(A,B)就等价于 group by A union group by B,其中A和B也可以是⼀个集合,比如group by A,B,C grouping sets((A,B),(A,C))。
  • rollup:在指定表达式的每个层次级别创建分组集。group by A,B,C with rollup ⾸先会对(A、B、C)进行group by,然后对(A、B)进⾏group by,然后是(A)进⾏group by,最后对全表进行group by操作。
  • cube : 为指定表达式集的每个可能组合创建分组集。group by A,B,C with cube 首先会对(A、B、C)进行group by,然后依次是(A、B),(A、C),(A),(B、C),(B),(C),最后对全表进行group by操作。

六、SparkSQL之SQL调优

6.1 缓存数据至内存

Spark SQL可以通过调用sqlContext.cacheTable("tableName") 或者dataFrame.cache(),将表⽤⼀种柱状格式(an inmemory columnar format)缓存至内存中。然后Spark SQL在执行查询任务时,只需扫描必需的列,从而以减少扫描数据量、提高性能。通过缓存数据,Spark SQL还可以自动调节压缩,从而达到最小化内存使用率和降低GC压力的目的。调用sqlContext.uncacheTable("tableName")可将缓存的数据移出内存。

可通过两种方式配置缓存数据功能:

  • 使用SQLContext的setConf方法
  • 执行SQL命令SET key=value
Property Name Default Meaning
spark.sql.inMemoryColumnarStorage.compressed true 如果假如设置为true,SparkSql会根据统计信息⾃动的为每个列 选择压缩⽅式进⾏压缩。
spark.sql.inMemoryColumnarStorage.compressed 10000 控制列缓存的批量大小。批次⼤有助于改善内存使⽤和压缩,但 是缓存数据会有OOM的⻛险。

6.2 参数调优

可以通过配置下表中的参数调节Spark SQL的性能。

Property Name Default Meaning
spark.sql.files.maxPartitionBytes 134217728 获取数据到分区中的最大字节数。
spark.sql.files.openCostlnBytes 4194304 (4MB) 该参数默认4M,表示小于4M的小文件会合并到一个分区中,
spark.sql.broadcastTimeout 300 广播等待超时时间,单位秒。
spark.sql.autoBroadcastJoinThreshold 10485760(10 MB) 最大广播表的大小。设置为-1可以禁止该功能。当前统计信息仅支持Hive Metastore表。
spark.sql.shuffle.partitions 200 设置huffle分区数,默认200。

6.3 SQL数据倾斜优化

以group By出现数据倾斜为例进行解决。采用的案例就是wordcount。两阶段聚合进行解决:局部聚合+全局聚合。

object _03SparkSQLDataskewOps {def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)Logger.getLogger("org.spark_project").setLevel(Level.WARN)val spark = SparkSession.builder().master("local[*]").appName("SparkSQLDataskew").getOrCreate()val list = List("zhang zhang wen wen wen wen yue yue","gong yi can can can can can can can can can can","chang bao peng can can can can can can")import spark.implicits._val rdd = spark.sparkContext.parallelize(list)val df = rdd.toDF("line")df.createOrReplaceTempView("test")println("原始表中的数据---------------------")df.show()println("step 1-----------进⾏数据拆分-------------")var sql ="""|select| split(line, '\\s+')|from test""".stripMarginspark.sql(sql).show()println("step 2-----------进⾏列转化为多⾏数据-------------")sql ="""|select| explode(split(line, '\\s+')) word|from test""".stripMarginspark.sql(sql).show()println("step 3-----------进⾏添加前缀打散数据-------------")sql ="""|select| concat_ws("_", cast(floor(rand() * 2) as string), t1.word) prefix_word|from (| select| explode(split(line, '\\s+')) word
| from test|) t1""".stripMarginspark.sql(sql).show()println("step 4-----------进⾏有前缀的局部聚合-------------")sql ="""|select| concat_ws("_", cast(floor(rand() * 2) as string), t1.word)
prefix_word,| count(1) countz|from (| select| explode(split(line, '\\s+')) word| from test|) t1|group by prefix_word""".stripMarginspark.sql(sql).show()println("step 5-----------进⾏去前缀操作-------------")sql ="""|select| t2.prefix_word,| substr(t2.prefix_word, instr(t2.prefix_word, "_") + 1) up_word,| t2.countz|from (| select| concat_ws("_", cast(floor(rand() * 2) as string), t1.word)
prefix_word,| count(1) countz| from (| select| explode(split(line, '\\s+')) word| from test| ) t1| group by prefix_word|) t2""".stripMarginspark.sql(sql).show()println("step 6-----------进⾏全局聚合-------------")sql ="""|select| substr(t2.prefix_word, instr(t2.prefix_word, "_") + 1) up_word,| sum(t2.countz) counts|from (| select
| concat_ws("_", cast(floor(rand() * 2) as string), t1.word)
prefix_word,| count(1) countz| from (| select| explode(split(line, '\\s+')) word| from test| ) t1| group by prefix_word|) t2|group by up_word""".stripMarginspark.sql(sql).show()spark.stop()}//⾃定义添加前缀的函数
/* def addPrefix(str:String):String = {val random = new Random()random.nextInt(2) + "_" + str}*/
}

由于处理过程中,使用了两层group By,所以经常将使用sql的处理称之为双重group by。

参考资料:https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

七、SparkSQL运行架构及原理

Spark SQL对SQL语句的处理与关系型数据库类似,即词法/语法解析、绑定、优化、执行。Spark SQL会先将SQL语句解析成⼀棵树,然后使用规则(Rule)对Tree进⾏绑定、优化等处理过程。Spark SQL由Core、Catalyst、 Hive、Hive-ThriftServer四部分构成:

  • Core: 负责处理数据的输⼊和输出,如获取数据,查询结果输出成DataFrame等
  • Catalyst: 负责处理整个查询过程,包括解析、绑定、优化等
  • Hive: 负责对Hive数据进⾏处理
  • Hive-ThriftServer: 主要⽤于对hive的访问

7.1 SparkSQL运行架构

7.2 SparkSQL运行原理

1.使用SessionCatalog保存元数据

  • 在解析SQL语句之前,会创建SparkSession,或者如果是2.0之前的版本初始化SQLContext,SparkSession只是封 装了SparkContext和SQLContext的创建⽽已。会把元数据保存在SessionCatalog中,涉及到表名,字段名称和字 段类型。创建临时表或者视图,其实就会往SessionCatalog注册

2.解析SQL,使用ANTLR生成未绑定的逻辑计划

  • 当调用SparkSession的sql或者SQLContext的sql方法,我们以2.0为准,就会使用SparkSqlParser进⾏解析SQL. 使 ⽤的ANTLR进⾏词法解析和语法解析。它分为2个步骤来生成Unresolved LogicalPlan:
  1. 词法分析:Lexical Analysis,负责将token分组成符号类
  2. 构建⼀个分析树或者语法树AST

3 使用分析器Analyzer绑定逻辑计划

  • 在该阶段,Analyzer会使用Analyzer Rules,并结合SessionCatalog,对未绑定的逻辑计划进行解析,生成已绑定的逻辑计划。

4.使⽤优化器Optimizer优化逻辑计划

  • 优化器也是会定义一套Rules,利用这些Rule对逻辑计划和Exepression进行迭代处理,从而使得树的节点进行和并和优化

5.使用SparkPlanner生成物理计划

  • SparkSpanner使用Planning Strategies,对优化后的逻辑计划进行转换,生成可以执行的物理计划SparkPlan.

6.使用QueryExecution执行物理计划

  • 此时调⽤SparkPlan的execute⽅法,底层其实已经再触发JOB了,然后返回RDD

大数据之Spark(四):Spark SQL相关推荐

  1. 【大数据计算】(四) Spark的安装和基础编程

    文章目录 1. 使用Spark Sell编写代码 1.1启动Spark Shell 1.2 读取文件 1.3 编写词频统计程序 2. 编写Spark独立应用程序 2.1 用Scala语言编写Spark ...

  2. 大数据项目(基于spark)--新冠疫情防控指挥作战平台项目

    大数据项目(基于spark)–新冠疫情防控指挥作战平台项目 文章目录 第一章 项目介绍 1.1 项目背景 1.2 项目架构 1.3 项目截图 1.4 功能模块 第二章 数据爬取 2.1 数据清单 2. ...

  3. 【计算机大数据毕设之基于spark+hadoop的大数据分析论文写作参考案例】

    [计算机大数据毕设之基于spark+hadoop的大数据分析论文写作参考案例-哔哩哔哩] https://b23.tv/zKOtd3L 目  录 一 引言​1 二 系统分析​2 2.1 必要性和可行性 ...

  4. Spark大数据开发学习:Spark基础入门

    在诸多的大数据技术框架当中,Spark可以说是占据了非常重要的地位,继Hadoop之后,基于实时数据处理需求的不断上升,Spark开始占据越来越大的市场份额,而Spark,也成为大数据的必学知识点.今 ...

  5. CDH大数据平台搭建之SPARK集群搭建

    CDH大数据平台搭建之SPARK集群搭建 一.安装规划 二.下载 三.安装及配置 1.解压 2.修改配置文件 四.启动 五.JPS检查节点 一.安装规划 参考CHD大数据平台搭建之集群规划 二.下载 ...

  6. 我眼中的大数据(五)——Spark

    CSDN话题挑战赛第2期 参赛话题:大数据技术分享 Hadoop MapReduce虽然已经可以满足大数据的应用场景,但是其执行速度和编程复杂度并不让人们满意.Spark因其拥有更快的执行速度和更友好 ...

  7. spark启动的worker节点是localhost_Spark大数据在线培训:Spark运行原理解析

    在大数据技术框架当中,Spark是继Hadoop之后的又一代表性框架,也是学习大数据当中必学的重点技术框架.在这些年的发展当中,Spark所占据的市场地位,也在不断拓展.今天的Spark大数据在线培训 ...

  8. 大数据改变世界,Spark改变大数据——中国Spark技术峰会见闻

    作者:张相於,当当网推荐系统开发经理 责编:周建丁(zhoujd@csdn.net) 笔者5月15日参加了"中国云计算技术大会"中的"中国Spark技术峰会", ...

  9. 打怪升级之小白的大数据之旅(四十一)<大数据与Hadoop概述>

    打怪升级之小白的大数据之旅(四十) Hadoop概述 上次回顾 好了,经过了java,mysql,jdbc,maven以及Linux和Shell的洗礼,我们终于开始正式进入大数据阶段的知识了,首先我会 ...

  10. 2021年大数据Hive(四):Hive查询语法

    全网最详细的Hive文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 系列历史文章 前言 hive查询语法 一.SELECT语句 1.语句结构 2.全表查 ...

最新文章

  1. Python基本数据类型之列表
  2. 面试系列三 如何保证消息不被重复消费
  3. 实现WebSocket和WAMP协议的开源库WampSharp
  4. appium历史版本下载
  5. Tomcat JAAS 身份验证和授权
  6. 安装loadrunner,缺少VC2005_sp1_with_atl的错
  7. 细粒度分类网络之WS-DAN论文阅读附代码
  8. IP地址详解、无分类编址和路由寻址(计算机网络二)
  9. 用阿里云香港云服务器时需要注意的方面
  10. 新墨斯智能鞋让你成为健康达人
  11. python复制word段落_使用python将整个word文档(包括表)复制到另一个
  12. 如何在家里制作服务器,在家如何架设服务器?
  13. java excel导入 日期_java导入excel时处理日期格式(已验证ok)
  14. 拆解了20个企业微信社群的我, 发现【社群营销】最好的方法应该是这样!
  15. js中的3种弹出式消息提醒(警告窗口,确认窗口,信息输入窗口)的命令是什么?
  16. 【CentOS8】网卡重启
  17. WR | 西湖大学鞠峰组揭示微塑料污染对人工湿地菌群与脱氮功能的影响
  18. 软考中级网络工程师必背考试知识点集锦(一)
  19. Oracle ojdbc6-11.2.0.3.jar下载以及Maven手动安装jar包
  20. 常见的web服务器软件

热门文章

  1. #undef 的用法
  2. 一键部署Kubernetes高可用集群
  3. “花朵分类“ 手把手搭建【卷积神经网络】
  4. 期货开户手续费加一分都是薄利多销
  5. 一个程序最多能占用的内存大小
  6. Sql注入的入门教程
  7. 把STM32F103的工程移植到F105或F107
  8. Web 开发技术项目设计日志
  9. 关于:js使用$.parseJSON字符串转json对象报错Uncaught SyntaxError- Unexpected token o in JSON at position 1
  10. 关于Home键:KEYCODE_HOME