Apache Spark 2.0引入了SparkSession,其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互。
  本文我们将介绍在Spark 2.0中如何使用SparkSession。更多关于SparkSession的文章请参见:《SparkSession:新的切入点》、《Spark 2.0介绍:创建和使用相关API》、《Apache Spark 2.0.0正式发布及其功能介绍》

探索SparkSession统一的功能

  首先,我们介绍一个简单的Spark应用案例:SparkSessionZipsExample,其从JSON文件中读取邮政编码,并且通过DataFrame API进行一些分析,之后使用Spark SQL进行一些查询,这些操作并没有使用到SparkContext, SQLContext 或者HiveContext。

创建SparkSession

在2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext,代码如下:

[plain] view plaincopyprint?
  1. //set up the spark configuration and create contexts
  2. val sparkConf = new SparkConf().setAppName("SparkSessionZipsExample").setMaster("local")
  3. // your handle to SparkContext to access other context like SQLContext
  4. val sc = new SparkContext(sparkConf).set("spark.some.config.option", "some-value")
  5. val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  然而在Spark 2.0中,我们可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。使用生成器的设计模式(builder design pattern),如果我们没有创建SparkSession对象,则会实例化出一个新的SparkSession对象及其相关的上下文。

[plain] view plaincopyprint?
  1. // Create a SparkSession. No need to create SparkContext
  2. // You automatically get it as part of the SparkSession
  3. val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
  4. val spark = SparkSession
  5. .builder()
  6. .appName("SparkSessionZipsExample")
  7. .config("spark.sql.warehouse.dir", warehouseLocation)
  8. .enableHiveSupport()
  9. .getOrCreate()

到现在我们可以使用上面创建好的spark对象,并且访问其public方法。

配置Spark运行相关属性

  一旦我们创建好了SparkSession,我们就可以配置Spark运行相关属性。比如下面代码片段我们修改了已经存在的运行配置选项。

[plain] view plaincopyprint?
  1. //set new runtime options
  2. spark.conf.set("spark.sql.shuffle.partitions", 6)
  3. spark.conf.set("spark.executor.memory", "2g")
  4. //get all settings
  5. val configMap:Map[String, String] = spark.conf.getAll()

获取Catalog元数据

  通常我们想访问当前系统的Catalog元数据。SparkSession提供了catalog实例来操作metastore。这些方法放回的都是Dataset类型的,所有我们可以使用Dataset相关的API来访问其中的数据。如下代码片段,我们展示了所有的表并且列出当前所有的数据库:

[plain] view plaincopyprint?
  1. //fetch metadata data from the catalog
  2. scala> spark.catalog.listDatabases.show(false)
  3. +--------------+---------------------+--------------------------------------------------------+
  4. |name          |description          |locationUri                                             |
  5. +--------------+---------------------+--------------------------------------------------------+
  6. |default       |Default Hive database|hdfs://iteblogcluster/user/iteblog/hive/warehouse       |
  7. +--------------+---------------------+--------------------------------------------------------+
  8. scala> spark.catalog.listTables.show(false)
  9. +----------------------------------------+--------+-----------+---------+-----------+
  10. |name                                    |database|description|tableType|isTemporary|
  11. +----------------------------------------+--------+-----------+---------+-----------+
  12. |iteblog                                 |default |null       |MANAGED  |false      |
  13. |table2                                  |default |null       |EXTERNAL |false      |
  14. |test                                    |default |null       |MANAGED  |false      |
  15. +----------------------------------------+--------+-----------+---------+-----------+

创建Dataset和Dataframe

  使用SparkSession APIs创建 DataFrames 和 Datasets的方法有很多,其中最简单的方式就是使用spark.range方法来创建一个Dataset。当我们学习如何操作Dataset API的时候,这个方法非常有用。操作如下:

