Spark综合大作业:RDD编程初级实践
实验配置:操作系统:Ubuntu16.04 | 环境:Spark版本:2.4.0 | 软件:Python版本:3.4.3。

文章目录

  • 一、实验目的
  • 二、实验平台
  • 三、实验内容和要求
    • 1、pyspark交互式编程
    • 2.编写独立应用程序实现数据去重
    • 3.编写独立应用程序实现求平均值问题
  • 四、环境介绍
  • 五、实验步骤
    • 2、编写独立应用程序实现数据去重
    • 3、编写独立应用程序实现求平均值问题
  • 六、经验总结
  • 七、参考文献

一、实验目的

(1)熟悉Spark的RDD基本操作及键值对操作;
(2)熟悉使用RDD编程解决实际具体问题的方法。

二、实验平台

本次大作业的实验是操作系统:Ubuntu16.04,Spark版本:2.4.0,Python版本:3.4.3。

三、实验内容和要求

1、pyspark交互式编程

本作业提供分析数据data.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示:
Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,80
……
请根据给定的实验数据,在pyspark中通过编程来计算以下内容:
(1)该系总共有多少学生;
(2)该系共开设了多少门课程;
(3)Tom同学的总成绩平均分是多少;
(4)求每名同学的选修的课程门数;
(5)该系DataBase课程共有多少人选修;
(6)各门课程的平均分是多少;
(7)使用累加器计算共有多少人选了DataBase这门课。

2.编写独立应用程序实现数据去重

对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。本文给出门课的成绩(A.txt、B.txt)下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20200101 x
20200102 y
20200103 x
20200104 y
20200105 z
20200106 z
输入文件B的样例如下:
20200101 y
20200102 y
20200103 x
20200104 z
20200105 y
根据输入的文件A和B合并得到的输出文件C的样例如下:
20200101 x
20200101 y
20200102 y
20200103 x
20200104 y
20200104 z
20200105 y
20200105 z
20200106 z

3.编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。本文给出门课的成绩(Algorithm.txt、Database.txt、Python.txt),下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)

四、环境介绍

Spark是云计算大数据的集大成者,是Hadoop的取代者,是第二代云计算大数据技术。它作为一个基于内存计算的云计算大数据平台,在实时流处理、交互式查询、机器学习、图处理、数据统计分析等方面具有无可比拟的优势;Spark 能够比Hadoop快100倍以上;Spark采用一个统一 的堆栈解决了云计算大数据的所有核心问题,这直接奠定了其一统云计算大数据领域的霸主地位。
PySpark 是 Spark 为 Python 开发者提供的 API。
RDD: 弹性分布式数据集分布在不同的集群节点的内存中,可以理解为一大数组,数组的每一个元素就是RDD的一个分区,一个RDD可以分布并被运算在多态计算机节点的内存以及硬盘中,RDD数据块可以放在磁盘上也可以放在内存中(取决于你的设置),如果出现缓冲失效或丢失,RDD分区可以重新计算刷新,RDD是不能被修改的但是可以通过API被变换生成新的RDD。有俩类对RDD的操作(也成算子):
1.变换(懒执行): 有 map flatMap groupByKey reduceByKey 等;
他们只是将一些指令集而不会马上执行,需要有操作的时候才会真正计算出结果;
2.操作(立即执行): 有 count take collect 等;
他们会返回结果,或者把RDD数据输出,这些操作实现了MapReduce的基本函数map,reduce及计算模型,还提供了filter,join,groupBYKey等,另外spark sql 可以用来操作有数据结构的RDD即SPARK DATA FRAME,它的运行原理和mapreduce是一样的,只是他们的运行方式不同,mr的运算是内存磁盘交互读写,不能在内存中共享数据,而RDD可以被共享和持久化.因为大数据运算经常是交互式和迭代式的,所以数据的重用性很重要,而mr的磁盘交互读写带来的I/O开销导致数度减慢。

五、实验步骤

(一)spark的安装
1、安装spark
(1)解压安装包,更改使用用户名hadoop并修改权限

sudo tar -zxf ~/下载/spark-2.0.2-bin-without-hadoop.tgz -C /usr/local/
cd /usr/local
sudo mv ./spark-2.1.0-bin-without-hadoop/ ./spark
sudo chown -R hadoop:hadoop ./spark

