数据源

链接:https://pan.baidu.com/s/1lUbGmA10yOgUL4Rz2KAGmw
提取码:yh57

源码在github:https://github.com/lidonglin-bit/Spark-Sql

目录

  • 一.数据准备
  • 二.各区域热门商品 Top3
    • 需求简介
    • 思路分析
  • 具体实现
    • 提前准备
    • 测试数据(实现一小部分sql)
    • 使用UDAF实现城市备注的部分
    • 把数据写到mysql中

一.数据准备

我们这次 Spark-sql 操作中所有的数据均来自 Hive.
首先在 Hive 中创建表, 并导入数据.
一共有 3 张表: 1 张用户行为表, 1 张城市表, 1 张产品表

CREATE TABLE `user_visit_action`(`date` string,`user_id` bigint,`session_id` string,`page_id` bigint,`action_time` string,`search_keyword` string,`click_category_id` bigint,`click_product_id` bigint,`order_category_ids` string,`order_product_ids` string,`pay_category_ids` string,`pay_product_ids` string,`city_id` bigint)
row format delimited fields terminated by '\t';
load data local inpath '/export/servers/datas/user_visit_action.txt' into table spark1602.user_visit_action;CREATE TABLE `product_info`(`product_id` bigint,`product_name` string,`extend_info` string)
row format delimited fields terminated by '\t';
load data local inpath '/export/servers/datas/product_info.txt' into table spark1602.product_info;CREATE TABLE `city_info`(`city_id` bigint,`city_name` string,`area` string)
row format delimited fields terminated by '\t';
load data local inpath '/export/servers/datas/city_info.txt' into table spark1602.city_info;

二.各区域热门商品 Top3

需求简介

这里的热门商品是从点击量的维度来看的.
计算各个区域前三大热门商品,并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。

例如:
地区  商品名称        点击次数    城市备注
华北  商品A     100000  北京21.2%,天津13.2%,其他65.6%
华北  商品P     80200   北京63.0%,太原10%,其他27.0%
华北  商品M     40000   北京63.0%,太原10%,其他27.0%
东北  商品J     92000   大连28%,辽宁17.0%,其他 55.0%

思路分析

使用 sql 来完成. 碰到复杂的需求, 可以使用 udf 或 udaf
1.先把需要的字段查出来
2.按照地区和商品名称聚合
3.按照地区进行分组开窗,排序 开窗函数
4.过滤出来名次小于等于3的
5. 城市备注需要自定义 UDAF 函数

具体实现

提前准备

  • 1.添加依赖
   <dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.1.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency></dependencies><build><plugins><!-- 打包插件, 否则 scala 类不会编译并打包进去 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build>
  • 2.本次需要用hive,把hive-site.xml文件导入到resources下

测试数据(实现一小部分sql)

实现前面的一部分,后部分要用UDAF

