使用SparkSQL的电影分析
项目介绍
数据集介绍
使用MovieLens的名称为ml-25m.zip的数据集,使用的文件时movies.csv和ratings.csv,上述文件的下载地址为:
http://files.grouplens.org/datasets/movielens/ml-25m.zip
movies.csv
该文件是电影数据,对应的为维表数据,大小为2.89MB,包括6万多部电影,其数据格式为[movieId,title,genres],分别对应[电影id,电影名称,电影所属分类],样例数据如下所示:逗号分隔
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
ratings.csv
该文件为定影评分数据,对应为事实表数据,大小为646MB,其数据格式为:[userId,movieId,rating,timestamp],分别对应[用户id,电影id,评分,时间戳],样例数据如下所示:逗号分隔
1,296,5,1147880044
项目代码结构
需求分析
需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分
需求2:查找每个电影类别及其对应的平均评分
需求3:查找被评分次数较多的前十部电影
代码讲解
DemoMainApp
该类是程序执行的入口,主要是获取数据源,转换成DataFrame,并调用封装好的业务逻辑类。
object DemoMainApp {// 文件路径private val MOVIES_CSV_FILE_PATH = "file:///e:/movies.csv"private val RATINGS_CSV_FILE_PATH = "file:///e:/ratings.csv"def main(args: Array[String]): Unit = {// 创建spark sessionval spark = SparkSession.builder.master("local[4]").getOrCreate// schema信息val schemaLoader = new SchemaLoader// 读取Movie数据集val movieDF = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema)// 读取Rating数据集val ratingDF = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)// 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分val bestFilmsByOverallRating = new BestFilmsByOverallRating//bestFilmsByOverallRating.run(movieDF, ratingDF, spark)// 需求2:查找每个电影类别及其对应的平均评分val genresByAverageRating = new GenresByAverageRating//genresByAverageRating.run(movieDF, ratingDF, spark)// 需求3:查找被评分次数较多的前十部电影val mostRatedFilms = new MostRatedFilmsmostRatedFilms.run(movieDF, ratingDF, spark)spark.close()}/*** 读取数据文件,转成DataFrame** @param spark* @param path* @param schema* @return*/def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = {val dataSet = spark.read.format("csv").option("header", "true").schema(schema).load(path)dataSet}
}
Entry
该类为实体类,封装了数据源的样例类和结果表的样例类
class Entry {}case class Movies(movieId: String, // 电影的idtitle: String, // 电影的标题genres: String // 电影类别)case class Ratings(userId: String, // 用户的idmovieId: String, // 电影的idrating: String, // 用户评分timestamp: String // 时间戳)// 需求1MySQL结果表
case class tenGreatestMoviesByAverageRating(movieId: String, // 电影的idtitle: String, // 电影的标题avgRating: String // 电影平均评分)// 需求2MySQL结果表
case class topGenresByAverageRating(genres: String, //电影类别avgRating: String // 平均评分)// 需求3MySQL结果表
case class tenMostRatedFilms(movieId: String, // 电影的idtitle: String, // 电影的标题ratingCnt: String // 电影被评分的次数)
SchemaLoader
该类封装了数据集的schema信息,主要用于读取数据源是指定schema信息
class SchemaLoader {// movies数据集schema信息private val movieSchema = new StructType().add("movieId", DataTypes.StringType, false).add("title", DataTypes.StringType, false).add("genres", DataTypes.StringType, false)// ratings数据集schema信息private val ratingSchema = new StructType().add("userId", DataTypes.StringType, false).add("movieId", DataTypes.StringType, false).add("rating", DataTypes.StringType, false).add("timestamp", DataTypes.StringType, false)def getMovieSchema: StructType = movieSchemadef getRatingSchema: StructType = ratingSchema
}
JDBCUtil
该类封装了连接MySQL的逻辑,主要用于连接MySQL,在业务逻辑代码中会使用该工具类获取MySQL连接,将结果数据写入到MySQL中。
object JDBCUtil {val dataSource = new ComboPooledDataSource()val user = "root"val password = "123qwe"val url = "jdbc:mysql://localhost:3306/mydb"dataSource.setUser(user)dataSource.setPassword(password)dataSource.setDriverClass("com.mysql.jdbc.Driver")dataSource.setJdbcUrl(url)dataSource.setAutoCommitOnClose(false)
// 获取连接def getQueryRunner(): Option[QueryRunner]={try {Some(new QueryRunner(dataSource))}catch {case e:Exception =>e.printStackTrace()None}}
}
需求1实现
BestFilmsByOverallRating
需求1实现的业务逻辑封装。该类有一个run()方法,主要是封装计算逻辑。
/*** 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分*/
class BestFilmsByOverallRating extends Serializable {def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {import spark.implicits._// 将moviesDataset注册成表moviesDataset.createOrReplaceTempView("movies")// 将ratingsDataset注册成表ratingsDataset.createOrReplaceTempView("ratings")// 查询SQL语句val ressql1 ="""|WITH ratings_filter_cnt AS (|SELECT| movieId,| count( * ) AS rating_cnt,| avg( rating ) AS avg_rating|FROM| ratings|GROUP BY| movieId|HAVING| count( * ) >= 5000|),|ratings_filter_score AS (|SELECT| movieId, -- 电影id| avg_rating -- 电影平均评分|FROM ratings_filter_cnt|ORDER BY avg_rating DESC -- 平均评分降序排序|LIMIT 10 -- 平均分较高的前十部电影|)|SELECT| m.movieId,| m.title,| r.avg_rating AS avgRating|FROM| ratings_filter_score r|JOIN movies m ON m.movieId = r.movieId""".stripMarginval resultDS = spark.sql(ressql1).as[tenGreatestMoviesByAverageRating]// 打印数据resultDS.show(10)resultDS.printSchema()// 写入MySQLresultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))}/*** 获取连接,调用写入MySQL数据的方法** @param res*/private def insert2Mysql(res: tenGreatestMoviesByAverageRating): Unit = {lazy val conn = JDBCUtil.getQueryRunner()conn match {case Some(connection) => {upsert(res, connection)}case None => {println("Mysql连接失败")System.exit(-1)}}}/*** 封装将结果写入MySQL的方法* 执行写入操作** @param r* @param conn*/private def upsert(r: tenGreatestMoviesByAverageRating, conn: QueryRunner): Unit = {try {val sql =s"""|REPLACE INTO `ten_movies_averagerating`(|movieId,|title,|avgRating|)|VALUES|(?,?,?)""".stripMargin// 执行insert操作conn.update(sql,r.movieId,r.title,r.avgRating)} catch {case e: Exception => {e.printStackTrace()System.exit(-1)}}}
}
需求1结果
结果表建表语句
CREATE TABLE `ten_movies_averagerating` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',`movieId` int(11) NOT NULL COMMENT '电影id',`title` varchar(100) NOT NULL COMMENT '电影名称',`avgRating` decimal(10,2) NOT NULL COMMENT '平均评分',`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),UNIQUE KEY `movie_id_UNIQUE` (`movieId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
统计结果
平均评分最高的前十部电影如下:
movieId | title | avgRating |
---|---|---|
318 | Shawshank Redemption, The (1994) | 4.41 |
858 | Godfather, The (1972) | 4.32 |
50 | Usual Suspects, The (1995) | 4.28 |
1221 | Godfather: Part II, The (1974) | 4.26 |
527 | Schindler's List (1993) | 4.25 |
2019 | Seven Samurai (Shichinin no samurai) (1954) | 4.25 |
904 | Rear Window (1954) | 4.24 |
1203 | 12 Angry Men (1957) | 4.24 |
2959 | Fight Club (1999) | 4.23 |
1193 | One Flew Over the Cuckoo's Nest (1975) | 4.22 |
上述电影评分对应的电影中文名称为:
英文名称 | 中文名称 |
---|---|
Shawshank Redemption, The (1994) | 肖申克的救赎 |
Godfather, The (1972) | 教父1 |
Usual Suspects, The (1995) | 非常嫌疑犯 |
Godfather: Part II, The (1974) | 教父2 |
Schindler's List (1993) | 辛德勒的名单 |
Seven Samurai (Shichinin no samurai) (1954) | 七武士 |
Rear Window (1954) | 后窗 |
12 Angry Men (1957) | 十二怒汉 |
Fight Club (1999) | 搏击俱乐部 |
One Flew Over the Cuckoo's Nest (1975) | 飞越疯人院 |
需求2实现
GenresByAverageRating
需求2实现的业务逻辑封装。该类有一个run()方法,主要是封装计算逻辑。
*** 需求2:查找每个电影类别及其对应的平均评分*/
class GenresByAverageRating extends Serializable {def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {import spark.implicits._// 将moviesDataset注册成表moviesDataset.createOrReplaceTempView("movies")// 将ratingsDataset注册成表ratingsDataset.createOrReplaceTempView("ratings")val ressql2 ="""|WITH explode_movies AS (|SELECT| movieId,| title,| category|FROM| movies lateral VIEW explode ( split ( genres, "\\|" ) ) temp AS category|)|SELECT| m.category AS genres,| avg( r.rating ) AS avgRating|FROM| explode_movies m| JOIN ratings r ON m.movieId = r.movieId|GROUP BY| m.category| """.stripMarginval resultDS = spark.sql(ressql2).as[topGenresByAverageRating]// 打印数据resultDS.show(10)resultDS.printSchema()// 写入MySQLresultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))}/*** 获取连接,调用写入MySQL数据的方法** @param res*/private def insert2Mysql(res: topGenresByAverageRating): Unit = {lazy val conn = JDBCUtil.getQueryRunner()conn match {case Some(connection) => {upsert(res, connection)}case None => {println("Mysql连接失败")System.exit(-1)}}}/*** 封装将结果写入MySQL的方法* 执行写入操作** @param r* @param conn*/private def upsert(r: topGenresByAverageRating, conn: QueryRunner): Unit = {try {val sql =s"""|REPLACE INTO `genres_average_rating`(|genres,|avgRating|)|VALUES|(?,?)""".stripMargin// 执行insert操作conn.update(sql,r.genres,r.avgRating)} catch {case e: Exception => {e.printStackTrace()System.exit(-1)}}}
}
需求2结果
结果表建表语句
CREATE TABLE genres_average_rating (`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',`genres` VARCHAR ( 100 ) NOT NULL COMMENT '电影类别',`avgRating` DECIMAL ( 10, 2 ) NOT NULL COMMENT '电影类别平均评分',`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY ( `id` ),
UNIQUE KEY `genres_UNIQUE` ( `genres` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
统计结果
共有20个电影分类,每个电影分类的平均评分为:
genres | avgRating |
---|---|
Film-Noir | 3.93 |
War | 3.79 |
Documentary | 3.71 |
Crime | 3.69 |
Drama | 3.68 |
Mystery | 3.67 |
Animation | 3.61 |
IMAX | 3.6 |
Western | 3.59 |
Musical | 3.55 |
Romance | 3.54 |
Adventure | 3.52 |
Thriller | 3.52 |
Fantasy | 3.51 |
Sci-Fi | 3.48 |
Action | 3.47 |
Children | 3.43 |
Comedy | 3.42 |
(no genres listed) | 3.33 |
Horror | 3.29 |
电影分类对应的中文名称为:
分类 | 中文名称 |
---|---|
Film-Noir | 黑色电影 |
War | 战争 |
Documentary | 纪录片 |
Crime | 犯罪 |
Drama | 历史剧 |
Mystery | 推理 |
Animation | 动画片 |
IMAX | 巨幕电影 |
Western | 西部电影 |
Musical | 音乐 |
Romance | 浪漫 |
Adventure | 冒险 |
Thriller | 惊悚片 |
Fantasy | 魔幻电影 |
Sci-Fi | 科幻 |
Action | 动作 |
Children | 儿童 |
Comedy | 喜剧 |
(no genres listed) | 未分类 |
Horror | 恐怖 |
需求3实现
MostRatedFilms
需求3实现的业务逻辑封装。该类有一个run()方法,主要是封装计算逻辑。
/*** 需求3:查找被评分次数较多的前十部电影.*/
class MostRatedFilms extends Serializable {def run(moviesDataset: DataFrame, ratingsDataset: DataFrame,spark: SparkSession) = {import spark.implicits._// 将moviesDataset注册成表moviesDataset.createOrReplaceTempView("movies")// 将ratingsDataset注册成表ratingsDataset.createOrReplaceTempView("ratings")val ressql3 ="""|WITH rating_group AS (| SELECT| movieId,| count( * ) AS ratingCnt| FROM ratings| GROUP BY movieId|),|rating_filter AS (| SELECT| movieId,| ratingCnt| FROM rating_group| ORDER BY ratingCnt DESC| LIMIT 10|)|SELECT| m.movieId,| m.title,| r.ratingCnt|FROM| rating_filter r|JOIN movies m ON r.movieId = m.movieId|""".stripMarginval resultDS = spark.sql(ressql3).as[tenMostRatedFilms]// 打印数据resultDS.show(10)resultDS.printSchema()// 写入MySQLresultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))}/*** 获取连接,调用写入MySQL数据的方法** @param res*/private def insert2Mysql(res: tenMostRatedFilms): Unit = {lazy val conn = JDBCUtil.getQueryRunner()conn match {case Some(connection) => {upsert(res, connection)}case None => {println("Mysql连接失败")System.exit(-1)}}}/*** 封装将结果写入MySQL的方法* 执行写入操作** @param r* @param conn*/private def upsert(r: tenMostRatedFilms, conn: QueryRunner): Unit = {try {val sql =s"""|REPLACE INTO `ten_most_rated_films`(|movieId,|title,|ratingCnt|)|VALUES|(?,?,?)""".stripMargin// 执行insert操作conn.update(sql,r.movieId,r.title,r.ratingCnt)} catch {case e: Exception => {e.printStackTrace()System.exit(-1)}}}}
需求3结果
结果表创建语句
CREATE TABLE ten_most_rated_films (`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',`movieId` INT ( 11 ) NOT NULL COMMENT '电影Id',`title` varchar(100) NOT NULL COMMENT '电影名称',`ratingCnt` INT(11) NOT NULL COMMENT '电影被评分的次数',`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY ( `id` ),
UNIQUE KEY `movie_id_UNIQUE` ( `movieId` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
统计结果
movieId | title | ratingCnt |
---|---|---|
356 | Forrest Gump (1994) | 81491 |
318 | Shawshank Redemption, The (1994) | 81482 |
296 | Pulp Fiction (1994) | 79672 |
593 | Silence of the Lambs, The (1991) | 74127 |
2571 | Matrix, The (1999) | 72674 |
260 | Star Wars: Episode IV - A New Hope (1977) | 68717 |
480 | Jurassic Park (1993) | 64144 |
527 | Schindler's List (1993) | 60411 |
110 | Braveheart (1995) | 59184 |
2959 | Fight Club (1999) | 58773 |
评分次数较多的电影对应的中文名称为:
英文名称 | 中文名称 |
---|---|
Forrest Gump (1994) | 阿甘正传 |
Shawshank Redemption, The (1994) | 肖申克的救赎 |
Pulp Fiction (1994) | 低俗小说 |
Silence of the Lambs, The (1991) | 沉默的羔羊 |
Matrix, The (1999) | 黑客帝国 |
Star Wars: Episode IV - A New Hope (1977) | 星球大战 |
Jurassic Park (1993) | 侏罗纪公园 |
Schindler's List (1993) | 辛德勒的名单 |
Braveheart (1995) | 勇敢的心 |
Fight Club (1999) | 搏击俱乐部 |
总结
本文主要是基于SparkSQL对MovieLens数据集进行统计分析,完整实现了三个需求,并给对每个需求都给出了详细的代码实现和结果分析。本案例还原了企业使用SparkSQL进行实现数据统计的基本流程
使用SparkSQL的电影分析相关推荐
- 第十一篇|基于SparkSQL的电影分析项目实战
在之前的分享中,曾系统地介绍了Spark的基本原理和使用方式,感兴趣的可以翻看之前的分享文章.在本篇分享中,将介绍一个完整的项目案例,该案例会真实还原企业中SparkSQL的开发流程,手把手教你构建一 ...
- 豆瓣电影分析报告:大陆和港台到底差(cha)在哪里?
Python 2.7 IDE Pycharm 5.0.3 PyExcelerator 0.6.4a 可视化 Plotly 图片要是挂了 请看这里此文备份链接 前言 在上次爬完豆瓣的东西后,感觉锻(zh ...
- SparkSql之电影案例SQL编写
SparkSql之电影案例SQL编写 需求1:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分,结果以下面字段结构输出,保存成任意文本格式(txt\csv)或者存储到数据库. ...
- 猫眼电影-分析及展示(Python+pycharts)
此篇文章承接(猫眼电影-爬取). 将电影数据储存到MySQL中后,发现评论人数和票房的数据当中存在汉字,后期不好分析,所以需要将汉字转化为数值. 保险起见,我先将films表里面的结构和数据复制了成了 ...
- 爬虫+数据分析+可视化大作业:基于大数据的高质量电影分析报告
主要目的是为了学习Scrapy与Sklearn而不是写论文,结论是瞎扯的,轻喷求求了 目录 摘要 数据爬虫程序设计和实现 Scrapy框架 Scrapy框架简介 Scrapy的组件 Scrapy的工作 ...
- 未明学院:学员来稿 | 2019年中国电影分析报告
作者 | L同学 Python数据分析11月班学员 从 2008年到2018年,中国电影票房以年均35%的速度迅速上升.2017年,中国电影市场更是一跃成为全球第二大电影市场. 而尽管2019年已经步 ...
- python猫眼电影分析_Python数据分析之猫眼电影TOP100
前言 如果大家经常阅读Python爬虫相关的公众号,都会是以爬虫+数据分析的形式展现的,这样很有趣,图表也很不错,今天了,我就来分享上一次在培训中的一个作品:猫眼电影爬虫及分析. 通过猫眼电影TOP1 ...
- 数据分析(一)豆瓣华语电影分析
在之前,我们已经用通过爬虫获取了豆瓣华语电影共33133部电影的数据,具体爬虫介绍请见之前的博文,爬虫实战(一)利用scrapy爬取豆瓣华语电影.本文对爬虫过程进行简要概述后,对这部分数据进行分析. ...
- python猫眼电影分析_抓取猫眼电影
注意:抓取内容之前一定要查看下Robots协议 1.准备工作 1>第一步,安装python,安装requests.json库. 2.抓取分析 1>接下来我们打开网页分析下猫眼电影排行榜 2 ...
最新文章
- 业余时间用哪里,哪里就有发展的可能
- 八个层面比较 Java 8, RxJava, Reactor
- python需要音语基础_英语基础一般,如何才能学习C语言编程和Python
- Android Studio 提示与技巧(官方文档翻译)
- 剑指OFFER之从二叉搜索树的后序遍历序列(九度OJ1367)
- boost::fusion::as_map用法的测试程序
- 4、oracle数据库的查询基础
- Jedis操作reids集群
- URL跳转与webview安全浅谈
- bgp高防-服务器单线、双线、三线、BGP线路哪个更好?
- FPGA资源之LUT
- One PUNCH Man——深度学习入门
- 【托业】【金山词霸】1-42待巩固词汇(包含首次背诵措词)
- 手机6120C 玩仙剑dos版
- APICloud平台常用技术点汇总详解
- linux 看硬件配置 命令2
- ip-guard产品功能介绍
- nodejs 读取本地文件
- 华为emui3.1 android,华为EMUI3.1开发版怎么升级?升级EMUI3.1开发版5.9.1的方法
- 批量将多个 PPT 幻灯片文件合并成单个 PPT 文件