【小白视角】大数据基础实践(五) MapReduce编程基础操作
目录
- 1. MapReduce 简介
- 1.1 起源
- 1.2 模型简介
- 1.3 MRv1体系结构
- 1.4 YARN
- 1.4.1 YARN体系结构
- 1.4.2 YARN工作流程
- 2. MapReduce 工作流程
- 3. Java Api要点
- 4. 实验过程
- 最后
1. MapReduce 简介
1.1 起源
在函数式语言里,map表示对一个列表(List)中的每个元素做计算,reduce表示对一个列表中的每个元素做迭代计算。
它们具体的计算是通过传入的函数来实现的,map和reduce提供的是计算的框架。
- 在MapReduce里,map处理的是原始数据,每条数据之间互相没有关系;
- 到了reduce阶段,数据是以key后面跟着若干个value来组织的,这些value有相关性,至少它们都在一个key下面,于是就符合函数式语言里map和reduce的基本思想了。
- “map”和“reduce”的概念和它们的主要思想,都是从函数式编程语言借用来的,还有从矢量编程语言里借来的特性。极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。
1.2 模型简介
MapReduce
将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map
和Reduce
- 编程容易,不需要掌握分布式并行编程细节,也可以很容易把自己的程序运行在分布式系统上,完成海量数据的计算
MapReduce
采用“分而治之”策略,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的分片(split),这些分片可以被多个Map任务并行处理MapReduce
设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,因为,移动数据需要大量的网络传输开销MapReduce
框架采用了Master/Slave
架构,包括一个Master
和若干个Slave
。Master
上运行JobTracker
(yarn上ResourceManager),Slave
上运行TaskTracker
(yarn上Nodemanager)- Hadoop框架是用Java实现的,但是,MapReduce应用程序则不一定要用Java来写
1.3 MRv1体系结构
MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task
结点说明:
- Client
用户编写的MapReduce
程序通过Client
提交到JobTracker
端,用户可通过Client
提供的一些接口查看作业运行状态。 - JobTracker
JobTracker
负责资源监控和作业调度;JobTracker
监控所有TaskTracker
与Job
的健康状况,一旦发现失败,就将相应的任务转移到其他节点;JobTracker
会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源。 - TaskTracker
TaskTracker
会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker
,同时接收JobTracker
发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker
使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task
获取到一个slot
后才有机会运行,而Hadoop
调度器的作用就是将各个TaskTracker
上的空闲slot
分配给Task
使用。slot 分为Map slot
和Reduce slot
两种,分别供Map Task
和Reduce Task
使用。 - Task
Task分为Map Task
和Reduce Task
两种,均由TaskTracker
启动。
结构缺点:
- 存在单点故障
- JobTracker“大包大揽”导致任务过重(任务多时内存开销大,上限4000节点)
- 容易出现内存溢出(分配资源只考虑MapReduce任务数,不考虑CPU、内存)
- 资源划分不合理(强制划分为slot ,包括Map slot和Reduce slot)
1.4 YARN
1.4.1 YARN体系结构
架构思想
体系结构
ResourceManager
• 处理客户端请求
• 启动/监控ApplicationMaster
• 监控NodeManager
• 资源分配与调度
NodeManager
• 单个节点上的资源管理
• 处理来自ResourceManger的命令
• 处理来自ApplicationMaster的命令
ApplicationMaster
• 为应用程序申请资源,并分配给内部任务
• 任务调度、监控与容错
1.4.2 YARN工作流程
步骤1:用户编写客户端应用程序,向YARN提交应用程序,提交的内容包括ApplicationMaster
程序、启动ApplicationMaster
的命令、用户程序等
步骤2:YARN
中的ResourceManager
负责接收和处理来自客户端的请求,为应用程序分配一个容器,在该容器中启动一个ApplicationMaster
步骤3:ApplicationMaster
被创建后会首先向ResourceManager
注册
步骤4:ApplicationMaster
采用轮询的方式向ResourceManager
申请资源
步骤5:ResourceManager
以“容器”的形式向提出申请的ApplicationMaster
分配资源
步骤6:在容器中启动任务(运行环境、脚本)
步骤7:各个任务向ApplicationMaster
汇报自己的状态和进度
步骤8:应用程序运行完成后,ApplicationMaster
向ResourceManager
的应用程序管理器注销并关闭自己
2. MapReduce 工作流程
➢ 不同的Map任务之间不会进行通信
➢ 不同的Reduce任务之间也不会发生任何信息交换
➢ 用户不能显式地从一台机器向另一台机器发送消息
➢ 所有的数据交换都是通过MapReduce框架自身去实现的
例子
3. Java Api要点
- Writable
Hadoop 自定义的序列化接口。当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。Map 和 Reduce 的 key、value 数据格式均为 Writeable 类型,其中 key 还需实现WritableComparable 接口。Java 基本类型对应 writable 类型的封装如下:
Java primitive | Writable implementation |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | ShortWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
enum | EnumWritable |
Map | MapWritable |
(2)InputFormat
用于描述输入数据的格式。提供两个功能:
getSplits()
数据分片,按照某个策略将输入数据切分成若干个split
,以便确定Map
任务个数以及对应的split
;createRecordReader()
,将某个split
解析成一个个key-value
对。
FileInputFormat
是所有以文件作为数据源的InputFormat
实现基类,小文件不会进行分片,记录读取调用子类TextInputFormat
实现;
TextInputFormat
是默认处理类,处理普通文本文件,以文件中每一行作为一条记录,行起始偏移量为key
,每一行文本为 value;CombineFileInputFormat
针对小文件设计,可以合并小文件;KeyValueTextInputFormat
适合处理一行两列并以tab
作为分隔符的数据;NLineInputFormat
控制每个split
中的行数。
(3)OutputFormat
主要用于描述输出数据的格式。Hadoop 自带多种 OutputFormat 的实现。
TextOutputFormat
默认的输出格式,key 和 value 中间用 tab 分隔;SequenceFileOutputFormat
,将 key 和 value 以 SequenceFile 格式输出;SequenceFileAsOutputFormat
,将 key 和 value 以原始二进制格式输出;MapFileOutputFormat
,将 key 和 value 写入 MapFile 中;MultipleOutputFormat
,默认情况下 Reducer 会产生一个输出,用该格式可以实现一个Reducer 多个输出。
(4)Mapper/Reducer
封装了应用程序的处理逻辑,主要由 map、reduce 方法实现。
(5)Partitioner
根据 map 输出的 key 进行分区,通过 getPartition()方法返回分区值,默认使用哈希函
数。分区的数目与一个作业的reduce任务的数目是一样的。HashPartitioner是默认的Partioner。
4. 实验过程
1、计数统计类应用
仿照 WordCount 例子,编写“TelPubXxx”类实现对拨打公共服务号码的电话信息的统计。给出的一个文本输入文件如下,第一列为电话号码、第二列为公共服务号码,中间以空格隔开。
13718855152 11216810117315 110
39451849 112
13718855153 110
13718855154 112
18610117315 114
18610117315 114
MapReduce 程序执行后输出结果如下,电话号码之间用“|”连接:
110 13718855153|16810117315
112 13718855154|39451849|13718855152
114 18610117315|18610117315
运行成功
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TelPubZqc {public static class TelMap extends Mapper<Object, Text, Text, Text> {private Text pub = new Text();private Text tel = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {//Map (Key Value)String[] s=value.toString().split(" ");tel.set(s[0]);pub.set(s[1]);context.write(pub,tel);}}public static class TelReducer extends Reducer<Text, Text, Text, Text> {private Text result = new Text();public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {StringBuilder s= new StringBuilder();for (Text val : values) {if(s.toString().equals("")){s.append(val.toString());}else s.append("|").append(val.toString());}result.set(String.valueOf(s));context.write(key, result);// 输出结果}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();// 加载hadoop配置conf.set("fs.defaultFS", "hdfs://localhost:9000");String[] otherArgs = new String[]{"input/input.txt","output/outputTel"};if (otherArgs.length < 2) {System.err.println("Usage: PubTel <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "word count");// 设置环境参数job.setJarByClass(TelPubZqc.class);// 设置程序主类job.setMapperClass(TelMap.class);// 设置用户实现的Mapper类job.setCombinerClass(TelReducer.class);job.setReducerClass(TelReducer.class);// 设置用户实现的Reducer类job.setOutputKeyClass(Text.class);// 设置输出key类型job.setOutputValueClass(Text.class); // 设置输出value类型for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// 添加输入文件路径}FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// 设置输出文件路径System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交作业并等待结束}
}
2、两表联结 Join 应用
仿照单表关联例子,编写“RelationXxx”类实现多表关联。中文文本文件转成 UTF-8 编码格式,否则会乱码。
输入 score.txt:
studentid | classid | score |
---|---|---|
s003001 | fd3003 | 84 |
s003001 | fd3004 | 90 |
s003002 | fd2001 | 71 |
s002001 | fd1001 | 66 |
s001001 | fd1001 | 98 |
s001001 | fd1002 | 60 |
输入 major.txt:
classid | classname | deptname |
---|---|---|
fd1001 | 数据挖掘 | 数学系 |
fd2001 | 电子工程 | 电子系 |
fd2002 | 电子技术 | 电子系 |
fd3001 | 大数据 | 计算机系 |
fd3002 | 网络工程 | 计算机系 |
fd3003 | Java 应用 | 计算机系 |
fd3004 | web 前端 | 计算机系 |
输出结果:
classid | classname | deptname | studentid | score |
---|---|---|---|---|
fd1001 | 数据挖掘 | 数学系 | s001001 | 98 |
fd1001 | 数据挖掘 | 数学系 | s002001 | 66 |
fd2001 | 电子工程 | 电子系 | s003002 | 71 |
fd3003 | Java 应用 | 计算机系 | s003001 | 84 |
fd3004 | web 前端 | 计算机系 | s003001 | 90 |
将其中需要的东西传到hdfs中去。
没有报错。查看结果
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;import java.io.IOException;public class RelationZqc {public static int time = 0;public static class RelationMap extends Mapper<Object, Text, Text, Text> {private Text classID = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String filename=((FileSplit)context.getInputSplit()).getPath().getName();String[] s = value.toString().split(" ");if(filename.equals("score.txt")){classID.set(s[1]);String val="1," + s[0] + "," + s[2];context.write(classID,new Text(val));}else if (filename.equals("major.txt")){if(!s[0].equals("classid")){classID.set(s[0]);String val = "2," + s[1] + "," + s[2];context.write(classID,new Text(val));}}}}public static class RelationReduce extends Reducer<Text, Text, Text, Text> {private Text result = new Text();public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {String[][] studentTable=new String[10][2];String[] data;String classID = "nil";if(time == 0){context.write(new Text("classid"), new Text("classname deptname studentid score"));time++;}int cnt = 0;for (Text val : values) {data = val.toString().split(",");if(data[0].equals("1")){studentTable[cnt][0] = data[1];studentTable[cnt][1] = data[2];cnt = cnt + 1;}else if(data.length == 3 && data[0].equals("2")){classID = data[1] + " " + data[2];}}for(int i = 0; i < cnt; i++){if(classID.equals("nil")) continue;String s=classID+" "+studentTable[i][0]+" "+studentTable[i][1];result.set(s);context.write(key, result);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();// 加载hadoop配置conf.set("fs.defaultFS", "hdfs://localhost:9000");String[] otherArgs = new String[]{"input/score.txt", "input/major.txt", "output/outputRelationZqc"};
// String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: Relation <in> <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "RelationZqc");// 设置环境参数job.setJarByClass(RelationZqc.class);// 设置程序主类job.setMapperClass(RelationMap.class);// 设置用户实现的Mapper类job.setReducerClass(RelationReduce.class);// 设置用户实现的Reducer类job.setOutputKeyClass(Text.class);// 设置输出key类型job.setOutputValueClass(Text.class); // 设置输出value类型for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// 添加输入文件路径}FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// 设置输出文件路径System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交作业并等待结束}
}
3、简单排序类应用编写 MapReduce 程序“SortXxx” 类,要求输入文件 sort1.txt、sort2.txt、sort3.txt 内容,由程序随机生成若干条数据并存储到 HDFS 上,每条数据占一行,数据可以是日期也可以是数字;输出结果为两列数据,第一列是输入文件中的原始数据,第二列是该数据的排位。
运行成功
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class SortZqc {public static class SortMap extends Mapper<Object,Text,IntWritable,IntWritable>{private static IntWritable data = new IntWritable();//实现map函数public void map(Object key,Text value,Context context) throws IOException,InterruptedException{String line=value.toString();data.set(Integer.parseInt(line));context.write(data, new IntWritable(1));}}public static class SortReduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{IntWritable n = new IntWritable(1); //用n代表位次public void reduce(IntWritable key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{for(IntWritable val:values){context.write(key,n);n = new IntWritable(n.get()+1);}}}public static void main(String[] args) throws Exception{Configuration conf = new Configuration();// 加载hadoop配置conf.set("fs.defaultFS", "hdfs://localhost:9000");String[] otherArgs = new String[]{"input/sort1.txt","input/sort2.txt","input/sort3.txt","output/outputSortZqc"};if (otherArgs.length < 2) {System.err.println("Usage: data sort <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "data sort");// 设置环境参数job.setJarByClass(SortZqc.class);// 设置程序主类job.setMapperClass(SortMap.class);// 设置用户实现的Mapper类job.setCombinerClass(SortReduce.class);job.setReducerClass(SortReduce.class);// 设置用户实现的Reducer类job.setOutputKeyClass(IntWritable.class);// 设置输出key类型job.setOutputValueClass(IntWritable.class); // 设置输出value类型for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// 添加输入文件路径}FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// 设置输出文件路径System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交作业并等待结束}}
最后
小生凡一,期待你的关注。
【小白视角】大数据基础实践(五) MapReduce编程基础操作相关推荐
- 打怪升级之小白的大数据之旅(五十九)<Hadoop优化方案>
打怪升级之小白的大数据之旅(五十八) Hadoop优化方案与扩展知识点 上次回顾 上一章,我们对Hadoop的扩展知识HA进行了学习,本章是我们在使用Hadoop过程中的一些优化方案和其他几个需要了解 ...
- 打怪升级之小白的大数据之旅(一)<Java基础语法之Java的身世之谜>
打怪升级之小白的大数据之旅(一) Java基础语法之Java的身世之谜 打怪升级之小白的大数据之旅(一) 前言 一.学习大数据之前 二.Java基础 what? why? how? 总结 前言 做了几 ...
- 网络智能和大数据公开课Homework3 Map-Reduce编程
Web Intelligence and Big Data by Dr. Gautam Shroff 这门课是关于大数据处理,本周是第一次编程作业,要求使用Map-Reduce对文本数据进行统计.使 ...
- 提升60%基础资源利用率!中国联通的容器化大数据平台实践
中国联通数据中心总经理王志军在Rancher举办的ECIC大会上的演讲实录,分享了中国联通为何开始进行平台容器化并如何运用Kubernetes对9000台的服务器数据节点进行最大化利用和合理调度,进而 ...
- 零基础小白的大数据入门手册
零基础小白的大数据入门手册,学大数据前,大家可能听过不少说大数据难学.入行做好心理准备的.大家听完也很动摇很犹豫,怀疑自己能不能学好大数据.这其实完全没有必要,觉得一个东西难,百分之八十的原因是你不了 ...
- 打怪升级之小白的大数据之旅(二十五)<Java面向对象进阶之IO流三 其他常见流>
打怪升级之小白的大数据之旅(二十五) Java面向对象进阶之IO流三 其他常见流 上次回顾 上一章,我们学习了常用的字节流与字符流,本章,我会将其他的一些常见的流进行分享,IO流很多,我介绍不完,就挑 ...
- 傅一平:运营商的大数据变现实践
3月29日,由东湖大数据发起.数据观作为合作媒体参与的大数据百人会·线上沙龙第9期活动圆满结束,浙江移动大数据中心傅一平博士就<运营商大数据变现实践>主题与大家分享了数据利用的心得与经验, ...
- 传统银行业务的数字化转型-中原银行大数据建设实践
在以"场景赋能·驱动有数"为主题的神策 2018 数据驱动大会现场,中原银行刘远东发表了名为<传统银行业务的数字化转型-中原银行大数据建设实践>的主题演讲,以下内容根据 ...
- 23篇大数据系列(二)scala基础知识全集(史上最全,建议收藏)
作者简介: 蓝桥签约作者.大数据&Python领域优质创作者.管理多个大数据技术群,帮助大学生就业和初级程序员解决工作难题. 我的使命与愿景:持续稳定输出,赋能中国技术社区蓬勃发展! 大数据系 ...
最新文章
- [记录]calculate age based on date of birth
- [云炬创业基础笔记]第一章创业环境测试9
- 编写高质量代码:改善Java程序的151个建议(第3章:类、对象及方法___建议36~40)
- IOS基础之Foundation框架常用类NSFileManager,DSDate,CGPoint,CGSize,copy,单例
- qq空间网页设计_网页设计中负空间的有效利用
- 整理JS+FLASH幻灯片播放图片脚本代码
- WINFORM 调用 Close 不会释放窗体
- SpringBoot2.0 基础案例(14):基于Yml配置方式,实现文件上传逻辑
- 基于BAE微信公众账号管理系统答辩PPT免费下载
- Mondrian xml服务mysql_mondrian与java工程的集成
- Android典型界面设计——ViewPage+Fragment实现区域顶部tab滑动切换
- 7500 cpuz跑分 i5_锐龙R5 1400对比i5 7500哪个好 R5-1400与i5-7500区别对比详细评测
- 在mac11以上系统可用的cocosbuilder3.0,12也可用。
- C#中的bin和obj文件夹有什么用?
- md4 java_求MD4 java实现的代码
- 故事要从我白嫖了一个阿里云服务器说起
- 计算机网络---应用层
- java http请求发送unicode_c++ 使用httpclient获取网页及utf8与unicode之间转码
- 优动漫PAINT之绘画助手软件简介
- svn篇2:idea中使用svn
热门文章
- Access VBA 学习笔记 技巧
- Go语言自学系列 | golang标准库os包和环境相关的方法
- 管道的故事(二)提桶者和管道创建者
- 微软android studio,Android 入门 - Visual Studio App Center | Microsoft Docs
- IDEA正则表达式高级替换
- Openstack实验笔记
- springboot毕设项目教学质量评估系统8psea(java+VUE+Mybatis+Maven+Mysql)
- tableau 10.5安装(超简单)
- 时代的发展,带来的是淘汰还是机会
- linux中用vi读文件夹,linux下vi与vim编辑器的简单区别及VI详细使用方法