数据格式:

数据文件中每行数据采用下划线分隔数据

每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种

如果搜索关键字为 null,表示数据不是搜索数据

如果点击的品类 ID 和产品 ID 为-1,表示数据不是点击数据

针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之 间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示

支付行为和下单行为类似

需求:按照每个品类的点击、下单、支付的量来统计热门品类(先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数)

分析:数据可以统计成(品类,点击总数)(品类,下单总数)(品类,支付总数)这种格式来进行汇总,就简化成了WordCount问题

在实际的解决中我们可以进一步分析解决将数据转化成(品类,(点击,下单,支付))来解决

代码:

object demand {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("demand"))val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")// TODO 分别统计每个品类点击的次数,下单的次数和支付的次数: (品类,点击总数)(品类,下单总数)(品类,支付总数)val accumulator = new MyAccumulatorsc.register(accumulator,"demand")// 第一种 :将数据扁平化整理成(品类,(点击,下单,支付)),按key值进行累加即可
//        rdd.flatMap(
//            value => {
//                val values: Array[String] = value.split("_")
//                if (values(6) != "-1") {
//                    List((values(6), (1, 0, 0)))
//                } else if (values(8) != "null") {
//                    val ids: Array[String] = values(8).split(",")
//                    ids.map(
//                        id => (id, (0, 1, 0))
//                    )
//                } else if (values(10) != "null") {
//                    val ids: Array[String] = values(10).split(",")
//                    ids.map(
//                        id => (id, (0, 0, 1))
//                    )
//                }else {
//                    Nil // 空集合
//                }
//            }
//        ).reduceByKey(
//            (t1,t2) => {
//                (t1._1+t2._1,t1._2+t2._2,t1._3+t2._3)
//            }
//        ).sortBy(_._2,false).take(10).foreach(println)// 第二种:使用累加器来避免shufflerdd.foreach(value => {val values: Array[String] = value.split("_")if (values(6) != "-1") {accumulator.add(values(6),"click")} else if (values(8) != "null") {val ids: Array[String] = values(8).split(",")ids.foreach(id => accumulator.add(id,"order"))} else if (values(10) != "null") {val ids: Array[String] = values(10).split(",")ids.foreach(id => accumulator.add(id,"pay"))}})// 要自定义比较规则accumulator.value.map(_._2).toList.sortWith((l,r) =>{if (l.clickCnt > r.clickCnt){true}else if (l.clickCnt == r.clickCnt){if (l.orderCnt > r.orderCnt){true}else if (l.orderCnt == r.orderCnt){if (l.payCnt > r.payCnt){true}else false}else false}else false}).take(10).foreach(println)sc.stop()}// 辅助类case class UserVisitAction(cid:String,var clickCnt:Int,var orderCnt:Int,var payCnt:Int)// 自定义累加器class MyAccumulator extends AccumulatorV2[(String,String),mutable.Map[String, UserVisitAction]]{private var values: mutable.Map[String,UserVisitAction] = mutable.Map[String,UserVisitAction]()// 累加器是否为初始状态override def isZero: Boolean = values.isEmpty// 复制累加器override def copy(): AccumulatorV2[(String,String), mutable.Map[String,UserVisitAction]] = {new MyAccumulator()}// 重置累加器override def reset(): Unit = values.clear()// 向累加器中添加数据override def add(v: (String,String)): Unit = {val cid: String = v._1val action: String = v._2// 获取品类对应的UserVisitAction数据,从而来实现对相应操作的累加赋值val action1: UserVisitAction = values.getOrElse(cid, UserVisitAction(cid, 0, 0, 0))if (action == "click"){action1.clickCnt += 1}else if (action == "order"){action1.orderCnt += 1}else if (action == "pay"){action1.payCnt += 1}// 数据一定要进行更新values.update(cid,action1)}// 合并累加器override def merge(other: AccumulatorV2[(String,String), mutable.Map[String, UserVisitAction]]): Unit = {// 你将要输出的mapval map1 = this.values// 相对于输出的map以外的来自其他task任务返回的mapval map2 = other.value// 对结果进行merge合并map2.foreach{case (cid,use) => {val action: UserVisitAction = map1.getOrElse(cid, UserVisitAction(cid, 0, 0, 0))action.clickCnt += use.clickCntaction.orderCnt += use.orderCntaction.payCnt += use.payCnt// 一定要记得对数据更新到你要输出的map中去map1.update(cid,action)}}}// 返回累加器的结果override def value: mutable.Map[String, UserVisitAction] = values}
}

在解决问题时,由于spark有三种数据结构:RDD、累加器、广播变量,分别解决了一些问题,我们可以配合的使用它们来优化程序,由于计算时shuffle过程涉及磁盘IO,所有可以通过累加器的使用来避免shuffle提高效率。

