Reference: https://github.com/rklick-solutions/spark-tutorial/wiki/Spark-SQL#introduction


Spark SQL is a component on top of Spark Core that introduces a new data abstraction called SchemaRDD (now known as DataFrame), which provides support for structured and semi-structured data. Spark SQL executes SQL queries written in either basic SQL syntax or HiveQL, and it can also read data from an existing Hive installation. It provides a programming abstraction called DataFrame and can act as a distributed SQL query engine. It is Spark's interface for working with structured and semi-structured data. Structured data is any data that has a schema, that is, a known set of fields for each record. When you have this type of data, Spark SQL makes it both easier and more efficient to load and query. There are several ways to interact with Spark SQL, including SQL, the DataFrames API, and the Datasets API. The same execution engine is used when computing a result, independent of which API or language you use to express the computation.

Create SQL Context

To create a basic SQLContext, all you need is a SparkContext.

val sc = SparkCommon.sparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Basic Query

Spark SQL can load JSON files and infer the schema from the data. Here is the code to load the JSON file, register the data in a temporary table called "Cars1", and print the inferred schema. To run a query against a table, we call the sql() method on the SQLContext. The first thing we need to do is tell Spark SQL about some data to query: in this case we load some car data from JSON and give it a name by registering it as "Cars1", so we can query it with SQL.

Here we use a JSON document named cars.json with the following content and generate a table based on the schema in the JSON document.

[{"itemNo" : 1, "name" : "ferrari", "speed" : 259 , "weight": 800},  {"itemNo" : 2, "name" : "jaguar", "speed" : 274 , "weight":998},  {"itemNo" : 3, "name" : "mercedes", "speed" : 340 , "weight": 1800},  {"itemNo" : 4, "name" : "audi", "speed" : 345 , "weight": 875},  {"itemNo" : 5, "name" : "lamborghini", "speed" : 355 , "weight": 1490},{"itemNo" : 6, "name" : "chevrolet", "speed" : 260 , "weight": 900},  {"itemNo" : 7, "name" : "ford", "speed" : 250 , "weight": 1061},  {"itemNo" : 8, "name" : "porche", "speed" : 320 , "weight": 1490},  {"itemNo" : 9, "name" : "bmw", "speed" : 325 , "weight": 1190},  {"itemNo" : 10, "name" : "mercedes-benz", "speed" : 312 , "weight": 1567}]
object BasicQueryExample {

  val sc = SparkCommon.sparkContext
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  def main(args: Array[String]) {
    import sqlContext.implicits._

    val input = sqlContext.read.json("src/main/resources/cars1.json")
    input.registerTempTable("Cars1")

    val result = sqlContext.sql("SELECT * FROM Cars1")
    result.show()
  }
}

case class Cars1(name: String)

    sqlContext.sql("SELECT COUNT(*) FROM Cars1").collect().foreach(println)

    sqlContext.sql("SELECT name, COUNT(*) AS cnt FROM Cars1 WHERE name <> '' GROUP BY name ORDER BY cnt DESC LIMIT 10").collect().foreach(println)

DataFrames

A DataFrame is a distributed collection of data organized into named columns. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.

Creating DataFrames

A DataFrame is a distributed collection of data organized into named columns. Conceptually, it is equivalent to a relational table with good optimization techniques. A DataFrame can be constructed from a variety of sources such as Hive tables, structured data files, external databases, or existing RDDs. Here we use a JSON document named cars.json with the following content and generate a table based on the schema in the JSON document.

[{"itemNo" : 1, "name" : "Ferrari", "speed" : 259 , "weight": 800},  {"itemNo" : 2, "name" : "Jaguar", "speed" : 274 , "weight":998},  {"itemNo" : 3, "name" : "Mercedes", "speed" : 340 , "weight": 1800},  {"itemNo" : 4, "name" : "Audi", "speed" : 345 , "weight": 875},  {"itemNo" : 5, "name" : "Lamborghini", "speed" : 355 , "weight": 1490}]
package com.tutorial.sparksql

import com.tutorial.utils.SparkCommon

object CreatingDataFarmes {

  val sc = SparkCommon.sparkContext

  /**
   * Create a Scala Spark SQL Context.
   */
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  def main(args: Array[String]) {
    /**
     * Create the DataFrame
     */
    val df = sqlContext.read.json("src/main/resources/cars.json")

    /**
     * Show the Data
     */
    df.show()
  }
}

DataFrame API Examples Using Different Types of Functionality

The different types of DataFrame operations are:

Actions:

Actions are operations (such as take, count, first, and so on) that return a value after running a computation on a DataFrame.

Some action operations with examples:

show()

If you want to see the top 20 rows of the DataFrame in tabular form, use the following command.

carDataFrame.show()

show(n)

If you want to see the first n rows of the DataFrame in tabular form, use the following command.

carDataFrame.show(2)

take()

take(n) returns the first n rows of the DataFrame as an array.

carDataFrame.take(2).foreach(println)

count()

count() returns the number of rows in the DataFrame. Combined with groupBy, it returns the number of rows per group.

carDataFrame.groupBy("speed").count().show()
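For a plain row count over the whole DataFrame, count() can be called directly. A minimal sketch, assuming carDataFrame is the cars DataFrame used in the examples above:

// Total number of rows in the DataFrame
println(carDataFrame.count())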

head()

head() returns the first row.

val resultHead = carDataFrame.head()
println(resultHead.mkString(","))

head(n)

head(n) returns the first n rows as an array.

val resultHeadNo = carDataFrame.head(3)
println(resultHeadNo.mkString(","))

first()

Returns the first row.

val resultFirst = carDataFrame.first()
println("first: " + resultFirst.mkString(","))

collect()

Returns an array that contains all of the rows in this DataFrame.

val resultCollect = carDataFrame.collect()
println(resultCollect.mkString(","))

Basic DataFrame functions:

printSchema()

If you want to see the Structure (Schema) of the DataFrame, then use the following command.

carDataFrame.printSchema()

toDF()

toDF() returns a new DataFrame with the columns renamed. It is convenient for converting an RDD of tuples or case classes into a DataFrame with meaningful column names.

sc.textFile("src/main/resources/fruits.txt")
  .map(_.split(","))
  .map(f => Fruits(f(0).trim.toInt, f(1), f(2).trim.toInt))
  .toDF()
  .show()
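To see the renaming itself, toDF can also be given explicit column names when converting an RDD of tuples. A minimal sketch, assuming the same fruits.txt layout and that sqlContext.implicits._ is imported:

import sqlContext.implicits._

// Convert an RDD of tuples and name the columns explicitly
val fruitsDF = sc.textFile("src/main/resources/fruits.txt")
  .map(_.split(","))
  .map(f => (f(0).trim.toInt, f(1).trim, f(2).trim.toInt))
  .toDF("id", "name", "quantity")
fruitsDF.show()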

dtypes()

Returns all column names and their data types as an array.

carDataFrame.dtypes.foreach(println)

columns()

Returns all column names as an array.

carDataFrame.columns.foreach(println)

cache()

Call cache() to explicitly store the data in memory; the cached data is held in a distributed way across the cluster and reused by subsequent actions instead of being recomputed.

val resultCache = carDataFrame.filter(carDataFrame("speed") > 300)
resultCache.cache().show()
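When the cached data is no longer needed, it can be released again. A minimal sketch, reusing resultCache from the snippet above:

// Remove the DataFrame from the in-memory cache
resultCache.unpersist()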

DataFrame operations:

sort()

Returns a new DataFrame sorted by the given expressions.

carDataFrame.sort($"itemNo".desc).show()

orderBy()

Returns a new DataFrame sorted by the specified column(s).

carDataFrame.orderBy(desc("speed")).show()

groupBy()

groupBy() groups the rows; here we count the number of cars that have the same speed.

carDataFrame.groupBy("speed").count().show()

na()

Returns a DataFrameNaFunctions for working with missing data.

carDataFrame.na.drop().show()
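Besides dropping rows that contain nulls, DataFrameNaFunctions can also fill them in. A minimal sketch, assuming the numeric columns speed and weight from the cars data:

// Replace nulls in all numeric columns with 0
carDataFrame.na.fill(0.0).show()

// Replace nulls only in the listed columns
carDataFrame.na.fill(0.0, Seq("speed", "weight")).show()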

as()

Returns a new DataFrame with an alias set.

carDataFrame.select(avg($"speed").as("avg_speed")).show()

alias()

Returns a new DataFrame with an alias set. Same as as.

carDataFrame.select(avg($"weight").alias("avg_weight")).show()

select()

select() fetches the speed column from the DataFrame.

carDataFrame.select("speed").show()

filter()

filter() selects the cars whose speed is greater than 300 (speed > 300).

carDataFrame.filter(carDataFrame("speed") > 300).show()

where()

Filters rows using the given condition; same as filter().

carDataFrame.where($"speed" > 300).show()

agg()

Aggregates on the entire DataFrame without groups.

carDataFrame.agg(max($"speed")).show()
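agg() can also compute several aggregates in one pass. A minimal sketch, assuming org.apache.spark.sql.functions._ and sqlContext.implicits._ are imported as in the snippets above:

carDataFrame.agg(max($"speed"), min($"speed"), avg($"weight")).show()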

limit()

Returns a new DataFrame by taking the first n rows. The difference between this function and head is that head returns an array of rows while limit returns a new DataFrame.

carDataFrame1.limit(3).show()

unionAll()

Returns a new DataFrame containing union of rows in this frame and another frame.

carDataFrame.unionAll(carDataFrame1).show()

intersect()

Returns a new DataFrame containing rows only in both this frame and another frame.

carDataFrame1.intersect(carDataFrame).show()

except()

Returns a new DataFrame containing rows in this frame but not in another frame.

carDataFrame.except(carDataFrame1).show()

withColumn()

Returns a new DataFrame by adding a column or replacing the existing column that has the same name.

val coder: (Int => String) = (arg: Int) => {if (arg < 300) "slow" else "high"}val sqlfunc = udf(coder)carDataFrame.withColumn("First", sqlfunc(col("speed"))).show()

withColumnRenamed()

Returns a new DataFrame with a column renamed.

carDataFrame.withColumnRenamed("itemNo", "carId").show() // e.g. rename the itemNo column

drop()

Returns a new DataFrame with a column dropped.

carDataFrame.drop("speed").show()

dropDuplicates()

Returns a new DataFrame that contains only the unique rows from this DataFrame. This is an alias for distinct.

carDataFrame.dropDuplicates().show()
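dropDuplicates can also consider only a subset of columns when deciding which rows are duplicates. A minimal sketch:

// Keep one row per car name, regardless of the other column values
carDataFrame.dropDuplicates(Seq("name")).show()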

describe()

describe returns a DataFrame containing information such as the number of non-null entries (count), mean, standard deviation, and minimum and maximum value for each numerical column.

carDataFrame.describe("speed").show()
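Called without column names, describe() summarizes every numerical column at once. A minimal sketch:

carDataFrame.describe().show()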

Interoperating with RDDs

Spark SQL supports two different methods for converting existing RDDs into DataFrames:

1. Inferring the Schema using Reflection:

The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. The RDD can be implicitly converted to a DataFrame and then registered as a table, which can be used in subsequent SQL statements.

def main(args: Array[String]) {
  import sqlContext.implicits._ // needed for toDF()

  /**
   * Create RDD and Apply Transformations
   */
  val fruits = sc.textFile("src/main/resources/fruits.txt")
    .map(_.split(","))
    .map(frt => Fruits(frt(0).trim.toInt, frt(1), frt(2).trim.toInt))
    .toDF()

  /**
   * Store the DataFrame Data in a Table
   */
  fruits.registerTempTable("fruits")

  /**
   * Select Query on DataFrame
   */
  val records = sqlContext.sql("SELECT * FROM fruits")

  /**
   * To see the result data of the records DataFrame
   */
  records.show()
}

case class Fruits(id: Int, name: String, quantity: Int)

2. Programmatically Specifying the Schema:

A DataFrame can also be created through a programmatic interface that lets you construct a schema and then apply it to an existing RDD. This takes three steps: create an RDD of Rows from the original RDD; create the schema, represented by a StructType, matching the structure of the Rows in that RDD; and apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

object ProgrammaticallySchema {

  val sc = SparkCommon.sparkContext
  val schemaOptions = Map("header" -> "true", "inferSchema" -> "true")

  // sc is an existing SparkContext.
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  def main(args: Array[String]) {
    // Create an RDD
    val fruit = sc.textFile("src/main/resources/fruits.txt")

    // The schema is encoded in a string
    val schemaString = "id name"

    // Generate the schema based on the string of schema
    val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    schema.foreach(println)

    // Convert records of the RDD (fruit) to Rows.
    val rowRDD = fruit.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    rowRDD.foreach(println)

    // Apply the schema to the RDD.
    val fruitDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    fruitDataFrame.foreach(println)

    // Register the DataFrame as a table.
    fruitDataFrame.registerTempTable("fruit")

    /**
     * SQL statements can be run by using the sql methods provided by sqlContext.
     */
    val results = sqlContext.sql("SELECT * FROM fruit")
    results.show()
  }
}

Data Sources

Spark SQL supports a number of structured data sources, including Hive tables, JSON, and Parquet files. Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on like a normal RDD and can also be registered as a temporary table. Registering a DataFrame as a table allows you to run SQL queries over its data.
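As a quick illustration of the Parquet source mentioned above, the same DataFrame interface reads and writes Parquet files. A minimal sketch, assuming carDataFrame from the earlier examples, a Spark version that provides the DataFrame write API (as used in the CSV example later in this section), and a hypothetical output path:

// Write the cars DataFrame out as Parquet, then read it back and query it
carDataFrame.write.parquet("src/main/resources/cars_parquet")

val parquetCars = sqlContext.read.parquet("src/main/resources/cars_parquet")
parquetCars.registerTempTable("parquetCars")
sqlContext.sql("SELECT name, speed FROM parquetCars WHERE speed > 300").show()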

DataFrame Operations in JSON file:

Here we include some basic examples of structured data processing using DataFrames. As an example, the following creates a DataFrame based on the content of a JSON file. Read a JSON document named cars.json with the following content and generate a table based on the schema in the JSON document.

[{"itemNo" : 1, "name" : "ferrari", "speed" : 259 , "weight": 800},  {"itemNo" : 2, "name" : "jaguar", "speed" : 274 , "weight":998},  {"itemNo" : 3, "name" : "mercedes", "speed" : 340 , "weight": 1800},  {"itemNo" : 4, "name" : "audi", "speed" : 345 , "weight": 875},  {"itemNo" : 5, "name" : "lamborghini", "speed" : 355 , "weight": 1490}]
import org.apache.spark.sql.DataFrame

object DataFrameOperations {

  val sc = SparkCommon.sparkContext

  /**
   * Use the following command to create SQLContext.
   */
  val ssc = SparkCommon.sparkSQLContext
  val schemaOptions = Map("header" -> "true", "inferSchema" -> "true")

  def main(args: Array[String]) {
    /**
     * Create the DataFrame
     */
    val cars = "src/main/resources/cars.json"

    /**
     * Read the JSON document
     * Use the following command to read the JSON document named cars.json.
     * The data is shown as a table with the fields itemNo, name, speed and weight.
     */
    val carDataFrame: DataFrame = ssc.read.format("json").options(schemaOptions).load(cars)

    /**
     * Show the Data
     * If you want to see the data in the DataFrame, then use the following command.
     */
    carDataFrame.show()

    /**
     * printSchema Method
     * If you want to see the structure (schema) of the DataFrame, then use the following command.
     */
    carDataFrame.printSchema()

    /**
     * Select Method
     * Use the following command to fetch the name column from the DataFrame.
     */
    carDataFrame.select("name").show()

    /**
     * Filter Method
     * Select the cars whose speed is greater than 300 (speed > 300).
     */
    carDataFrame.filter(carDataFrame("speed") > 300).show()

    /**
     * groupBy Method
     * Count the number of cars that have the same speed.
     */
    carDataFrame.groupBy("speed").count().show()
  }
}

Show the Data

If you want to see the data in the DataFrame, then use the following command.

carDataFrame.show()

printSchema Method

If you want to see the Structure (Schema) of the DataFrame, then use the following command

 carDataFrame.printSchema()

Select Method

Use the following command to fetch the name column from the DataFrame.

carDataFrame.select("name").show()

Filter Method

Use the following command to filter the rows whose speed is greater than 300 (speed > 300) from the DataFrame.

carDataFrame.filter(carDataFrame("speed") > 300).show()

DataFrame Operations in Text file:

As an example, the following creates a DataFrame based on the content of a text file. Read a text document named fruits.txt with the following content and generate a table based on the schema in the text document.

1, Grapes, 25
2, Guava, 28
3, Gooseberry, 39
4,  Raisins, 23
5, Naseberry, 23
val fruits = sc.textFile("src/main/resources/fruits.txt")
  .map(_.split(","))
  .map(frt => Fruits(frt(0).trim.toInt, frt(1), frt(2).trim.toInt))
  .toDF()

/**
 * Store the DataFrame Data in a Table
 */
fruits.registerTempTable("fruits")

/**
 * Select Query on DataFrame
 */
val records = sqlContext.sql("SELECT * FROM fruits")

/**
 * To see the result data of the records DataFrame
 */
records.show()

case class Fruits(id: Int, name: String, quantity: Int)

DataFrame Operations in CSV file:

As an example, the following creates a DataFrame based on the content of a CSV file. Read a CSV document named cars.csv with the following content and generate a table based on the schema in the CSV document.

year,make,model,comment,blank
"2012","Tesla","S","No comment",
1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt
import java.util.UUID

import org.apache.spark.sql.SQLContext

object csvFile {

  val sc = SparkCommon.sparkContext
  val sqlContext = SparkCommon.sparkSQLContext

  def main(args: Array[String]) {
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true") // Use first line of all files as header
      .option("inferSchema", "true") // Automatically infer data types
      .load("src/main/resources/cars.csv")

    df.show()
    df.printSchema()

    val selectedData = df.select("year", "model")
    selectedData.write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save(s"src/main/resources/${UUID.randomUUID()}")

    println("OK")
  }
}

Dataset

Dataset is a new experimental interface that tries to provide the benefits of RDDs (strong typing, the ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).

Creating Datasets:

A Dataset is a strongly-typed, immutable collection of objects that are mapped to a relational schema. Datasets are similar to RDDs; however, instead of using Java serialization they use a specialized Encoder to serialize the objects for processing or transmitting over the network. The Dataset API introduces a new concept called an encoder, which is responsible for converting between JVM objects and the tabular representation. Datasets support automatically generated encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java beans. Here we use a text document named test_file.txt with the following content and run a simple word count over it as a Dataset.

JSON is a popular semistructured data format. The simplest way to load JSON data is
by loading the data as a text file and then mapping over the values with a JSON
parser. Likewise, we can use our preferred JSON serialization library to write out the
values to strings, which we can then write out.In Java and Scala we can also work
with JSON data using a custom Hadoop format.“JSON” on page 172 also shows how to
load JSON data with Spark SQL.
package com.tutorial.sparksql

import com.tutorial.utils.SparkCommon

object CreatingDatasets {

  val sc = SparkCommon.sparkContext
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  def main(args: Array[String]) {
    import sqlContext.implicits._

    val lines = sqlContext.read.text("src/main/resources/test_file.txt").as[String]

    lines
      .flatMap(_.split(" "))
      .filter(_ != "")
      .groupBy(_.toLowerCase)
      .count()
      .show()
  }
}

Basic Operations

Encoders are also created for case classes.

case class Cars(name: String, kph: Long)

val ds = Seq(Cars("lamborghini", 32)).toDS()
ds.show()
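Because the Dataset is typed, the functional transformations mentioned earlier operate on Cars objects rather than generic Rows. A minimal sketch, reusing ds from the snippet above and assuming sqlContext.implicits._ is in scope for the String encoder:

// Typed transformations on a Dataset[Cars]
ds.filter(_.kph > 30).show()
ds.map(_.name.toUpperCase).show()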

DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.

case class Cars(name: String, kph: Long)

val car = sqlContext.read.json("src/main/resources/cars.json").as[Cars]
car.show()
