spark—SQL实战案例
学习内容
- 一、sparkSQL在IDEA的使用
- 1.环境配置
- 2.快速入门
- 二、sparkSQL实战案例
- 1.数据准备
- 2.案例分析
- 3.功能实现
- 4.代码实现
一、sparkSQL在IDEA的使用
1.环境配置
配置pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>Spark3.0</artifactId><groupId>org.example</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>1.2.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency></dependencies><artifactId>spark-core</artifactId></project>
连接hadoop集群中的hive
- 首先集群要启动
- Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下
- 将 Mysql 的驱动 copy 到 jars/目录下(mysql-connector-java-5.1.27-bin.jar)
- 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
- 重启集群中 spark-shell,执行语句 spark.sql(“show tables”).show 显示hive的表即可
- 把 hive-site.xml放到IDEA项目的target文件夹下
- 最后注意windows和虚拟机的IP的映射
2.快速入门
package com.bigdata.SparkSQLimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/*** @author wangbo* @version 1.0*//*** 环境测试*/
object Spark02_SparkSQL_Hive_demo {def main(args: Array[String]): Unit = {// TODO 创建SparkSQL的运行环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val sparkSession: SparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()//使用SparkSQL连接外置的Hive//首先 集群要启动//1.拷贝Hive-size.xml文件到classpath下//2.启用hive的支持//3.增加对应的依赖关系(包含mysql的驱动)sparkSession.sql("show tables").show()// TODO 关闭环境sparkSession.close()}
}
如果报错类似这种:
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=anonymous, access=EXECUTE, inode=“/tmp”:root:supergroup:drwx------
解决方法:
这种情况说明你hive中的数据库权限不够,直接将hdfs中存放该数据库的文件的权限修改即可 如:[root@hadoop100 ~]# hadoop dfs -chmod 777 /user/hive/warehouse/spark_demo.db
二、sparkSQL实战案例
1.数据准备
数据文件:
链接:https://pan.baidu.com/s/1t9hxa3dXF9gNRZJtxosWtQ
提取码:x523
package com.bigdata.SparkSQLimport org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession/*** @author wangbo* @version 1.0*//*** 首先在hive中创建数据库,在hdfs中把hive创建的数据库文件,给上权限 如:hadoop dfs -chmod 777 /user/hive/warehouse/spark_demo.db* 数据的准备:进入数据库,创建表,导入数据*/
object Spark02_SparkSQL_Hive_demo1 {def main(args: Array[String]): Unit = {// TODO 创建SparkSQL的运行环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val sparkSession: SparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()//进入数据库sparkSession.sql("use spark_demo")//TODO 准备数据, 创建表//用户信息表sparkSession.sql("""|CREATE TABLE `user_visit_action`(| `date` string,| `user_id` bigint,| `session_id` string,| `page_id` bigint,| `action_time` string,| `search_keyword` string,| `click_category_id` bigint,| `click_product_id` bigint,| `order_category_ids` string,| `order_product_ids` string,| `pay_category_ids` string,| `pay_product_ids` string,| `city_id` bigint)|row format delimited fields terminated by '\t'""".stripMargin)sparkSession.sql("""|load data local inpath 'datas/user_visit_action1.txt' into table spark_demo.user_visit_action|""".stripMargin)//商品信息表sparkSession.sql("""|CREATE TABLE `product_info`(| `product_id` bigint,| `product_name` string,| `extend_info` string)|row format delimited fields terminated by '\t'|""".stripMargin)sparkSession.sql("""|load data local inpath 'datas/product_info.txt' into table spark_demo.product_info|""".stripMargin)//城市信息表sparkSession.sql("""|CREATE TABLE `city_info`(| `city_id` bigint,| `city_name` string,| `area` string)|row format delimited fields terminated by '\t'|""".stripMargin)sparkSession.sql("""|load data local inpath 'datas/city_info.txt' into table spark_demo.city_info|""".stripMargin)sparkSession.sql("show tables").show()// TODO 关闭环境sparkSession.close()}
}
2.案例分析
- 查询出来所有的点击记录,并与 city_info 表连接,得到每个城市所在的地区,与Product_info 表连接得到产品名称
- 按照地区和商品 id 分组,统计出每个商品在每个地区的总点击次数
- 每个地区内按照点击次数降序排列
- 只取前三名
- 城市备注需要自定义 UDAF 函数
3.功能实现
- 连接三张表的数据,获取完整的数据(只有点击)
- 将数据根据地区,商品名称分组
- 统计商品点击次数总和,取 Top3
- 实现自定义聚合函数显示备注
4.代码实现
package com.bigdata.SparkSQLimport org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregatorimport scala.collection.mutable
import scala.collection.mutable.ListBuffer/*** @author wangbo* @version 1.0*//*** 进行表的查询*/
object Spark02_SparkSQL_Hive_demo2 {def main(args: Array[String]): Unit = {// TODO 创建SparkSQL的运行环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val sparkSession: SparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()//进入数据库sparkSession.sql("use spark_demo")//查询基本数据sparkSession.sql("""| select| a.*,| p.product_name,| c.area,| c.city_name| from user_visit_action a| join product_info p on a.click_product_id = p.product_id| join city_info c on a.city_id = c.city_id| where a.click_product_id > -1|""".stripMargin).createOrReplaceTempView("t1") //把上面的查询结果,放在一个临时表他t1中//根据区域,商品进行数据聚合sparkSession.udf.register("cityRemark",functions.udaf(new cityRemarkUDAF()))sparkSession.sql("""| select| area,| product_name,| count(*) as clickCnt,| cityRemark(city_name) as city_remark| from t1 group by area,product_name|""".stripMargin).createOrReplaceTempView("t2")//区域内对点击数量进行排序sparkSession.sql("""| select| *,| rank() over(partition by area order by clickCnt desc) as rank| from t2|""".stripMargin).createOrReplaceTempView("t3")//取前三名sparkSession.sql("""| select| *| from t3 where rank <=3|""".stripMargin).show(false) //这里的false为显示完整的字段名,如果不写,字段过长会被省略// TODO 关闭环境sparkSession.close()}/*自定义聚合函数:实现城市备注功能1.定义自定义类继承org.apache.spark.sql.expressions.Aggregator定义泛型IN:输入的数据类型:城市的名称BUF:缓冲区的数据类型(使用了样例类):【总点击数量,Map[ (city,cnt),(city,cnt) ]】OUT:输出的数据类型:备注信息2.重写方法*/case class Buffer(var total:Long,var cityMap:mutable.Map[String,Long])class cityRemarkUDAF extends Aggregator[String,Buffer,String]{//初始值,缓冲区初始化override def zero: Buffer = {Buffer(0,mutable.Map[String,Long]())}//根据输入的数据更新缓冲区的数据override def reduce(buff: Buffer, city: String): Buffer = {buff.total += 1val newCount = buff.cityMap.getOrElse(city,0L) + 1 //获取cityMap的value,如果能取到就+1,取不到赋值为0+1buff.cityMap.update(city,newCount) //更新缓冲区buff}//合并缓冲区的数据override def merge(buff1: Buffer, buff2: Buffer): Buffer = {buff1.total += buff2.total //将点击量合并val map1: mutable.Map[String, Long] = buff1.cityMapval map2: mutable.Map[String, Long] = buff2.cityMap//方式一:两个map合并操作
// buff1.cityMap = map1.foldLeft(map2) {// case (map, (city, count)) => { //key:city,value:count
// val newCount = map.getOrElse(city, 0L) + count
// map.update(city, newCount)
// map
// }
// }
// buff1//方式二:两个map合并操作map2.foreach{case (city , count) => {val newCount = map1.getOrElse(city,0L) + countmap1.update(city, newCount)}}buff1.cityMap = map1buff1}//将统计的结构生成字符串信息override def finish(buff: Buffer): String = {val remarkList: ListBuffer[String] = ListBuffer[String]()val totalCount: Long = buff.total //城市的总数量val cityMap: mutable.Map[String, Long] = buff.cityMap//数据进行降序排列,去前两个val cityCountList: List[(String, Long)] = cityMap.toList.sortWith( //因为List可以排序(left, right) => { //cityMap1 和 cityMap2 两个map进行比较left._2 > right._2}).take(2)//判断城市是否大于2val bool: Boolean = cityMap.size > 2var rsum = 0LcityCountList.foreach{case (city,count) => { //city城市名称,count城市数量val r = count * 100 / totalCount //求出商品在主要城市的比例 乘100是为了取整remarkList.append(s"${city} ${r}%")rsum += r}}if (bool){remarkList.append(s"其他 ${100-rsum}")}remarkList.mkString(",")}//缓冲区的编码操作,自定义的类就写Encoders.product,如果是scala存在的类,如Long 就写Encoders.scalaLongoverride def bufferEncoder: Encoder[Buffer] = Encoders.product//输出的编码操作,自定义的类就写Encoders.product,如果是scala存在的类,如Long 就写Encoders.scalaLongoverride def outputEncoder: Encoder[String] = Encoders.STRING}
}
参考:尚硅谷spark3.0教学
spark—SQL实战案例相关推荐
- Hive/Spark SQL使用案例
Hive/Spark SQL使用案例 求 TOPN:开窗函数 求天数:datediff() 函数 求每个学生的成绩都大于...系列:开窗 / 分组 表转置/行转列系列一:concat_ws 函数 表转 ...
- flink sql实战案例
目录 一.背景 二.流程 三.案例 1.flink sql读取 Kafka 并写入 MySQL source sink insert 2.flinksql读kafka写入kudu source sin ...
- Spark Streaming 实战案例(一)
本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-opera ...
- Spark SQL实战(08)-整合Hive
1 整合原理及使用 Apache Spark 是一个快速.可扩展的分布式计算引擎,而 Hive 则是一个数据仓库工具,它提供了数据存储和查询功能.在 Spark 中使用 Hive 可以提高数据处理和查 ...
- Spark SQL实战
一.程序 1 package sparklearning 2 3 import org.apache.log4j.Logger 4 import org.apache.spark.SparkConf ...
- Spark SQL之案例实战(四)
1. 获取数据 本文通过将github上的Spark项目Git日志作为数据,对SparkSQL的内容进行详细介绍 数据获取命令如下: [root@master spark]# git log --pr ...
- Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制
主要内容 Spark Stream 缓存 Checkpoint 案例 1. Spark Stream 缓存 通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一 ...
- Spark Streaming 实战案例(五) Spark Streaming与Kafka
主要内容 Spark Streaming与Kafka版的WordCount示例(一) Spark Streaming与Kafka版的WordCount示例(二) 1. Spark Streaming与 ...
- Spark Streaming 实战案例(二) Transformation操作
本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html DStream Tra ...
最新文章
- 中文企业云操作系统 CecOS
- RK3288 添加USB转虚拟串口设备
- android listview 游标,Android Listview - 使用游标时无法选择多个项目
- html提交本页面,将文本提交到相同的HTML页面
- 图论——Dijkstra+prim算法涉及到的优先队列(二叉堆)
- java5的递归算法_java递归算法 java面试题(5)
- mapreduce复制连接的代码_MapReduce:在大型集群上简化数据处理(2)
- android循环请求数据,android – Camera2 ImageReader冻结重复捕获请求
- 11. Django基础:应用及分布式路由
- python中小用法之assert,*arg,**arg用法全解
- 判断在ios系统中打开微信浏览器
- Kotlin 和 Java 中内部类中的 static native 方法(JNI 函数)
- 5G网络规划面临的挑战
- 怎么把flac转换为mp3格式
- ffmpeg批量处理视频和音频合成
- 游戏平台搭建在韩国大带宽服务器CN2路线哪里的比较好
- Latex 中文简历 过程(更新Miktex和 修改utf字体)
- 记录我看的密码学方案中的技术,Shamir秘密共享,Schnorr零知识证明,EIGamal密码体制
- 使用 MFC 编写打印程序
- MAC禁止chrome自动更新【绝对真实有效,无效剁吊】
热门文章
- 防火墙知识学习(一)
- 用计算机操纵Photoshop快速做事
- STC8051学习笔记
- 成功解决gpg: 找不到有效的 OpenPGP 数据
- Pico+UnityXR实现简单移动和交互
- 【动画消消乐】纯CSS加载/过渡动画学习笔记合集(1-50)
- 试用了阿里云市场的验证码识别api,真的牛批,传统4位数验证码识别率超高
- mysql5.532,又双叒叕一篇评测,StorageReview这样评价PBlaze5 916 U.2 NVMe SSD
- linux 程序 减肥,linux下实用软件组合 -- 为你的 linux 减肥!
- 室温固态量子计算机,我国学者在室温固态体系中实现基于单自旋体系的质因数分解量子算法...