spark案例:Top10 热门品类相关推荐

  1. 案例实操-Top10热门品类

    需求1:分别统计每个品类的点击次数,下单次数和支付次数 (品类,点击总数)(品类,下单总数)(品类,支付总数) 排名顺序如:点击总数>下单总数>支付总数 方案一 def main(args ...

  2. 50.Spark大型电商项目-用户访问session分析-top10热门品类之本地测试

    本篇文章记录用户访问session分析-top10热门品类之本地测试. 在测试的过程中,到很多问题. 问题一:二次排序需要序列化,否则会在程序运行的时候报错. public class Categor ...

  3. 43.Spark大型电商项目-用户访问session分析-top10热门品类之需求回顾以及实现思路分析

    目录 需求回顾 top10热门品类 二次排序 实现思路分析 本篇文章将记录用户访问session分析-top10热门品类之需求回顾以及实现思路分析. 需求回顾 top10热门品类 计算出来通过筛选条件 ...

  4. Spark Core项目实战(1) | 准备数据与计算Top10 热门品类(附完整项目代码及注释)

      大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...

  5. top10热门品类之需求以及实现思路分析

    一. 需求:top10热门品类 计算出来通过筛选条件的那些session,他们访问过的所有品类(点击.下单.支付),按照各个品类的点击.下单和支付次数,降序排序,获取前10个品类,也就是筛选条件下的那 ...

  6. 35.top10热门品类之需求回顾以及实现思路分析

    本文为<Spark大型电商项目实战> 系列文章之一,主要介绍session访问分析里的top10热门品类模块,本文主要进行需求分析和实现思路. 需求回顾:top10热门品类 计算出来通过筛 ...

  7. SparkCore项目实战 需求一Top10热门品类 需求二Top10热门品类下每个品类的Top10活跃用户统计 需求三计算页面单跳转换率

    目录 数据格式简介 需求一:Top10热门品类(普通算子实现) 优化:需求一(使用ReduceByKey进行预聚合) 优化:需求一(采用累加器,避免shuffle过程) 需求二:Top10热门品类下每 ...

  8. 大数据之Spark案例实操完整使用(第六章)

    大数据之Spark案例实操完整使用 一.案例一 1.准备数据 2.需求 1:Top10 热门品类 3.需求说明 方案一. 实现方案二 实现方案三 二 .需求实现 1.需求 2:Top10 热门品类中每 ...

  9. Spark 案例(依据电商网站的真实需求)

    目录 数据说明 需求1:Top10 热门品类 需求说明 实现方案一 需求分析 需求实现 实现方案二 需求分析 需求实现 实现方案三 需求分析 需求实现 需求 2:Top10 热门品类中每个品类的 To ...

最新文章

  1. SCD缓慢变化维拉链表
  2. http 请求默认时间_JMeter接口测试之HTTP请求默认值
  3. 反汇编基础-MSVC2012,2008,MinGw中控制台main函数入口特征
  4. erlang有前途吗_带有示例的Erlang概述
  5. iOS:Core Data 中的简单ORM
  6. 在哪里买铁甲格斗机器人_铁甲雄心Ⅱ火了 优必选“超变铁甲”掀起闯关赢免单热潮...
  7. jsp include指令标签
  8. 50秒开门,3分钟开走,特斯拉Model S就这样不翼而飞
  9. nginx日志中$request_body 十六进制字符(\\x22) 引号问题处理记录
  10. RAID 磁盘列阵与阵列卡
  11. 路由器怎么用自己的笔记本电脑进行配置
  12. Ubuntu16.04 开机开启小键盘数字键,时默认开NumLock灯
  13. opencv二值化的cv2.threshold函数
  14. 【图像去噪】基于小波变换(中值、硬阙值、软阙值)的图像去噪含Matlab源码
  15. nrm安装成功后但是不能使用
  16. 【夏目鬼鬼分享】RabbitMQ路由模式
  17. JDK环境变量配置-win10
  18. 简析NFT交易平台的发展历程及4F评估模型
  19. Android客户端+JavaEE后台服务器端+Mysql数据库(社交app-心情驿站)
  20. [转载]pAppLocale(微软AppLocale修改版,不会有乱码后遗症)+辅助配件

热门文章

  1. AI安防智能化发展至今还存在哪些问题?
  2. 计算机二级报名锁定了,职称计算机考试WindowsXP考点:锁定计算机
  3. hmm 求隐藏序列_隐马尔可夫模型HMM
  4. win10系统安装Oracle 11g时遇到[INS-13001]环境不满足最低要求解决办法
  5. ubuntu重启sshd
  6. 微软服务器阵列设置,HP Gen8服务器RAID阵列大于2TB时,安装Windows2016方法
  7. 运维必备——ELK日志分析系统
  8. 菜鸟飞行记——一、基础得不能再基础的知识记录
  9. Kong 网关使用入门
  10. linux shell命令对时间的处理(精确到秒、毫秒、纳秒)——筑梦之路