MapReduce分布式计算(一)
MapReduce是Hadoop系统核心组件之一,它是一种可用于大数据并行处理的计算模型、框架和平台,主要解决海量数据的计算,是目前分布式计算模型中应用较为广泛的一种。
练习:计算a.txt文件中每个单词出现的次数
hello world
hello hadoop
hello 51doit
hadoop mapreduce
mapreduce spark
public class WordCount {public static void main(String[] args) throws IOException {//获取到resource文件夹下a.txt的路径URL resource = WordCount.class.getClassLoader().getResource("a.txt");String path = resource.getPath();//使用FileUtils将文件读取成字符串String s = FileUtils.readFileToString(new File(path),"utf-8");//将文件使用空格进行切割 \s可以切割 空格 tab键String[] arr = s.split("\\s+");//创建Map集合Map<String,Integer> map = new HashMap<>();//遍历数组for (String s1 : arr) {//判断集合是否包含指定键if(!map.containsKey(s1)){//如果不包含 添加 单词 1map.put(s1,1);}else{//如果包含 获取当前键的次数 +1 在添加回集合Integer count = map.get(s1);map.put(s1,count+1);}}System.out.println(map);}
}
通过以上的方式 计算出来了a.txt文件中每个单词出现的次数,但是我们想一下 ,如果a.txt文件非常大,怎么办?
比如有一个a.txt文件10个T的大小。这时一台计算机就没有办法计算了,因为我们根本存储不了,计算不了,那么一台计算机无法计算,就使用多台计算机来进行计算!
MapReduce核心思想
MapReduce的核心思想是“分而治之”。所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,把各部分的结果组成整个问题的结果,这种思想来源于日常生活与工作时的经验,同样也完全适合技术领域。
为了更好地理解“分而治之”思想,我们光来举一个生活的例子。例如,某大型公司在全国设立了分公司,假设现在要统计公司今年的营收情况制作年报,有两种统计方式,第1种方式是全国分公司将自己的账单数据发送至总部,由总部统一计算公司今年的营收报表:第2种方式是采用分而治之的思想,也就是说,先要求分公司各自统计营收情况,再将统计结果发给总部进行统一汇总计算。这两种方式相比,显然第2种方式的策略更好,工作效率更高效。
MapReduce 作为一种分布式计算模型,它主要用于解决海量数据的计算问题。使用MapReduce操作海量数据时,每个MapReduce程序被初始化为一个工作任务,每个工作任务可以分为Map 和l Reducc两个阶段,具体介绍如下:
Map阶段::负责将任务分解,即把复杂的任务分解成若干个“简单的任务”来行处理,但前提是这些任务没有必然的依赖关系,可以单独执行任务。
Reduce阶段:负责将任务合并,即把Map阶段的结果进行全局汇总。下面通过一个图来描述上述MapReduce 的核心思想。
MapReduce就是“任务的分解与结和的汇总”。即使用户不懂分布式计算框架的内部运行机制,但是只要能用Map和 Reduce思想描述清楚要处理的问题,就能轻松地在Hadoop集群上实现分布式计算功能。
MapReduce编程模型
MapReduce是一种编程模型,用于处理大规模数据集的并行运算。使用MapReduce执行计算任务的时候,每个任务的执行过程都会被分为两个阶段,分别是Map和Reduce,其中Map阶段用于对原始数据进行处理,Reduce阶段用于对Map阶段的结果进行汇总,得到最终结果。
MapReduce编程模型借鉴了函数式程序设计语言的设计思想,其程序实现过程是通过map()和l reduce()函数来完成的。从数据格式上来看,map()函数接收的数据格式是键值对,生的输出结果也是键值对形式,reduce()函数会将map()函数输出的键值对作为输入,把相同key 值的 value进行汇总,输出新的键值对。
(1)将原始数据处理成键值对<K1,V1>形式。
(2)将解析后的键值对<K1,V1>传给map()函数,map()函数会根据映射规则,将键值对<K1,V1>映射为一系列中间结果形式的键值对<K2,V2>。
(3)将中间形式的键值对<K2,V2>形成<K2,{V2,....>形式传给reduce()函数处理,把具有相同key的value合并在一起,产生新的键值对<K3,V3>,此时的键值对<K3,V3>就是最终输出的结果。
词频统计
因为我们的数据都存储在不同的计算机中,那么将对象中的数据从网络中传输,就一定要用到序列化!
/*JDK序列化对象的弊端 我们进行序列化 其实最主要的目的是为了 序列化对象的属性数据比如如果序列化一个Person对象 new Person("柳岩",38); 其实我们想要的是 柳岩 38但是如果直接序列化一个对象的话 JDK为了反序列化方便 会在文件中加入其他的数据 这样序列化后的文件会变的很大,占用空间
*/
public class Test {public static void main(String[] args) throws Exception {ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("d:\\person.txt"));//JDK序列化对象 Person p = new Person();p.setName("柳岩");p.setAge(38);oos.writeObject(p);oos.close();}
}
本来其实数据就占几个字节,序列化后,多占用了很多字节,这样如果序列化多的话就会浪费很多空间.
/*可以通过序列化属性的方式解决问题只序列化属性 可以减小序列化后的文件大小
*/
public class Test {public static void main(String[] args) throws Exception {ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("d:\\person.txt"));Person p = new Person();p.setName("柳岩");p.setAge(38);//只序列化属性oos.writeUTF(p.getName());oos.writeInt(p.getAge());oos.close();}
}
/*需要注意反序列化时 需要按照序列化的顺序来反序列化
*/
public class Test {public static void main(String[] args) throws Exception {ObjectInputStream ois = new ObjectInputStream(new FileInputStream("d:\\person.txt"));//先反序列化name 在反序列化ageString name = ois.readUTF();int age = ois.readInt();System.out.println(name + " "+age);ois.close();}
}
Hadoop对java的序列化又进行了优化,对一些类型进行了进一步的封装,方便按照自己的方式序列化
Integer ----> IntWritable
Long ----> LongWritable
String ----> Text
Double ----> DoubleWritable
Boolean ----> BooleanWritable
WorldCount代码编写
map函数定义
/*KEYIN: K1 VALUIN: V1 KEYOUT:K2 VALUEOUT:V2
*/
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {}
}
我们只需要继承Mapper类,重写map方法就好
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*K1 : 行起始位置 数字 Long ---- > LongWritableV1 : 一行数据 字符串 String -----> TextK2 : 单词 字符串 String -----> TextV2 : 固定数字1 数组 Long -----> LongWritable*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {/**** @param key K1* @param value V1* @param context 上下文对象 将map的结果 输出给reduce*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//将一行数据 转换成字符串 按照空格切割 String[] arr = value.toString().split("\\s+");for (String k2 : arr) {//将单词输出给reducecontext.write(new Text(k2),new LongWritable(1));}}
}
reduce函数定义
/*KEYIN:K2 VALUEIN:V2KEYOUT:K3VALUEOUT:V3
*/
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {}
}
我们只需要继承Reducer类型重写reduce方法就好
/*K2:单词 String ----> TextV2:固定数字 1 Long ----> LongWritableK3:单词 String ----> TextV3:相加后的结果 Long ----> LongWritable*/
public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {/**** @param key K2* @param values V2的集合 {1,1,1,1}* @param context 上下文对象 输出结果*/@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {int count = 0;//将次数相加for (LongWritable value : values) {count+=value.get();}//写出 k3 v3context.write(key,new LongWritable(count));}
}
最后编写启动程序
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Test {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//创建配置对象Configuration conf = new Configuration();//创建工作任务Job job = Job.getInstance(conf, "wordCount");//设置Map类job.setMapperClass(WordCountMapper.class);//设置Reduce类job.setReducerClass(WordCountReducer.class);//设置map的输出类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);//设置reduce的输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//设置读取文件位置 可以是文件 也可以是文件夹FileInputFormat.setInputPaths(job,new Path("d:\\work\\abc"));//设置输出文件位置FileOutputFormat.setOutputPath(job,new Path("d:\\work\\abc\\out_put"));//提交任务 并等待任务结束job.waitForCompletion(true);}
}如果抛这个异常 需要查看windows环境
Exception in thread "main"java.lang .UnsatisfiedLinkError: org.apache .hadoop.io.nativeio.NativeIO$windows.access0(Ljava/lang/string;1) .如果已经配置了环境 还不行 在src新建包 org.apache.hadoop.io.nativeio然后hadoop02文件夹中的 NativeIO.java添加到这个包下 重新运行尝试
若要显示报错信息在resouces目录下添加log4j.properties
内容如下:
log4j.rootCategory=INFO,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
MapReduce分布式计算(一)相关推荐
- 【Hadoop】谷歌的三篇论文(GFS、MapReduce分布式计算模型 、BigTable大表)
谷歌的三篇论文(GFS.MapReduce分布式计算模型 .BigTable大表) 0 谷歌的基本思想:三驾马车 1 第一篇论文:GFS 1.1 Google的GFS分布式文件系统的基本原理 1.2 ...
- MapReduce分布式计算框架
1.MapReduce分布式计算框架 本章介绍了Hadoop的MapReduce分布式计算框架的基本概念.编程规范和词频统计实战等内容.从存储的大数据中快速抽取信息,进一步进行数据价值的挖掘,需要用到 ...
- 对mapreduce分布式计算框架原理,进行完整流程分析
本文中大致的对mapreduce进行完整流程分析,map.reduce端的分析. 一. MapReduce 简介 MapReduce源自google的一篇文章,将海量数据处理的过程拆分为map和red ...
- MapReduce(分布式计算框架)
什么是MapReduce MapReduce是分布式计算框架,它将大型数据操作作业分解为可以跨服务器集群并行执行的单个任务,适用于大规模数据处理场景,每个job包含Map和Reduce两部分 MapR ...
- MapReduce分布式计算框架简介
Hadoopd分布式计算框架--MapReduce 一.MapReduce简介 1. 概念 MapReduce是基于Hadoop的分布式计算框架. 起源于Google,它将大型数据操作作业分解为可以跨 ...
- MapReduce分布式计算框架的优缺点
MapReduce是一个可用于大规模数据处理的分布式计算框架,它借助函数式编程及分而治之的设计思想,使编程人员在即使不会分布式编程的情况下,也能够轻松地编写分布式应用程序并运行在分布式系统之上. 一. ...
- MapReduce(分布式计算框架)了解
Hadoop组成 Hadoop HDFS:一个高可靠.高吞吐量的分布式文件系统,对海量数据的存储. Hadoop MapReduce:一个分布式的资源调度和离线并行计算框架. Hadoop Yarn: ...
- MapReduce 分布式计算框架 简介 特点 工作流程
MapReduce 计算框架 一种分布式计算框架,解决海量数据的计算问题 MapReduce将整个并行计算过程抽象到两个函数 Map(映射):对一些独立元素组成的列表的每一个元素进行指定的操作,可以高 ...
- Hadoop之MapReduce分布式计算
简单介绍一下项目背景--很简单,作死去接下老师的活,然后一干就是半个月,一直忙着从零基础到使用Hadoop中的MapReduce来解决一个实际问题,也就是用来计算一个数据量较大的二度朋友关系. 那么首 ...
最新文章
- SQL SERVER 数据库清空语句 忽略外键 触发器 等
- linux ubuntu/deepin与Windows时间不同步解决办法(双系统)
- JAX-RS协议说明
- Docker Hello World容器运行报错的解决办法
- 罗马数字转整数Python解法
- WPF实现实现圆形菜单
- LeetCode 1561. 你可以获得的最大硬币数目
- weixin微信公众号一站到底游戏代码(有点普通)
- 求解一元一次方程C语言,问一道算法题目(解一元一次方程的问题)
- python自定义函数两个返回值如何分别输出_第八讲 python自定义函数返回值
- jtds 连接mysql_JAVA 使用jtds 连接sql server数据库
- github1s 油猴插件
- springboot教务评教系统毕业设计源码252116
- 推广软件开发分析规范
- 面试官常问的设计模式及常用框架中设计模式的使用(一)
- centos7网卡开机自动down
- 基于PANDA做DTI分析
- 读取经纬度坐标并存储为字典格式,即key为ID,value为轨迹点
- 小米手机彻底关闭广告经验分享
- TestProject 自动化
热门文章
- ubuntu系统终端(Terminate)常用快捷键
- Asp .NetCore 从数据库加载配置(一)
- 虚拟dom (virtual dom)(vnode)
- 【快速理解Adagrad】通俗解释Adagrad梯度下降算法
- Apifox简单了解——WEB端测试的集大成者
- Unity游戏开发之游戏动画(模型动画制作及导入)
- python安装 百度网盘-centos 安装百度云/百度网盘Python客户端
- 〖Python 数据库开发实战 - MySQL篇㉖〗- 数据删除操作 - DELETE语句
- 程序员如何在工作之余挣点小钱
- IDEA安装阿里巴巴开发手册插件(Alibaba Java Coding Guidelines)