[plain] view plaincopyprint?
  1. scala> val numDS = spark.range(5, 100, 5)
  2. numDS: org.apache.spark.sql.Dataset[Long] = [id: bigint]
  3. scala> numDS.orderBy(desc("id")).show(5)
  4. +---+
  5. | id|
  6. +---+
  7. | 95|
  8. | 90|
  9. | 85|
  10. | 80|
  11. | 75|
  12. +---+
  13. only showing top 5 rows
  14. scala> numDS.describe().show()
  15. +-------+------------------+
  16. |summary|                id|
  17. +-------+------------------+
  18. |  count|                19|
  19. |   mean|              50.0|
  20. | stddev|28.136571693556885|
  21. |    min|                 5|
  22. |    max|                95|
  23. +-------+------------------+
  24. scala> val langPercentDF = spark.createDataFrame(List(("Scala", 35),
  25. | ("Python", 30), ("R", 15), ("Java", 20)))
  26. langPercentDF: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
  27. scala> val lpDF = langPercentDF.withColumnRenamed("_1", "language").withColumnRenamed("_2", "percent")
  28. lpDF: org.apache.spark.sql.DataFrame = [language: string, percent: int]
  29. scala> lpDF.orderBy(desc("percent")).show(false)
  30. +--------+-------+
  31. |language|percent|
  32. +--------+-------+
  33. |Scala   |35     |
  34. |Python  |30     |
  35. |Java    |20     |
  36. |R       |15     |
  37. +--------+-------+

使用SparkSession读取CSV

创建完SparkSession之后,我们就可以使用它来读取数据,下面代码片段是使用SparkSession来从csv文件中读取数据:

[plain] view plaincopyprint?
  1. val df = sparkSession.read.option("header","true").
  2. csv("src/main/resources/sales.csv")

上面代码非常像使用SQLContext来读取数据,我们现在可以使用SparkSession来替代之前使用SQLContext编写的代码。下面是完整的代码片段:

[html] view plaincopyprint?
  1. package com.iteblog
  2. import org.apache.spark.sql.SparkSession
  3. /**
  4. * Spark Session example
  5. *
  6. */
  7. object SparkSessionExample {
  8. def main(args: Array[String]) {
  9. val sparkSession = SparkSession.builder.
  10. master("local")
  11. .appName("spark session example")
  12. .getOrCreate()
  13. val df = sparkSession.read.option("header","true").csv("src/main/resources/sales.csv")
  14. df.show()
  15. }
  16. }

使用SparkSession API读取JSON数据

  我们可以使用SparkSession来读取JSON、CVS或者TXT文件,甚至是读取parquet表。比如在下面代码片段里面,我将读取邮编数据的JSON文件,并且返回DataFrame对象:

[plain] view plaincopyprint?
  1. // read the json file and create the dataframe
  2. scala> val jsonFile = "/user/iteblog.json"
  3. jsonFile: String = /user/iteblog.json
  4. scala> val zipsDF = spark.read.json(jsonFile)
  5. zipsDF: org.apache.spark.sql.DataFrame = [_id: string, city: string ... 3 more fields]
  6. scala> zipsDF.filter(zipsDF.col("pop") > 40000).show(10, false)
  7. +-----+----------+-----------------------+-----+-----+
  8. |_id  |city      |loc                    |pop  |state|
  9. +-----+----------+-----------------------+-----+-----+
  10. |01040|HOLYOKE   |[-72.626193, 42.202007]|43704|MA   |
  11. |01085|MONTGOMERY|[-72.754318, 42.129484]|40117|MA   |
  12. |01201|PITTSFIELD|[-73.247088, 42.453086]|50655|MA   |
  13. |01420|FITCHBURG |[-71.803133, 42.579563]|41194|MA   |
  14. |01701|FRAMINGHAM|[-71.425486, 42.300665]|65046|MA   |
  15. |01841|LAWRENCE  |[-71.166997, 42.711545]|45555|MA   |
  16. |01902|LYNN      |[-70.941989, 42.469814]|41625|MA   |
  17. |01960|PEABODY   |[-70.961194, 42.532579]|47685|MA   |
  18. |02124|DORCHESTER|[-71.072898, 42.287984]|48560|MA   |
  19. |02146|BROOKLINE |[-71.128917, 42.339158]|56614|MA   |
  20. +-----+----------+-----------------------+-----+-----+
  21. only showing top 10 rows

