项目介绍

数据集介绍

使用MovieLens的名称为ml-25m.zip的数据集,使用的文件时movies.csvratings.csv,上述文件的下载地址为:

http://files.grouplens.org/datasets/movielens/ml-25m.zip
  • movies.csv

该文件是电影数据,对应的为维表数据,大小为2.89MB,包括6万多部电影,其数据格式为[movieId,title,genres],分别对应[电影id,电影名称,电影所属分类],样例数据如下所示:逗号分隔

1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
  • ratings.csv

该文件为定影评分数据,对应为事实表数据,大小为646MB,其数据格式为:[userId,movieId,rating,timestamp],分别对应[用户id,电影id,评分,时间戳],样例数据如下所示:逗号分隔

1,296,5,1147880044

项目代码结构

需求分析

  • 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分

  • 需求2:查找每个电影类别及其对应的平均评分

  • 需求3:查找被评分次数较多的前十部电影

代码讲解

  • DemoMainApp

该类是程序执行的入口,主要是获取数据源,转换成DataFrame,并调用封装好的业务逻辑类。

object DemoMainApp {// 文件路径private val MOVIES_CSV_FILE_PATH = "file:///e:/movies.csv"private val RATINGS_CSV_FILE_PATH = "file:///e:/ratings.csv"def main(args: Array[String]): Unit = {// 创建spark sessionval spark = SparkSession.builder.master("local[4]").getOrCreate// schema信息val schemaLoader = new SchemaLoader// 读取Movie数据集val movieDF = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema)// 读取Rating数据集val ratingDF = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)// 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分val bestFilmsByOverallRating = new BestFilmsByOverallRating//bestFilmsByOverallRating.run(movieDF, ratingDF, spark)// 需求2:查找每个电影类别及其对应的平均评分val genresByAverageRating = new GenresByAverageRating//genresByAverageRating.run(movieDF, ratingDF, spark)// 需求3:查找被评分次数较多的前十部电影val mostRatedFilms = new MostRatedFilmsmostRatedFilms.run(movieDF, ratingDF, spark)spark.close()}/*** 读取数据文件,转成DataFrame** @param spark* @param path* @param schema* @return*/def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = {val dataSet = spark.read.format("csv").option("header", "true").schema(schema).load(path)dataSet}
}
  • Entry

该类为实体类,封装了数据源的样例类和结果表的样例类