(2)安装后,还需要修改Spark的配置文件spark-env.sh

cd /usr/local/spark
cp ./conf/spark-env.sh.template ./conf/spark-env.sh

编辑spark-env.sh文件(vim ./conf/spark-env.sh),在第一行添加以下配置信息:

(3)启动spark-shell后,就会进入“scala>”命令提示符状态,如下图所示:

2、Java独立应用程序编程
(1)安装maven
选择安装在/usr/local/maven中:


2.Java应用程序代码
在终端执行如下命令创建一个文件夹sparkapp2作为应用程序根目录

在 ./sparkapp2/src/main/java 下建立一个名为 SimpleApp.java 的文件(vim ./sparkapp2/src/main/java/SimpleApp.java),添加代码如下:

/*** SimpleApp.java ***/import org.apache.spark.api.java.*;import org.apache.spark.api.java.function.Function;public class SimpleApp {public static void main(String[] args) {String logFile = "file:///usr/local/spark/README.md"; // Should be some file on your systemJavaSparkContext sc = new JavaSparkContext("local", "Simple App","file:///usr/local/spark/", new String[]{"target/simple-project-1.0.jar"});JavaRDD<String> logData = sc.textFile(logFile).cache();long numAs = logData.filter(new Function<String, Boolean>() {public Boolean call(String s) { return s.contains("a"); }}).count();long numBs = logData.filter(new Function<String, Boolean>() {public Boolean call(String s) { return s.contains("b"); }}).count();System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);}}

该程序依赖Spark Java API,因此我们需要通过Maven进行编译打包。在./sparkapp2中新建文件pom.xml(vim ./sparkapp2/pom.xml),添加内容如下,声明该独立应用程序的信息以及与Spark的依赖关系:

 <project><groupId>edu.berkeley</groupId><artifactId>simple-project</artifactId><modelVersion>4.0.0</modelVersion><name>Simple Project</name><packaging>jar</packaging><version>1.0</version><repositories><repository><id>Akka repository</id><url>http://repo.akka.io/releases</url></repository></repositories><dependencies><dependency> <!-- Spark dependency --><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.1.0</version></dependency></dependencies></project>

3.使用maven打包java程序
为了保证maven能够正常运行,先执行如下命令检查整个应用程序的文件结构:
cd ~/sparkapp2
Find .
文件结构如下图:

接着,我们可以通过如下代码将这整个应用程序打包成Jar
/usr/local/maven/bin/mvn package
如果运行上面命令后出现类似下面的信息,说明生成Jar包成功:

(2)通过spark-submit 运行程序
可以通过spark-submit提交应用程序,该命令的格式如下:

./bin/spark-submit --class <main-class>  //需要运行的程序的主类,应用程序的入口点--master <master-url>  //Master URL,下面会有具体解释--deploy-mode <deploy-mode>   //部署模式... # other options  //其他参数<application-jar>  //应用程序JAR包[application-arguments] //传递给主类的主方法的参数

最后,针对上面编译打包得到的应用程序,可以通过将生成的jar包通过spark-submit提交到Spark中运行,如下命令:

/usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp2/target/simple-project-1.0.jar
#上面命令执行后会输出太多信息,可以不使用上面命令,而使用下面命令查看想要的结果
/usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp2/target/simple-project-1.0.jar 2>&1 | grep "Lines with a"

最后得到的结果如下:

通过运行Spark自带的示例,验证Spark是否安装成功

(二)pyspark的安装配置

1.配置环境变量

进入.bashrc文件,输入hadoop的密码为hadoop

2.在文件中添加如下几行代码:

3.接着让该环境变量激活生效,执行如下代码:

4.执行pyspark后如下所示:

数据来源描述
(1)该系总共有多少学生
1)创建一个sparksqldata的文件

mkdir sparksqldata

2)cp拷贝的命令,执行

 cp data.txt /usr/local/spark/sparksqldata/

进入sparksqldata目录查看data.txt文件

cd /usr/local/spark/sparksqldata/
ls

3)启动pyspark,Cp拷贝的命令,将data.txt拷贝到目录sparkdata下