在SparkSession中还用Spark SQL

  通过SparkSession我们可以访问Spark SQL中所有函数,正如你使用SQLContext访问一样。下面代码片段中,我们创建了一个表,并在其中使用SQL查询:

[plain] view plaincopyprint?
  1. // Now create an SQL table and issue SQL queries against it without
  2. // using the sqlContext but through the SparkSession object.
  3. // Creates a temporary view of the DataFrame
  4. scala> zipsDF.createOrReplaceTempView("zips_table")
  5. scala> zipsDF.cache()
  6. res3: zipsDF.type = [_id: string, city: string ... 3 more fields]
  7. scala> val resultsDF = spark.sql("SELECT city, pop, state, _id FROM zips_table")
  8. resultsDF: org.apache.spark.sql.DataFrame = [city: string, pop: bigint ... 2 more fields]
  9. scala> resultsDF.show(10)
  10. +------------+-----+-----+-----+
  11. |        city|  pop|state|  _id|
  12. +------------+-----+-----+-----+
  13. |      AGAWAM|15338|   MA|01001|
  14. |     CUSHMAN|36963|   MA|01002|
  15. |       BARRE| 4546|   MA|01005|
  16. | BELCHERTOWN|10579|   MA|01007|
  17. |   BLANDFORD| 1240|   MA|01008|
  18. |   BRIMFIELD| 3706|   MA|01010|
  19. |     CHESTER| 1688|   MA|01011|
  20. |CHESTERFIELD|  177|   MA|01012|
  21. |    CHICOPEE|23396|   MA|01013|
  22. |    CHICOPEE|31495|   MA|01020|
  23. +------------+-----+-----+-----+
  24. only showing top 10 rows

使用SparkSession读写Hive表

下面我们将使用SparkSession创建一个Hive表,并且对这个表进行一些SQL查询,正如你使用HiveContext一样:

[plain] view plaincopyprint?
  1. scala> spark.sql("DROP TABLE IF EXISTS iteblog_hive")
  2. res5: org.apache.spark.sql.DataFrame = []
  3. scala> spark.table("zips_table").write.saveAsTable("iteblog_hive")
  4. 16/08/24 21:52:59 WARN HiveMetaStore: Location: hdfs://iteblogcluster/user/iteblog/hive/warehouse/iteblog_hive specified for non-external table:iteblog_hive
  5. scala> val resultsHiveDF = spark.sql("SELECT city, pop, state, _id FROM iteblog_hive WHERE pop > 40000")
  6. resultsHiveDF: org.apache.spark.sql.DataFrame = [city: string, pop: bigint ... 2 more fields]
  7. scala> resultsHiveDF.show(10)
  8. +----------+-----+-----+-----+
  9. |      city|  pop|state|  _id|
  10. +----------+-----+-----+-----+
  11. |   HOLYOKE|43704|   MA|01040|
  12. |MONTGOMERY|40117|   MA|01085|
  13. |PITTSFIELD|50655|   MA|01201|
  14. | FITCHBURG|41194|   MA|01420|
  15. |FRAMINGHAM|65046|   MA|01701|
  16. |  LAWRENCE|45555|   MA|01841|
  17. |      LYNN|41625|   MA|01902|
  18. |   PEABODY|47685|   MA|01960|
  19. |DORCHESTER|48560|   MA|02124|
  20. | BROOKLINE|56614|   MA|02146|
  21. +----------+-----+-----+-----+
  22. only showing top 10 rows

正如你所见,你使用DataFrame API, Spark SQL 以及 Hive查询的结果都一样。

本文翻译自:https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html

