文章目录

  • 1、介绍
  • 2、业务统计
  • 3、代码实现

1、介绍

  • Spark SQL有三种不同实现方式:(1)使用DataFrame与RDD结合的方式。(2)纯粹使用DataFrame的方式。(3)使用DataSet的方式。本文仅介绍第二种方式,其他方式可以参考源码(见底部)。
  • DataFrameAPI是从Spark1.3开始就有的,它是一种以RDD为基础的分布式无类型数据集,类似于传统数据库中的二维表格。DataFrame 与 RDD 的注意区别在于,前者带有schema 元信息,即 DataFrame 表示的二维表数据集的每一列都带有名称和类型。
  • DataSetAPI 是从 Spark1.6 版本提出的,DataSet 是强类型,而 DataFrame 实际上是Dataset[Row]。DataSet 是 lazy 级别的,Transformation 级别的算子作用于 DataSet 会得到一个新的 DataSet。当 Action 算子被调用时,Spark 的查询优化器会优化 Transformation 算子形成的逻辑计划,并生成一个物理计划,该物理计划可以通过并行和分布式的方式来执行。

2、业务统计

  • 统计某特定电影观看者中男性和女性不同年龄分别有多少人。
  • 统计电影中平均得分最高(口碑最好)的电影及观看人数最高的电影(流行度最高)。

3、代码实现

1)数据读取。

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MovieUsersAnalyzerDataFrame")/*** SparkSession统一了Spark SQL执行时候的不同的上下文环境,也就是说Spark SQL无论运行在那种环境下我们都可以只使用* SparkSession这样一个统一的编程入口来处理DataFrame和DataSet编程,不需要关注底层是否有Hive等。*/val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//从SparkSession获得的上下文,这是因为我们读原生文件的时候或者实现一些Spark SQL目前还不支持的功能的时候需要使用SparkContextval sc: SparkContext = spark.sparkContextval dataPath: String = "hdfs://hadoop3:8020/input/movieRecom/moviedata/medium/"val outputDir: String = "hdfs://hadoop3:8020/out/movieRecom_out2"val usersRDD: RDD[String] = sc.textFile(dataPath + "users.dat")val moviesRDD: RDD[String] = sc.textFile(dataPath + "movies.dat")val occupationsRDD: RDD[String] = sc.textFile(dataPath + "occupations.dat")val ratingsRDD: RDD[String] = sc.textFile(dataPath + "ratings.dat")val ratings: RDD[(String, String, String)] = ratingsRDD.map(_.split("::")).map(x => (x(0), x(1), x(2))).cache()

2)统计某特定电影观看者中男性和女性不同年龄分别有多少人。

 println("通过DataFrame实现某特定电影观看者中男性和女性不同年龄分别有多少人?")// 使用Struct方式把Users的数据格式化,即在RDD的基础上增加数据的元数据信息val schemaforusers: StructType = StructType("UserID::Gender::Age::OccupationID::Zip-code".split("::").map(column => StructField(column, StringType, true)))// 把我们的每一条数据变成以Row为单位的数据val usersRDDRows: RDD[Row] = usersRDD.map(_.split("::")).map(line => Row(line(0).trim, line(1).trim, line(2).trim, line(3).trim, line(4).trim))// 结合Row和StructType的元数据信息基于RDD创建DataFrame,这个时候RDD就有了元数据信息的描述val usersDataFrame: DataFrame = spark.createDataFrame(usersRDDRows, schemaforusers)val schemaforratings: StructType = StructType("UserID::MovieID".split("::").map(column => StructField(column, StringType, true))).add("Rating", DoubleType, true).add("Timestamp", StringType, true)val ratingsRDDRows: RDD[Row] = ratingsRDD.map(_.split("::")).map(line => Row(line(0).trim, line(1).trim, line(2).trim.toDouble, line(3).trim))val ratingsDataFrame: DataFrame = spark.createDataFrame(ratingsRDDRows, schemaforratings)//使用Struct方式把Users的数据格式化,即在RDD的基础上增加数据的元数据信息val schemaformovies: StructType = StructType("MovieID::Title::Genres".split("::").map(column => StructField(column, StringType, true)))//把我们的每一条数据变成以Row为单位的数据val moviesRDDRows: RDD[Row] = moviesRDD.map(_.split("::")).map(line => Row(line(0).trim,line(1).trim, line(2).trim))//结合Row和StructType的元数据信息基于RDD创建DataFrame,这个时候RDD就有了元数据信息的描述val moviesDataFrame: DataFrame = spark.createDataFrame(moviesRDDRows, schemaformovies)ratingsDataFrame.filter(s"MovieID = 1193") // 这里能够直接指定MovieID的原因是DataFrame中有该元数据信息!.join(usersDataFrame, "UserID") // Join的时候直接指定基于UserID进行Join,这相对于原生的RDD操作而言更加方便快捷.select("Gender", "Age")  // 直接通过元数据信息中的Gender和Age进行数据的筛选.groupBy("Gender", "Age") // 直接通过元数据信息中的Gender和Age进行数据的groupBy操作.count()  // 基于groupBy分组信息进行count统计操作.show(10) // 显示出分组统计后的前10条信息