启动pyspark,pyspark启动成功

4)加载数据集,获取每行数据的第1列;去重操作;取元素总个数

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).map(lambda x:x[0])
distinst_res = res.distinct()
distinct_res.count()

答案为265人。
(2)该系共开设了多少门课程;
获取每行数据的第2列;去重操作;取元素总个数

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).map(lambda x:x[1])
distinst_res = res.distinct()
distinct_res.count()

答案为8门。
(3)Tom同学的总成绩平均分是多少;
筛选Tom同学的成绩信息
res.foreach(print)
score = res.map(lambda x:int(x[2])) //提取Tom同学的每门成绩,并转换为int类型
num = res.count() //Tom同学选课门数
sum_score = score.reduce(lambda x,y:x+y) //Tom同学的总成绩
avg = sum_score/num // 总成绩/门数=平均分

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).filter(lambda x:x[0]=='Tom')
res.foreach(print)


Tom同学的平均分为30.8分
(4)求每名同学的选修的课程门数;
学生每门课程都对应(学生姓名,1),学生有n门课程则有n个(学生姓名,1);按学生姓名获取每个学生的选课总数。

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).map(lambda x:(x[1],1))
each_res = res.reduceByKey(lambda x,y: x+y)
each_res.foreach(print)

答案共265行。
(5)该系DataBase课程共有多少人选修;

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=='DataBase')
res.count()

答案为1764人。

(6)各门课程的平均分是多少;
为每门课程的分数后面新增一列1,表示1个学生选择了该课程。格式如(‘ComputerNetwork’, (44, 1));按课程名聚合课程总分和选课人数。格式如(‘ComputerNetwork’, (7370, 142));课程总分/选课人数 = 平均分,并利用round(x,2)保留两位小数。

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).map(lambda x:x[1],(int(x[2]),1)))
temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
avg = temp.map(lambda x:(x[0],round(x[1][0]/xx[1][1],2)))
avg.foreach(print)

所以ComputerNetwork的平均分是51.9分,Software的平均分是50.91分,Algorithm的平均分是48.83分,OperatingSystem的平均分是54.94分,python的平均分是57.82分,datastructure的平均分是47.57分,clanguage的平均分是50.61分。

(7)使用累加器计算共有多少人选了DataBase这门课。
筛选出选了DataBase课程的数据;定义一个从0开始的累加器accum;遍历res,每扫描一条数据,累加器加1;

lines = sc.textFile('file///usr/local/spark/sparksqldata/data.txt')
res = lines.map(lambda x:x.split(",")).map(lambda x:x[1]=='DataBase')
accum = sc.accumulator(0)
res.foreach(lambda x:accumn.add(1))
accum.value

答案为1764人。

2、编写独立应用程序实现数据去重

对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。本文给出门课的成绩(A.txt、B.txt)下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20200101 x
20200102 y
20200103 x
20200104 y
20200105 z
20200106 z
输入文件B的样例如下:
20200101 y
20200102 y
20200103 x
20200104 z
20200105 yv
根据输入的文件A和B合并得到的输出文件C的样例如下:
20200101 x
20200101 y
20200102 y
20200103 x
20200104 y
20200104 z
20200105 y
20200105 z
20200106 z
(1)当前目录为/usr/local/spark/sparksqldata/,首先创建A.py和B.py文件,分别存放A、B两个文件,在当前目录下新建一个C.py文件;

cd /usr/local/spark/sparksqldata/
vim A
vim B




vim C.py

(2)输入以下代码

from pyspark import SparkContext
#初始化SparkContext
sc = SparkContext("local","sparksqldata")
#加载文件 A B ,创建RDD
lines1 = sc.textFile("file:///usr/local/spark/sparksqldata/A")
lines2 = sc.textFile("file:///usr/local/spark/sparksqldata/B")
#合并文件 A B
lines = lines1.union(lines2)
#去重操作
distinct_lines = lines.distinct()
#排序操作
res = distinct_lines.sortBy(lambda x:x)
#让合并结果放入一个文件中
res.repartition(1).saveAsTextFile('file:///usr/local/spark/sparksqldata/ymsresult')