import org.apache.spark.sql.SparkSessionobject SqlApp {def main(args: Array[String]): Unit = {System.setProperty("HADOOP_USER_NAME", "root")val spark = SparkSession.builder().master("local[*]").appName("SqlApp").enableHiveSupport().getOrCreate()//去执行sql,从hive查询数据spark.sql("use spark1602")spark.sql("""|select|    ci.*,|    pi.product_name,|    uva.click_product_id|from user_visit_action uva|join product_info pi|on uva.click_product_id = pi.product_id|join city_info ci|on uva.city_id = ci.city_id|""".stripMargin).createOrReplaceTempView("t1")spark.sql("""|select|   area,|   product_name,|   count(*) count|from t1|group by area,product_name|""".stripMargin).createOrReplaceTempView("t2")spark.sql("""|select|   area,|   product_name,|   count,|   rank() over(partition by area order by count desc) rk|from t2|""".stripMargin).createOrReplaceTempView("t3")spark.sql("""|select|  area,|   product_name,|   count|from t3|where rk<=3|""".stripMargin).showspark.close()}
}

结果

使用UDAF实现城市备注的部分

  • 1.创建UDAF
import org.apache.spark.sql.SparkSessionobject SqlApp {def main(args: Array[String]): Unit = {System.setProperty("HADOOP_USER_NAME", "root")val spark = SparkSession.builder().master("local[*]").appName("SqlApp").enableHiveSupport().getOrCreate()spark.udf.register("remark",new CityRemarkUDAF)//去执行sql,从hive查询数据spark.sql("use spark1602")spark.sql("""|select|    ci.*,|    pi.product_name,|    uva.click_product_id|from user_visit_action uva|join product_info pi|on uva.click_product_id = pi.product_id|join city_info ci|on uva.city_id = ci.city_id|""".stripMargin).createOrReplaceTempView("t1")spark.sql("""|select|   area,|   product_name,|   count(*) count,|   remark(city_name) remark|from t1|group by area,product_name|""".stripMargin).createOrReplaceTempView("t2")spark.sql("""|select|   area,|   product_name,|   count,|   remark,|   rank() over(partition by area order by count desc) rk|from t2|""".stripMargin).createOrReplaceTempView("t3")spark.sql("""|select|  area,|   product_name,|   count,|   remark|from t3|where rk<=3|""".stripMargin).show(false)spark.close()}}

实现UDAF

import java.text.DecimalFormatimport org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, LongType, MapType, StringType, StructField, StructType}class CityRemarkUDAF extends UserDefinedAggregateFunction{//输入的数据类型  "北京","天津","String"override def inputSchema: StructType = StructType(Array(StructField("city",StringType)))//缓冲类型,每个地区的每个商品,缓冲所有城市的点击量//1.Map(北京->1000 天津->1000 石家庄->500)  用Map来存//2.总的点击量override def bufferSchema: StructType =StructType(Array(StructField("map",MapType(StringType,LongType)),StructField("total",LongType)))//最终聚合的结果类型   北京21.2%   天津13.2%  其他65.6%  Stringoverride def dataType: DataType = StringType//确定性override def deterministic: Boolean = true//对缓冲区做初始化override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = Map[String,Long]()buffer(1) = 0L}//分区内聚合override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {input match {//这个是remark(city_name)case Row(cityName:String) =>//1.总数的点击量 + 1buffer(1) = buffer.getLong(1) + 1L//2.给这个城市的点击量 + 1 => 找到缓冲的map,取出来这个城市原来的点击 +1  再赋值过去val map = buffer.getMap[String,Long](0)buffer(0) = map + (cityName -> (map.getOrElse(cityName,0L)+1L))case  _ =>}}//分区间的聚合override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {val map1 = buffer1.getMap[String,Long](0)val map2 = buffer2.getMap[String,Long](0)val total1 = buffer1.getLong(1)val total2 = buffer2.getLong(1)// 1.总数的聚合buffer1(1) = total1 + total2//2.map的聚合buffer1(0) = map1.foldLeft(map2){case (map,(cityName,count))=>map + (cityName-> (map.getOrElse(cityName,0L) +count))}}//返回最后的聚合结果override def evaluate(buffer: Row): String = {//北京21.2%,天津13.2%,其他65.6%val cityAndCount = buffer.getMap[String,Long](0)val total = buffer.getLong(1)val cityCountTop2 = cityAndCount.toList.sortBy(-_._2).take(2)var cityRemarks = cityCountTop2.map {case (cityName, count) => CityRemark(cityName, count.toDouble/total)}// CityRemark("其他",1-cityRemarks.foldLeft(0D)(_+_.cityRadio))cityRemarks :+= CityRemark("其他",cityRemarks.foldLeft(1D)(_-_.cityRadio))cityRemarks.mkString(",")}
}
case class CityRemark(cityName:String,cityRadio:Double){val f = new DecimalFormat("0.00%")//北京21.2%,天津13.2%,其他65.6%override def toString:String = s"$cityName:${f.format(cityRadio.abs)}"
}

结果

把数据写到mysql中

代码实现

import java.util.Propertiesimport org.apache.spark.sql.SparkSessionobject SqlApp {def main(args: Array[String]): Unit = {System.setProperty("HADOOP_USER_NAME", "root")val spark = SparkSession.builder().master("local[*]").appName("SqlApp").enableHiveSupport().getOrCreate()spark.udf.register("remark",new CityRemarkUDAF)//去执行sql,从hive查询数据spark.sql("use spark1602")spark.sql("""|select|    ci.*,|    pi.product_name,|    uva.click_product_id|from user_visit_action uva|join product_info pi|on uva.click_product_id = pi.product_id|join city_info ci|on uva.city_id = ci.city_id|""".stripMargin).createOrReplaceTempView("t1")spark.sql("""|select|   area,|   product_name,|   count(*) count,|   remark(city_name) remark|from t1|group by area,product_name|""".stripMargin).createOrReplaceTempView("t2")spark.sql("""|select|   area,|   product_name,|   count,|   remark,|   rank() over(partition by area order by count desc) rk|from t2|""".stripMargin).createOrReplaceTempView("t3")val url = "jdbc:mysql://hadoop102:3306/sparksql?useUnicode=true&characterEncoding=UTF-8"val user = "root"val pw = "root"val props = new Properties()props.put("user",user)props.put("password",pw)spark.sql("""|select|  area,|   product_name,|   count,|   remark|from t3|where rk<=3|""".stripMargin).coalesce(1).write.mode("overwrite").jdbc(url,"sql1602",props)//把结果写到mysql中spark.close()}}

成功

SparkSql 项目实战 | 各区域热门商品Top3相关推荐

  1. SparkSQL实战小项目之热门商品top3

    SparkSQL实战小项目之热门商品top3 一.说明及需求分析 二.准备测试数据 三.思路分析 四.编码实现 五.验证结果 一.说明及需求分析 软件及环境: centos7 + hive-2.3.3 ...

  2. 99.Spark大型电商项目-各区域热门商品统计-模块介绍

    目录 各区域热门商品统计 作业提交 大数据方向的职业发展规划 用户行为分析意义 本篇文章记录各区域热门商品统计-模块介绍. 各区域热门商品统计 需求:根据用户指定的日期范围,统计各个区域下的最热门的t ...

  3. 【VUE项目实战】56、商品添加功能(六)-提交添加的商品

    接上篇<55.商品添加功能(五)-商品内容模块> 上一篇我们完成了商品内容编辑模块的开发,也即是完成了商品所有的信息编辑,本篇我们就来开发提交商品所有信息到后台的功能. 一.要实现的效果 ...

  4. JavaWeb商城项目笔记--- Day1 (热门商品,热销商品)

    功能出现场景 在线的商场中,前端最近界面总会有一块区域用来显示销售量最高的,最新上架的和类似的这种的商品. 一些联想到的功能:热销,热评等 功能解决思路 核心还是对数据库进行查询,然后响应给前端信息, ...

  5. 【VUE项目实战】54、商品添加功能(四)-商品图片上传模块

    接上篇<53.商品添加功能(三)-商品参数及属性模块> 上一篇我们完成了商品参数和商品属性面板的开发,本篇我们来完成商品图片上传模块的开发. 一.要实现的效果 我们在商品图片页签,需要放置 ...

  6. js实战练习—锤子热门商品

    通过这三周的学习,通过实战练习来掌握对js的熟练程度和对HTML.CSS和CSS3知识的巩固和应用. 对于制作这个网页发现了一些我认为比较重要的事情: 重在实践,多加练习.正如这次,对于js函数传参以 ...

  7. Vue 项目实战五 参数管理 商品列表

    1.1 参数列表(展示动态参数可选项) 动态参数可选项展示及操作,在获取动态参数的方法中进行处理. //将获取到的数据中的attr_vals字符串转换为数组 res.data.forEach(item ...

  8. java 电商项目 搜索模块,SSH电商项目实战之十:商品类基本模块的搭建

    前面我们完成了与商品类别相关的业务逻辑,接下来我们开始做具体商品部分. 1. 数据库建表并映射Model 首先我们在数据库中新建一张表,然后使用逆向工程将表映射成Model类,表如下: SQL代码/* ...

  9. 微信小程序商城项目实战(第十一篇:商品收藏+历史浏览管理)

    商品收藏+历史浏览管理 商品收藏+历史浏览页面 分析 代码实现 效果图展示 商品收藏: 历史浏览: 商品收藏+历史浏览页面 共用一个页面 分析 顶部改为"商品收藏" 上方为导航栏: ...

最新文章

  1. 换发型app任性扣费?苹果app订阅任性扣费?怎么办?刚成功
  2. fin.is_open()与fin.open()有什么区别?
  3. Python机器学习及分析工具:Scipy篇
  4. 机器学习算法源码全解析(一)-带你深入理解随机森林(RandomForest)原理及如何防止 Dropout
  5. Python--三元表达式、列表推导式、生成器表达式、递归、匿名函数、内置函数...
  6. 38. 统计一个整数的二进制表示中bit为1的个数
  7. python用outlook自动发邮件_python使用两种发邮件的方式smtp和outlook示例
  8. centos6mysql配置_笔记:centos6 mysql配置测试
  9. 大火金九银十!秋季借势海报PSD分层模板,看谁最能俘获你的心
  10. Mutex对象是操作系统级?
  11. cocos2dx 3.1从零学习(四)——内存管理(错误案例分析)
  12. js加载flv格式视频
  13. 什么是NAT技术与代理服务器
  14. 鼠标在微信开发工具中消失(而在手机模拟器以外可以显示)
  15. 【STM32】HAL库 STM32CubeMX教程四---UART串口通信详解
  16. Google Guava EventBus 消息发布-订阅异步调用使用
  17. 硬盘柱面损坏怎么办_硬盘有坏道怎么修复?使用DiskGenius修复硬盘逻辑坏道的方法...
  18. [5-23]绿色精品软件每天更新[uc23整理]
  19. 古有陈天华万字血书抗沙俄,今有本剧蒻万字背包虐dp(01,完全,多重,分组,混合等各种背包详解 + 板子题+ 奇奇怪怪滴变式题)
  20. 【Spring教程】框架体系介绍

热门文章

  1. autojs问题汇总
  2. 【OpenFOAM】snappyHexMesh
  3. Grid++Report实现Web报表
  4. 手机计算机上输入错误是什么意思,手机计算器出错 原因竟是人性化设计
  5. python面向对象基础知识
  6. 寄生虫html链接,index.html
  7. 计算机科学专业的五种高薪职业选择,看看你适合哪种?
  8. 保研边缘人,参加夏令营有机会拿到优营吗?
  9. python+vue 餐饮食品安全监管投诉平台
  10. 举例说明层次分析的三大原则_文章写作:对比说明与举例说明