3) 统计电影中平均得分最高(口碑最好)的电影及观看人数最高的电影(流行度最高)。
代码中第5行可以查看Spark SQL解析DataFrame的过程。

  • 第一,经sql解析器词法分析生成未解析的逻辑计划,从[UserID#4, MovieID#5, Rating#6, Timestamp#7]中投影选择未解析的两列数据:电影ID,评分数据。
  • 第二,通过语法分析器,形成解析以后的逻辑计划。取到电影ID、评分数据,即 [MovieID#5, Rating#6]。
  • 第三,经优化器进行优化,生成优化以后的逻辑计划。这里仅做了 select 简单操作,不用优化。
  • 第四,然后通过 Spark 计划,生成物理计划。
1   import spark.sqlContext.implicits._
2   println("通过纯粹使用DataFrame方式计算所有电影中平均得分最高(口碑最好)的电影TopN2:")
3   ratingsDataFrame.select("MovieID", "Rating").groupBy("MovieID")
4      .avg("Rating").orderBy($"avg(Rating)".desc).show(10)
5   ratingsDataFrame.select("MovieID", "Rating").explain(true)
6
7   println("纯粹通过DataFrame的方式计算最流行电影即所有电影中粉丝或者观看人数最多(最流行电影)的电影TopN:")
8   ratingsDataFrame.groupBy("MovieID").count()
9      .orderBy($"count".desc).show(10)

4) 源码和数据
https://github.com/fengqijie001/movieRecommendation

希望可以帮到各位,不当之处,请多指教~?

基于Spark实现电影点评系统用户行为分析—DataFrame篇(二)相关推荐

  1. 基于Spark实现电影点评系统用户行为分析—RDD篇(一)

    文章目录 1.项目背景 2.数据描述 3.代码实现 1.项目背景 电影推荐系统(MovieLens)是美国明尼苏达大学(Minnesota)计算机科学与工程学院的GroupLens项目组创办的,是一个 ...

  2. Spark实战电影点评系统(一)

    一.通过RDD实战电影点评系统 日常的数据来源有很多渠道,如网络爬虫.网页埋点.系统日志等.下面的案例中使用的是用户观看电影和点评电影的行为数据,数据来源于网络上的公开数据,共有3个数据文件:uers ...

  3. 电影点评系统论文java_毕业设计(论文)-基于web的电影点评系统分析与设计.docx...

    毕业设计(论文) 论文题目 基于web的电影点评系统分析与设计 thesis Topic Movie reviews system analysis and design based on web A ...

  4. Spark商业案例与性能调优实战100课》第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析

    Spark商业案例与性能调优实战100课>第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析 package com.dt.spark.coresimport org.apa ...

  5. 基于Spark的电影推荐系统(推荐系统~1)

    第四部分-推荐系统-项目介绍 行业背景: 快速:Apache Spark以内存计算为核心 通用 :一站式解决各个问题,ADHOC SQL查询,流计算,数据挖掘,图计算 完整的生态圈 只要掌握Spark ...

  6. 电影推荐系统 python简书_基于Spark的电影推荐系统(实战简介)

    ## 写在前面 一直不知道这个专栏该如何开始写,思来想去,还是暂时把自己对这个项目的一些想法 和大家分享 的形式来展现.有什么问题,欢迎大家一起留言讨论. 这个项目的源代码是在https://gith ...

  7. 基于Spark的电影推荐系统(电影网站)

    第一部分-电影网站: 软件架构: SpringBoot+Mybatis+JSP 项目描述:主要实现电影网站的展现 和 用户的所有动作的地方 技术选型: 技术 名称 官网 Spring Boot 容器 ...

  8. 基于Spark的电影推荐系统(推荐系统~5)

    第四部分-推荐系统-离线推荐 本模块基于第4节得到的模型,开始为用户做离线推荐,推荐用户最有可能喜爱的5部电影. 说明几点 1.主要分为两个模块.其一是为 单个随机用户 做推荐,其二是为 所有用户做推 ...

  9. 基于ssm的电影购票系统(源代码+数据库) 618

    部分代码地址 https://gitee.com/ynwynwyn/movie-puchase-public 基于ssm的电影购票系统(源代码+数据库) 一.系统介绍 本项目分为管理员与普通用户两种角 ...

最新文章

  1. loglevel python 不输出_Python 通过 Celery 框架实现分布式任务队列!
  2. 全面介绍Windows内存管理机制及C++内存分配实例(一):进程空间
  3. 创建存储过程向表中循环加入数据
  4. 360推出国内首个工业互联网安全态势感知系统
  5. php if变量满足数组,在php中使用if()查看数组中的日期是否等于变量中存储的日期...
  6. 邓小铁:博弈论研究中的学术快乐
  7. 基于文本数据的情感分析系统
  8. 爬取豆瓣电影top250
  9. 系统集成项目管理师 高项论文 项目进度管理
  10. python刷抖音_Python刷抖音脚本
  11. 英语写作翻译-形容词(一)
  12. 了解iOS各个版本新特性总结
  13. Android 百度地图--定位、周边搜索
  14. 阿里巴巴社招笔试题——多线程打印
  15. python+openCV使用SIFT算法实现印章的总相似度检测
  16. python_第一节课_python基础语法
  17. 音频提取以及人声处理
  18. python气象学_Python气象绘图教程(二)
  19. 亚马逊企业文化交流分享
  20. 给鼠标右键添加上帝模式,让上帝随叫随到的操作

热门文章

  1. 人脸反光识别和读数识别_云端人脸识别-人脸识别SDK+API-人脸识别闸机解决方案...
  2. 数字孪生城市建设标准体系
  3. android扫描不到手机蓝牙,Android上的蓝牙:StartDiscovery无法正常工作。无法扫描设备...
  4. 西北工业大学计算机学院张磊,西工大校友张磊当选2018年IEEE Fellow
  5. C语言--期末前夜刷题计划(吉林大学高级语言程序设计超星慕课作业题为主)
  6. Gradle接入checkstyle代码风格检查插件
  7. mysql中checktable语句来_MySQL的命令check table用法
  8. 计算机网络安全第一课
  9. Flutter第三方库
  10. 水域智慧监测,水环境监测的全新革命!