(3)最后在目录/usr/local/spark/sparksqldata/下执行下面命令执行程序
$ python3 C.py

python3 C.py

(4)在目录/usr/local/spark/sparksqldata/result下即可得到结果文件part-00000。

cd ymsresult/
ls

vim part-00000

一共有500行

(5)在本机目录/usr/local/spark/sparksqldata/result下可以看到结果文件part-00000。

3、编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。本文给出门课的成绩(Algorithm.txt、Database.txt、Python.txt),下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)
(1)创建Algorithm.py和Database.py文件以及Python.py文件

 vim Algorithm


创建Algorithm.py:

创建Database.py

vim Database



创建Python.py文件

vim Python


(2)当前目录为/usr/local/spark/sparksqldata/,首先创建Algorithm.py和Database.py文件以及Python.py文件,分别存放三个科目文件,在当前目录下新建一个avg.py文件求平均值

vim avg.py

(3)输入以下代码:

from pyspark import SparkContext
#初始化SparkContext
sc = SparkContext('local','sparksqldata')
#加载三个文件Algorithm.txt、Database.txt、Python.txt
lines1 =sc.textFile("file:///usr/local/spark/sparksqldata/Algorithm")
lines2 =sc.textFile("file:///usr/local/spark/sparksqldata/Database")
lines3 =sc.textFile("file:///usr/local/spark/sparksqldata/Python")
#合并三个文件的内容
lines1.union(lines2).union(lines3)
#为每个数据增加一列1,方便后续统计每个学生的课程数目。
data = lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))
#根据key也就是学生姓名合计每门课程的成绩,以及选秀的课程数目。
res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
#利用总成绩除以选秀的课程来计算每个学生的没门的平均分,并且利用round(0,2)来保留两位小数
result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
#将结果写入result文件中,reparttition(1)的作用是让结果合并到一个文件中,不加的话会写入三个文件中
result.repartition(1).saveAsTextFile("file:///usr/local/spark/sparksqldata/newys")

(4)最后在目录/usr/local/spark/sparksqldata/下执行下面命令执行程序
$ python3 avg.py

python3 avg.py

(5)在目录/usr/local/spark/sparksqldata/result1下即可得到结果文件part-00000。

cd newys
ls


(6)进入part-00000查看内容

(7)在本机目录/usr/local/spark/sparksqldata/newre下可以看到结果文件part-00000。


打开part-00000查看具体的内容:

六、经验总结

本次实验操作下来,我学到了很多东西,我对Spark的RDD基本操作及键值对操作有了一定的了解;对如何使用RDD编程解决实际具体问题的方法有了进一步的认识,每个步骤都让我掌握了一定的命令知识,更深入理解大数据。创建RDD有两种方式:一种是通过并行化驱动程序中的已有集合创建,另外一种方法是读取外部数据集;我也了解到我们不应该把RDD 看作存放着特定数据的数据集,而最好把每个RDD 当作我们通过转化操作构建出来的、记录如 何计算数据的指令列表。我也发现自身的不足,以后会处于学习过程中,最后如果有错误的地方欢迎大家指出。

七、参考文献

[1] link.
http://dblab.xmu.edu.cn/blog/2481-2/
[2] link.
http://dblab.xmu.edu.cn/blog/290-2/
[3] link.
http://dblab.xmu.edu.cn/blog/285/
[4] link.
http://dblab.xmu.edu.cn/blog/hadoop-build-project-using-eclipse/