SparkSession简单介绍相关推荐

  1. 遗传算法的简单介绍以及模式定理的简单证明

    遗传算法   遗传算法(Genetic Algorithm,GA),最早是由美国的John holland在20世纪70年代提出.算法通过模拟达尔文生物进化论的自然选择以及遗传学机理的生物进化过程来搜 ...

  2. 2021年大数据ELK(十八):Beats 简单介绍和FileBeat工作原理

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Beats 简单介绍和FileBeat工作原理 一.Beats 二.FileB ...

  3. 2021年大数据ELK(十五):Elasticsearch SQL简单介绍

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Elasticsearch SQL简单介绍 一.SQL与Elasticsear ...

  4. 2021年大数据ELK(二):Elasticsearch简单介绍

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 一.Elasticsearch简介 1.介绍 2.创始人 二.E ...

  5. iOS开发UI篇—多控制器和导航控制器简单介绍

    iOS开发UI篇-多控制器和导航控制器简单介绍 一.多控制器 一个iOS的app很少只由一个控制器组成,除非这个app极其简单.当app中有多个控制器的时候,我们就需要对这些控制器进行管理 有多个vi ...

  6. 简单介绍一下R中的几种统计分布及常用模型

    统计学上分布有很多,在R中基本都有描述.因能力有限,我们就挑选几个常用的.比较重要的简单介绍一下每种分布的定义,公式,以及在R中的展示. 统计分布每一种分布有四个函数:d――density(密度函数) ...

  7. LVS(Linux Virtual Server)三种负载均衡模型和十种调度的简单介绍

    LVS(Linux Virtual Server)三种负载均衡模型和十种调度的简单介绍 LVS (Linux Virtual Server) LVS(Linux Virtual Server)其实就是 ...

  8. dubbo学习过程、使用经验分享及实现原理简单介绍

    一.前言 部门去年年中开始各种改造,第一步是模块服务化,这边初选dubbo试用在一些非重要模块上,慢慢引入到一些稍微重要的功能上,半年时间,学习过程及线上使用遇到的些问题在此总结下. 整理这篇文章差不 ...

  9. iOS开发UI篇—UIWindow简单介绍

    iOS开发UI篇-UIWindow简单介绍 一.简单介绍 UIWindow是一种特殊的UIView,通常在一个app中只会有一个UIWindow iOS程序启动完毕后,创建的第一个视图控件就是UIWi ...

最新文章

  1. 贪心 Codeforces Round #300 A Cutting Banner
  2. 如何高效地去调试UGUI的源码
  3. vivo升级android10系统,官方确认vivo NEX旗舰版会直接升级到Android10
  4. flink介绍:有界流和无界流
  5. golang学习之旅:使用go语言操作mysql数据库(自己测试了)
  6. [Hands On ML] 4. 训练模型
  7. centos7 关闭selinux_Devops之LDAP部署安装(centos7+openLDAP+PhpLDAPAdmin)
  8. 微信小程序 自动解决分包大小问题_一个小小的优化,能让你的小程序瘦身10%...
  9. 如何使用phpMyAdmin管理数据库
  10. MATLAB每个字母等宽,等宽文本文件的导入选项对象
  11. jquery在选择元素的时候,可以写成var div=$(div)
  12. linux安装moodle最新版,在linux下安装moodle
  13. 2021年口腔正畸行业隐形矫治器专题研究报告
  14. 深度学习:卷积神经网络从入门到精通
  15. IP地址、子网掩码、网络号、主机号、网络地址、主机地址以及ip段/数字-如192.168.0.1/24是什么意思?
  16. SQLserver提权
  17. 12306候补购票功能怎么用?抢票软件依旧能抢到票
  18. 风琴html插件,jQuery垂直手风琴插件
  19. 工程光学第一、二、六章学习总结
  20. STM32从零开始(四)详解GPIO库函数

热门文章

  1. 英勇的战士——斯巴达
  2. lol人数最多的服务器,谁说“黑色玫瑰”妹子最多?LOL国服各大服务器趣闻盘点...
  3. wordpress社会化登陆插件Open Social设置教程
  4. iocp端口断开_在完成端口IOCP模型判断客户端是否已关闭连接(掉线) | 学步园
  5. 远期、期货和互换(三)
  6. 联想微型计算机620S,小巧、精致!联想ideacenter 620s远不止如此
  7. 我的编程奋斗历程[四部曲之四]-决定创业篇
  8. 底部孕线形态有哪些?底部孕线形态特征是什么?
  9. 网站银联支付证书更换
  10. ORB-SLAM2的安装及试运行