class Entry {}case class Movies(movieId: String, // 电影的idtitle: String, // 电影的标题genres: String // 电影类别)case class Ratings(userId: String, // 用户的idmovieId: String, // 电影的idrating: String, // 用户评分timestamp: String // 时间戳)// 需求1MySQL结果表
case class tenGreatestMoviesByAverageRating(movieId: String, // 电影的idtitle: String, // 电影的标题avgRating: String // 电影平均评分)// 需求2MySQL结果表
case class topGenresByAverageRating(genres: String, //电影类别avgRating: String // 平均评分)// 需求3MySQL结果表
case class tenMostRatedFilms(movieId: String, // 电影的idtitle: String, // 电影的标题ratingCnt: String // 电影被评分的次数)
  • SchemaLoader

该类封装了数据集的schema信息,主要用于读取数据源是指定schema信息

class SchemaLoader {// movies数据集schema信息private val movieSchema = new StructType().add("movieId", DataTypes.StringType, false).add("title", DataTypes.StringType, false).add("genres", DataTypes.StringType, false)// ratings数据集schema信息private val ratingSchema = new StructType().add("userId", DataTypes.StringType, false).add("movieId", DataTypes.StringType, false).add("rating", DataTypes.StringType, false).add("timestamp", DataTypes.StringType, false)def getMovieSchema: StructType = movieSchemadef getRatingSchema: StructType = ratingSchema
}
  • JDBCUtil

该类封装了连接MySQL的逻辑,主要用于连接MySQL,在业务逻辑代码中会使用该工具类获取MySQL连接,将结果数据写入到MySQL中。

object JDBCUtil {val dataSource = new ComboPooledDataSource()val user = "root"val password = "123qwe"val url = "jdbc:mysql://localhost:3306/mydb"dataSource.setUser(user)dataSource.setPassword(password)dataSource.setDriverClass("com.mysql.jdbc.Driver")dataSource.setJdbcUrl(url)dataSource.setAutoCommitOnClose(false)
// 获取连接def getQueryRunner(): Option[QueryRunner]={try {Some(new QueryRunner(dataSource))}catch {case e:Exception =>e.printStackTrace()None}}
}

需求1实现

  • BestFilmsByOverallRating

需求1实现的业务逻辑封装。该类有一个run()方法,主要是封装计算逻辑。

/*** 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分*/
class BestFilmsByOverallRating extends Serializable {def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {import spark.implicits._// 将moviesDataset注册成表moviesDataset.createOrReplaceTempView("movies")// 将ratingsDataset注册成表ratingsDataset.createOrReplaceTempView("ratings")// 查询SQL语句val ressql1 ="""|WITH ratings_filter_cnt AS (|SELECT|     movieId,|     count( * ) AS rating_cnt,|     avg( rating ) AS avg_rating|FROM|     ratings|GROUP BY|     movieId|HAVING|     count( * ) >= 5000|),|ratings_filter_score AS (|SELECT|     movieId, -- 电影id|     avg_rating -- 电影平均评分|FROM ratings_filter_cnt|ORDER BY avg_rating DESC -- 平均评分降序排序|LIMIT 10 -- 平均分较高的前十部电影|)|SELECT|    m.movieId,|    m.title,|    r.avg_rating AS avgRating|FROM|   ratings_filter_score r|JOIN movies m ON m.movieId = r.movieId""".stripMarginval resultDS = spark.sql(ressql1).as[tenGreatestMoviesByAverageRating]// 打印数据resultDS.show(10)resultDS.printSchema()// 写入MySQLresultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))}/*** 获取连接,调用写入MySQL数据的方法** @param res*/private def insert2Mysql(res: tenGreatestMoviesByAverageRating): Unit = {lazy val conn = JDBCUtil.getQueryRunner()conn match {case Some(connection) => {upsert(res, connection)}case None => {println("Mysql连接失败")System.exit(-1)}}}/*** 封装将结果写入MySQL的方法* 执行写入操作** @param r* @param conn*/private def upsert(r: tenGreatestMoviesByAverageRating, conn: QueryRunner): Unit = {try {val sql =s"""|REPLACE INTO `ten_movies_averagerating`(|movieId,|title,|avgRating|)|VALUES|(?,?,?)""".stripMargin// 执行insert操作conn.update(sql,r.movieId,r.title,r.avgRating)} catch {case e: Exception => {e.printStackTrace()System.exit(-1)}}}
}

需求1结果

  • 结果表建表语句

CREATE TABLE `ten_movies_averagerating` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',`movieId` int(11) NOT NULL COMMENT '电影id',`title` varchar(100) NOT NULL COMMENT '电影名称',`avgRating` decimal(10,2) NOT NULL COMMENT '平均评分',`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),UNIQUE KEY `movie_id_UNIQUE` (`movieId`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;
  • 统计结果

平均评分最高的前十部电影如下:

movieId title avgRating
318 Shawshank Redemption, The (1994) 4.41
858 Godfather, The (1972) 4.32
50 Usual Suspects, The (1995) 4.28
1221 Godfather: Part II, The (1974) 4.26
527 Schindler's List (1993) 4.25
2019 Seven Samurai (Shichinin no samurai) (1954) 4.25
904 Rear Window (1954) 4.24
1203 12 Angry Men (1957) 4.24
2959 Fight Club (1999) 4.23
1193 One Flew Over the Cuckoo's Nest (1975) 4.22

上述电影评分对应的电影中文名称为:

英文名称 中文名称
Shawshank Redemption, The (1994) 肖申克的救赎
Godfather, The (1972) 教父1
Usual Suspects, The (1995) 非常嫌疑犯
Godfather: Part II, The (1974) 教父2
Schindler's List (1993) 辛德勒的名单
Seven Samurai (Shichinin no samurai)  (1954) 七武士
Rear Window (1954) 后窗
12 Angry Men (1957) 十二怒汉
Fight Club (1999) 搏击俱乐部
One Flew Over the Cuckoo's Nest (1975) 飞越疯人院

需求2实现

  • GenresByAverageRating

需求2实现的业务逻辑封装。该类有一个run()方法,主要是封装计算逻辑。

*** 需求2:查找每个电影类别及其对应的平均评分*/
class GenresByAverageRating extends Serializable {def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {import spark.implicits._// 将moviesDataset注册成表moviesDataset.createOrReplaceTempView("movies")// 将ratingsDataset注册成表ratingsDataset.createOrReplaceTempView("ratings")val ressql2 ="""|WITH explode_movies AS (|SELECT| movieId,| title,| category|FROM| movies lateral VIEW explode ( split ( genres, "\\|" ) ) temp AS category|)|SELECT| m.category AS genres,| avg( r.rating ) AS avgRating|FROM| explode_movies m| JOIN ratings r ON m.movieId = r.movieId|GROUP BY| m.category| """.stripMarginval resultDS = spark.sql(ressql2).as[topGenresByAverageRating]// 打印数据resultDS.show(10)resultDS.printSchema()// 写入MySQLresultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))}/*** 获取连接,调用写入MySQL数据的方法** @param res*/private def insert2Mysql(res: topGenresByAverageRating): Unit = {lazy val conn = JDBCUtil.getQueryRunner()conn match {case Some(connection) => {upsert(res, connection)}case None => {println("Mysql连接失败")System.exit(-1)}}}/*** 封装将结果写入MySQL的方法* 执行写入操作** @param r* @param conn*/private def upsert(r: topGenresByAverageRating, conn: QueryRunner): Unit = {try {val sql =s"""|REPLACE INTO `genres_average_rating`(|genres,|avgRating|)|VALUES|(?,?)""".stripMargin// 执行insert操作conn.update(sql,r.genres,r.avgRating)} catch {case e: Exception => {e.printStackTrace()System.exit(-1)}}}
}

需求2结果

  • 结果表建表语句

CREATE TABLE genres_average_rating (`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',`genres` VARCHAR ( 100 ) NOT NULL COMMENT '电影类别',`avgRating` DECIMAL ( 10, 2 ) NOT NULL COMMENT '电影类别平均评分',`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY ( `id` ),
UNIQUE KEY `genres_UNIQUE` ( `genres` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
  • 统计结果

共有20个电影分类,每个电影分类的平均评分为:

genres avgRating
Film-Noir 3.93
War 3.79
Documentary 3.71
Crime 3.69
Drama 3.68
Mystery 3.67
Animation 3.61
IMAX 3.6
Western 3.59
Musical 3.55
Romance 3.54
Adventure 3.52
Thriller 3.52
Fantasy 3.51
Sci-Fi 3.48
Action 3.47
Children 3.43
Comedy 3.42
(no genres listed) 3.33
Horror 3.29

电影分类对应的中文名称为:

分类 中文名称
Film-Noir 黑色电影
War 战争
Documentary 纪录片
Crime 犯罪
Drama 历史剧
Mystery 推理
Animation 动画片
IMAX 巨幕电影
Western 西部电影
Musical 音乐
Romance 浪漫
Adventure 冒险
Thriller 惊悚片
Fantasy 魔幻电影
Sci-Fi 科幻
Action 动作
Children 儿童
Comedy 喜剧
(no genres listed) 未分类
Horror 恐怖

需求3实现

  • MostRatedFilms

    需求3实现的业务逻辑封装。该类有一个run()方法,主要是封装计算逻辑。

/*** 需求3:查找被评分次数较多的前十部电影.*/
class MostRatedFilms extends Serializable {def run(moviesDataset: DataFrame, ratingsDataset: DataFrame,spark: SparkSession) = {import spark.implicits._// 将moviesDataset注册成表moviesDataset.createOrReplaceTempView("movies")// 将ratingsDataset注册成表ratingsDataset.createOrReplaceTempView("ratings")val ressql3 ="""|WITH rating_group AS (|    SELECT|       movieId,|       count( * ) AS ratingCnt|    FROM ratings|    GROUP BY movieId|),|rating_filter AS (|    SELECT|       movieId,|       ratingCnt|    FROM rating_group|    ORDER BY ratingCnt DESC|    LIMIT 10|)|SELECT|    m.movieId,|    m.title,|    r.ratingCnt|FROM|    rating_filter r|JOIN movies m ON r.movieId = m.movieId|""".stripMarginval resultDS = spark.sql(ressql3).as[tenMostRatedFilms]// 打印数据resultDS.show(10)resultDS.printSchema()// 写入MySQLresultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))}/*** 获取连接,调用写入MySQL数据的方法** @param res*/private def insert2Mysql(res: tenMostRatedFilms): Unit = {lazy val conn = JDBCUtil.getQueryRunner()conn match {case Some(connection) => {upsert(res, connection)}case None => {println("Mysql连接失败")System.exit(-1)}}}/*** 封装将结果写入MySQL的方法* 执行写入操作** @param r* @param conn*/private def upsert(r: tenMostRatedFilms, conn: QueryRunner): Unit = {try {val sql =s"""|REPLACE INTO `ten_most_rated_films`(|movieId,|title,|ratingCnt|)|VALUES|(?,?,?)""".stripMargin// 执行insert操作conn.update(sql,r.movieId,r.title,r.ratingCnt)} catch {case e: Exception => {e.printStackTrace()System.exit(-1)}}}}

需求3结果

  • 结果表创建语句

CREATE TABLE ten_most_rated_films (`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',`movieId` INT ( 11 ) NOT NULL COMMENT '电影Id',`title` varchar(100) NOT NULL COMMENT '电影名称',`ratingCnt` INT(11) NOT NULL COMMENT '电影被评分的次数',`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY ( `id` ),
UNIQUE KEY `movie_id_UNIQUE` ( `movieId` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
  • 统计结果

movieId title ratingCnt
356 Forrest Gump (1994) 81491
318 Shawshank Redemption, The (1994) 81482
296 Pulp Fiction (1994) 79672
593 Silence of the Lambs, The (1991) 74127
2571 Matrix, The (1999) 72674
260 Star Wars: Episode IV - A New Hope (1977) 68717
480 Jurassic Park (1993) 64144
527 Schindler's List (1993) 60411
110 Braveheart (1995) 59184
2959 Fight Club (1999) 58773

评分次数较多的电影对应的中文名称为:

英文名称 中文名称
Forrest Gump (1994) 阿甘正传
Shawshank Redemption, The (1994) 肖申克的救赎
Pulp Fiction (1994) 低俗小说
Silence of the Lambs, The (1991) 沉默的羔羊
Matrix, The (1999) 黑客帝国
Star Wars: Episode IV - A New Hope (1977) 星球大战
Jurassic Park (1993) 侏罗纪公园
Schindler's List (1993) 辛德勒的名单
Braveheart (1995) 勇敢的心
Fight Club (1999) 搏击俱乐部

总结

本文主要是基于SparkSQL对MovieLens数据集进行统计分析,完整实现了三个需求,并给对每个需求都给出了详细的代码实现和结果分析。本案例还原了企业使用SparkSQL进行实现数据统计的基本流程

使用SparkSQL的电影分析相关推荐

  1. 第十一篇|基于SparkSQL的电影分析项目实战

    在之前的分享中,曾系统地介绍了Spark的基本原理和使用方式,感兴趣的可以翻看之前的分享文章.在本篇分享中,将介绍一个完整的项目案例,该案例会真实还原企业中SparkSQL的开发流程,手把手教你构建一 ...

  2. 豆瓣电影分析报告:大陆和港台到底差(cha)在哪里?

    Python 2.7 IDE Pycharm 5.0.3 PyExcelerator 0.6.4a 可视化 Plotly 图片要是挂了 请看这里此文备份链接 前言 在上次爬完豆瓣的东西后,感觉锻(zh ...

  3. SparkSql之电影案例SQL编写

    SparkSql之电影案例SQL编写 需求1:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分,结果以下面字段结构输出,保存成任意文本格式(txt\csv)或者存储到数据库. ...

  4. 猫眼电影-分析及展示(Python+pycharts)

    此篇文章承接(猫眼电影-爬取). 将电影数据储存到MySQL中后,发现评论人数和票房的数据当中存在汉字,后期不好分析,所以需要将汉字转化为数值. 保险起见,我先将films表里面的结构和数据复制了成了 ...

  5. 爬虫+数据分析+可视化大作业:基于大数据的高质量电影分析报告

    主要目的是为了学习Scrapy与Sklearn而不是写论文,结论是瞎扯的,轻喷求求了 目录 摘要 数据爬虫程序设计和实现 Scrapy框架 Scrapy框架简介 Scrapy的组件 Scrapy的工作 ...

  6. 未明学院:学员来稿 | 2019年中国电影分析报告

    作者 | L同学 Python数据分析11月班学员 从 2008年到2018年,中国电影票房以年均35%的速度迅速上升.2017年,中国电影市场更是一跃成为全球第二大电影市场. 而尽管2019年已经步 ...

  7. python猫眼电影分析_Python数据分析之猫眼电影TOP100

    前言 如果大家经常阅读Python爬虫相关的公众号,都会是以爬虫+数据分析的形式展现的,这样很有趣,图表也很不错,今天了,我就来分享上一次在培训中的一个作品:猫眼电影爬虫及分析. 通过猫眼电影TOP1 ...

  8. 数据分析(一)豆瓣华语电影分析

    在之前,我们已经用通过爬虫获取了豆瓣华语电影共33133部电影的数据,具体爬虫介绍请见之前的博文,爬虫实战(一)利用scrapy爬取豆瓣华语电影.本文对爬虫过程进行简要概述后,对这部分数据进行分析. ...

  9. python猫眼电影分析_抓取猫眼电影

    注意:抓取内容之前一定要查看下Robots协议 1.准备工作 1>第一步,安装python,安装requests.json库. 2.抓取分析 1>接下来我们打开网页分析下猫眼电影排行榜 2 ...

最新文章

  1. 业余时间用哪里,哪里就有发展的可能
  2. 八个层面比较 Java 8, RxJava, Reactor
  3. python需要音语基础_英语基础一般,如何才能学习C语言编程和Python
  4. Android Studio 提示与技巧(官方文档翻译)
  5. 剑指OFFER之从二叉搜索树的后序遍历序列(九度OJ1367)
  6. boost::fusion::as_map用法的测试程序
  7. 4、oracle数据库的查询基础
  8. Jedis操作reids集群
  9. URL跳转与webview安全浅谈
  10. bgp高防-服务器单线、双线、三线、BGP线路哪个更好?
  11. FPGA资源之LUT
  12. One PUNCH Man——深度学习入门
  13. 【托业】【金山词霸】1-42待巩固词汇(包含首次背诵措词)
  14. 手机6120C 玩仙剑dos版
  15. APICloud平台常用技术点汇总详解
  16. linux 看硬件配置 命令2
  17. ip-guard产品功能介绍
  18. nodejs 读取本地文件
  19. 华为emui3.1 android,华为EMUI3.1开发版怎么升级?升级EMUI3.1开发版5.9.1的方法
  20. 批量将多个 PPT 幻灯片文件合并成单个 PPT 文件

热门文章

  1. sql获取当前时间前后5天、月的方式
  2. 数字签名技术以及RSA算法的原理实现
  3. ChatGPT的使用感受
  4. 红帽8.5---yum源安装与配置
  5. python多进程存储数据丢失的存储器是_python查漏补缺 --- 模块、文件、异常
  6. 讲一下prototype是什么东西,原型链的理解,什么时候用prototype?
  7. 如何设置hosts文件屏蔽网址
  8. B. 豚鼠的定居(模拟)
  9. 2G/3G核心网演进与网络结构
  10. css定位元素-随心记