Spark综合大作业:RDD编程初级实践相关推荐

  1. spark期末大作业RDD编程初级实践

    1.需求描述 本次实验需要:系统:linux unbuntu14.04,处理器:至少需要两个处器,一个内核,内存:至少4G,硬盘空间:大小需要20GB.Hadoop:2.7.1以上版本,JDK:1.8 ...

  2. RDD编程初级实践-课程论文

    需求描述 本次实验需要对给定数据进行RDD的基本操作,使用RDD编程实现解决具体问题的方法.内容要求分为三个部分: pyspark交互式编程 根据给定大学成绩数据集,用pyspark来进行编程,分析并 ...

  3. RDD编程初级实践(期末大作业)

    文章目录 1.pyspark交互式编程 (1)该系总共有多少学生: (2)该系共开设了多少门课程: (3)Tom同学的总成绩平均分是多少: (4)求每名同学的选修的课程门数: (5)该系DataBas ...

  4. Spark大数据分析与实战:Spark Streaming编程初级实践

    Spark Streaming编程初级实践 一.安装Hadoop和Spark 具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作: Hadoop的安装:https://blog.csdn ...

  5. 实验5 Spark SQL编程初级实践

    今天做实验[Spark SQL 编程初级实践],虽然网上有答案,但在自己的环境下并不能够顺利进行 在第二题中,要求编程实现将 RDD 转换为 DataFrame.根据所谓标准答案,在进行sbt 打包时 ...

  6. 合肥学院C语言大作业,C语言实践课程综合大作业..doc

    C语言实践课程综合大作业. 昆明理工大学 <程序设计基础>课程 综合设计实践教学课题报告 课程名称: C语言程序设计综合大作业 课题名称:数学计算工具程序设计 组长:学号 20131040 ...

  7. 大学计算机实践教程4.3综合作业,2020年下学期西安电子科技大学《基础实验》综合大作业.docx...

    学习中心/函授站 _ 姓 名 学 号 西安电子科技大学网络与继续教育学院 2020 学年下学期 <基础实验>期末考试试题 (综合大作业) 题号 一 总分 题分 100 得分 考试说明: 1 ...

  8. 作业——08 爬虫综合大作业

    作业的要求来自于:https://edu.cnblogs.com/campus/gzcc/GZCC-16SE2/homework/3075 一.把爬取的内容保存取MySQL数据库 import pan ...

  9. 爬虫综合大作业(震惊!爬取了590位微信好友后竟然发现了)

    作业要求来自https://edu.cnblogs.com/campus/gzcc/GZCC-16SE1/homework/3159 可以用pandas读出之前保存的数据:见上次博客爬取全部的校园新闻 ...

最新文章

  1. 动态检测内存错误利器ASan
  2. Java继承Exception自定义异常类教程以及Javaweb中用Filter拦截并处理异常
  3. 世界围棋人机大战、顶峰对决第一盘:围棋世界冠军Lee Sedol(李世石,围棋职业九段)对战Google DeepMind AlphaGo围棋程序,Google AlphaGo首战告捷
  4. java字符串转字符串列表_Java中的字符串列表示例
  5. 计算机协会小游戏,网页闯关小游戏闯关记录(一)ISA TEST
  6. 20155322 《Java程序设计》课堂实践项目 数据库-3-4
  7. win8恢复我的计算机较早时间点,Win8系统的重置和刷新功能 -电脑资料
  8. javaScript中的Object类型
  9. ssm-学子商城-项目第三天
  10. 淘宝客高手必备的14大WordPress插件
  11. 如何简单有效的同步油猴插件
  12. Mybatis提高查询效率的方式
  13. 20162327WJH实验四——图的实现与应用
  14. 【致远FAQ】V8.0sp2_8.0sp2版本协同BPM平台的人员匹配去重
  15. Chrome浏览器对统一资源发出多个请求时,导致最多停止20s问题
  16. (PTA)数据结构(作业)11、树和图
  17. 安装ESIM事件相机模拟器遇到的一些问题及解决方法
  18. LINE FRIENDS 跨界李宁推出联名系列
  19. Lync 小技巧-41-Lync 2013-无法上载-PowerPoint
  20. oracle 分页语句效率高,Oracle 10g SQL分页查询语句和效率分析

热门文章

  1. redis: WRONGTYPE Operation against a key holding the wrong kind of value
  2. linux登录界面配置有趣的图案 /etc/motd,附带图片转字符串网址
  3. python编程技巧
  4. 【开发必备】快来收藏!涵盖日常开发中所需要的60多个正则验证!!
  5. C51单片机毕业设计题目大全
  6. 被final修饰的变量到底能不能被修改
  7. css设置高度和宽度相同
  8. 我跑了香港六家银行,把境外开户的事整明白了
  9. 010Editor十六进制转图片(攻防演练-决定用和决一死战)
  10. 使用 var 关键字在 Java 中使用动态类型