前文

  • 一、CentOS7 hadoop3.3.1安装(单机分布式、伪分布式、分布式)
  • 二、JAVA API实现HDFS

MapReduce编程实例

文章目录

  • 前文
  • MapReduce编程实例
    • 前言
    • 注意事项
    • 单词统计 WordCount
    • MapReduce 经典案例——倒排索引
    • MapReduce 经典案例——数据去重
    • MapReduce 经典案例——TopN
  • Github下载地址

前言

简介

讲解_Hadoop 中文网

Hadoop测试项目:HadoopDemo

注意事项

如果下载了HadoopDemo作为测试,用到HDFS_CRUD.java
需要提前准备winutils。最好对应版本。

单词统计 WordCount

WordCountMapper.java

package top.rabbitcrows.hadoop.mr;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 这里就是MapReduce程序 Map阶段业务逻辑实现的类 Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>* <p>* KEYIN:表示mapper数据输入时key的数据类型,在默认读取数据组件下,叫作ImportFormat,它的行为是每行读取待处理的数据* 读取一行,就返回一行给MR程序,这种情况下 KEYIN就表示每一行的起始偏移,因此数据类型是Long* <p>* VALUEIN:表示mapper数据输入的时候Value的数据类型,在默认读取数据组件下,* valueIN就表示读取的一行内容 因此数据类型是String* <p>* KEYOUT:表示mapper阶段数据输出的时候key的数据类型,在本案例中输出的key是单词,因此数据类型是String* ValueOUT:表示mapper阶段数据输出的时候value的数据类型,在本案例中输出的value是单次的此书,因此数据类型是Integer* <p>* 这里所说的数据类型String,Long都是JDK的自带的类型,* 数据在分布式系统中跨网络传输就需要将数据序列化,默认JDK序列化时效率低下,因此* 使用Hadoop封装的序列化类型。 long--LongWritable String --Text Integer intWritable ....** @author LEHOSO*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {/*** 这里就是mapper阶段具体业务逻辑实现的方法 该方法的调用取决于读取数据的组件有没有给MR传入数据* 如果有数据传入,每一个<k,v>对,map就会被调用一次*/@Overrideprotected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, IntWritable>.Context context)throws IOException, InterruptedException {// 拿到传入进来的一行内容,把数据类型转换为StringString line = value.toString();// 将这行内容按照分隔符切割String[] words = line.split(" ");// 遍历数组,每出现一个单词就标记一个数组1 例如:<单词,1>for (String word : words) {// 使用MR上下文context,把Map阶段处理的数据发送给Reduce阶段作为输入数据context.write(new Text(word), new IntWritable(1));//第一行 hadoop hadoop spark  发送出去的是<hadoop,1><hadoop,1><spark,1>}}
}

WordCountReducer.java

package top.rabbitcrows.hadoop.mr;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;//都要继承Reducer 这就是我们所说的变成模型,只需要套模板就行了/*** 这里是MR程序 reducer阶段处理的类* <p>* KEYIN:就是Reducer阶段输入的数据key类型,对应Mapper阶段输出KEY类型 ,在本案例中就是单词* <p>* VALUEIN:就是Reducer阶段输入的数据value类型,对应Mapper阶段输出VALUE类型 ,在本案例中就是个数* <p>* KEYOUT:就是Reducer阶段输出的数据key类型,在本案例中,就是单词 Text* <p>* VALUEOUT:reducer阶段输出的数据value类型,在本案例中,就是单词的总次数** @author LEHOSO*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {/*** 这里是REDUCE阶段具体业务类的实现方法* 第一行 hadoop hadoop spark  发送出去的是<hadoop,1><hadoop,1><spark,1>* reduce接受所有来自Map阶段处理的数据之后,按照Key的字典序进行排序* 按照key是否相同作一组去调用reduce方法* 本方法的key就是这一组相同的kv对 共同的Key* 把这一组的所有v作为一个迭代器传入我们的reduce方法* <p>* 迭代器:<hadoop,[1,1]>*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> value,Reducer<Text, IntWritable, Text, IntWritable>.Context context)throws IOException, InterruptedException {//定义一个计数器int count = 0;//遍历一组迭代器,把每一个数量1累加起来就构成了单词的总次数//for (IntWritable iw : value) {count += iw.get();}context.write(key, new IntWritable(count));}
}

WordCountCombiner.java

package top.rabbitcrows.hadoop.mr;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context)throws IOException, InterruptedException {// 1.局部汇总int count = 0;for (IntWritable v : values) {count += v.get();}context.write(key, new IntWritable(count));}
}

WordCountDriver.java

package top.rabbitcrows.hadoop.mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** Driver类就是MR程序运行的主类,本类中组装了一些程序运行时所需要的信息* 比如:使用的Mapper类是什么,Reducer类,数据在什么地方,输出在哪里** @author LEHOSO*/
public class WordCountDriver {public static void main(String[] args) throws Exception {// 通过Job来封装本次MR的相关信息Configuration conf = new Configuration();conf.set("mapreduce.framework.name", "local");Job wcjob = Job.getInstance(conf);// 指定MR Job jar包运行主类wcjob.setJarByClass(WordCountDriver.class);// 指定本次MR所有的Mapper Reducer类wcjob.setMapperClass(WordCountMapper.class);wcjob.setReducerClass(WordCountReducer.class);// 设置我们的业务逻辑 Mapper类的输出 key和 value的数据类型wcjob.setMapOutputKeyClass(Text.class);wcjob.setMapOutputValueClass(IntWritable.class);// 设置我们的业务逻辑 Reducer类的输出 key和 value的数据类型wcjob.setOutputKeyClass(Text.class);wcjob.setOutputValueClass(IntWritable.class);//设置Combiner组件wcjob.setCombinerClass(WordCountCombiner.class);// 指定要处理的数据所在的位置FileInputFormat.setInputPaths(wcjob, new Path("input/mr"));// 指定处理完成之后的结果所保存的位置FileOutputFormat.setOutputPath(wcjob, new Path("output/mr"));// 提交程序并且监控打印程序执行情况boolean res = wcjob.waitForCompletion(true);System.exit(res ? 0 : 1);}
}

MapReduce 经典案例——倒排索引

InvertedIndexMapper.java

package top.rabbitcrows.mr.InvertedIndex;import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;/*** @author LEHOSO* @date 2021/11/2* @apinote*/
public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {//存储单词和文档名称private static Text KeyInfo = new Text();//存储词频,初始化为1private static final Text valueInfo = new Text("1");@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {String line = value.toString();String[] fileds = StringUtils.split(line, " ");//得到这行数据所在的文件切片FileSplit fileSplit = (FileSplit) context.getInputSplit();//根据文件切片得到文件名String fileName = fileSplit.getPath().getName();for (String filed : fileds) {//key值由单词和文档名称组成,如“MapReduce:file1.txt”KeyInfo.set(filed + ":" + fileName);context.write(KeyInfo, valueInfo);}}
}

InvertedIndexCombiner.java

package top.rabbitcrows.mr.InvertedIndex;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author LEHOSO* @date 2021/11/2* @apinote*/
public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {private static Text info = new Text();//输入:<MapReduce:file3.txt{1,1}>//输出:<MapReduce:file3.txt:2>@Overrideprotected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {int sum = 0;    //统计词频for (Text value : values) {sum += Integer.parseInt(value.toString());}int splitIndex = key.toString().indexOf(":");//重新设置value值并由文档名称和词频组成info.set(key.toString().substring(splitIndex + 1) + ":" + sum);//重新设置key值为单词key.set(key.toString().substring(0, splitIndex));context.write(key, info);}
}

InvertedIndexReducer.java

package top.rabbitcrows.mr.InvertedIndex;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author LEHOSO* @date 2021/11/2* @apinote*/
public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {private static Text result = new Text();//输入:<MapReduce:file3.txt:2}>//输出:<MapReduce:file1.txt:1;file2.txt:1;file3.txt:2;>@Overrideprotected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {//生成文档列表String fileList = new String();for (Text value : values) {fileList += value.toString() + ";";}result.set(fileList);context.write(key, result);}
}

InvertedIndexDriver.java

package top.rabbitcrows.mr.InvertedIndex;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author LEHOSO* @date 2021/11/2* @apinote*/
public class InvertedIndexDriver {public static void main(String[] args)throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();
//        conf.set("mapreduce.framework.name", "local");Job job = Job.getInstance(conf);job.setJarByClass(InvertedIndexDriver.class);job.setMapperClass(InvertedIndexMapper.class);job.setReducerClass(InvertedIndexReducer.class);job.setCombinerClass(InvertedIndexCombiner.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 指定要处理的数据所在的位置FileInputFormat.setInputPaths(job,new Path("input/InvertedIndex/"));// 指定处理完成之后的结果所保存的位置FileOutputFormat.setOutputPath(job,new Path("output/InvertedIndex"));// 提交程序并且监控打印程序执行情况boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}
}

MapReduce 经典案例——数据去重

DedupMapper.java

package top.rabbitcrows.mr.dedup;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author LEHOSO* @date 2021/11/5* @apinote*/
public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {private static Text field = new Text();//<0,2021-11-1 a><11,2021-11-2 b>@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {field = value;//NullWritable.get()方法设置空值context.write(field, NullWritable.get());// <2018-3-3 c,null> <2018-3-4 d,null>}
}

DedupReducer.java

package top.rabbitcrows.mr.dedup;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author LEHOSO* @date 2021/11/5* @apinote*/
public class DedupReducer extends Reducer<Text, NullWritable,Text,NullWritable> {//<2021-11-1,a,null><2021-11-2,b,null><2021-11-3,c,null>@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {context.write(key,NullWritable.get());}
}

DedupDriver.java

package top.rabbitcrows.mr.dedup;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author LEHOSO* @date 2021/11/5* @apinote*/
public class DedupDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(DedupDriver.class);job.setMapperClass(DedupMapper.class);job.setReducerClass(DedupReducer.class);job.setOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path("input/Dedup"));// 指定处理完成之后的结果所保存的位置FileOutputFormat.setOutputPath(job, new Path("output/Dedup"));job.waitForCompletion(true);}
}

MapReduce 经典案例——TopN

TopNMapper.java

package top.rabbitcrows.mr.topN;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
import java.util.TreeMap;/*** @author LEHOSO* @date 2021/11/5* @apinote*/
public class TopNMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable> {private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();// <0,10 3 8 7 6 5 1 2 9 4>// <xx,11 12 17 14 15 20>@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, IntWritable>.Context context) throws IOException, InterruptedException {String line = value.toString();String[] nums = line.split(" ");for (String num : nums) {//读取每行数据写入TreeMap,超过5个就会移除最小的数值repToRecordMap.put(Integer.parseInt(num), " ");if (repToRecordMap.size() > 5) {repToRecordMap.remove(repToRecordMap.firstKey());}}}//重写cleanup()方法,读取完所有文件行数据后,再输出到Reduce阶段@Overrideprotected void cleanup(Mapper<LongWritable, Text, NullWritable, IntWritable>.Context context) throws IOException, InterruptedException {for (Integer i : repToRecordMap.keySet()) {try {context.write(NullWritable.get(), new IntWritable(i));} catch (Exception e) {e.printStackTrace();}}}
}

TopNReducer.java

package top.rabbitcrows.mr.topN;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;/*** @author LEHOSO* @date 2021/11/5* @apinote*/
public class TopNReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>(new Comparator<Integer>() {//返回一个基本类型的整型,谁大谁排后面.//返回负数表示:o1 小于o2//返回0表示:表示:o1和o2相等//返回正数表示:o1大于o2。public int compare(Integer a, Integer b) {return b - a;}});public void reduce(NullWritable key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {for (IntWritable value : values) {repToRecordMap.put(value.get(), " ");if (repToRecordMap.size() > 5) {repToRecordMap.remove(repToRecordMap.firstKey());}}for (Integer i : repToRecordMap.keySet()) {context.write(NullWritable.get(), new IntWritable(i));}}
}

TopNDriver.java

package top.rabbitcrows.mr.topN;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** @author LEHOSO* @date 2021/11/5* @apinote*/
public class TopNDriver {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance();job.setJarByClass(TopNDriver.class);job.setMapperClass(TopNMapper.class);job.setReducerClass(TopNReducer.class);job.setNumReduceTasks(1);//map阶段输出的keyjob.setMapOutputKeyClass(NullWritable.class);//map阶段输出的valuejob.setMapOutputValueClass(IntWritable.class);//reduce阶段输出的keyjob.setOutputKeyClass(NullWritable.class);//reduce阶段输出的valuejob.setMapOutputValueClass(IntWritable.class);FileInputFormat.setInputPaths(job, new Path("input/TopN/num.txt"));FileOutputFormat.setOutputPath(job, new Path("output/TopN"));boolean res = job.waitForCompletion(true);System.out.println(res ? 0 : 1);}}

Github下载地址

(HadoopDemo)[https://github.com/lehoso/HadoopDemo]

三、MapReduce编程实例相关推荐

  1. 大数据之Hadoop学习——动手实战学习MapReduce编程实例

    文章目录 一.MapReduce理论基础 二.Hadoop.Spark学习路线及资源收纳 三.MapReduce编程实例 1.自定义对象序列化 需求分析 报错:Exception in thread ...

  2. MapReduce编程实例

    实验目的 搭建MapReduce编程模型 配置Eclipse和Maven Hadoop集群与启动顺序 MapReduce的WordCount应用 书上代码练习 学习编写一个MapReduce程序 实验 ...

  3. hadoop中使用MapReduce编程实例

    原文链接:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html 从网上搜到的一篇hadoop的编程实例,对于初学者真是帮助太大 ...

  4. mapreduce编程实例(1)-统计词频

    今天开始把MapReduce Design Patterns这本书上的mapreduce例子过一遍,我觉得这本书对学mapreduce编程非常好,把这本书看完了,基本上能遇到的mapreduce问题也 ...

  5. mapreduce编程实例python-使用Python实现Hadoop MapReduce程序

    在这个实例中,我将会向大家介绍如何使用Python 为Hadoop编写一个简单的MapReduce 程序. 尽管Hadoop 框架是使用Java编写的但是我们仍然需要使用像C++.Python等语言来 ...

  6. mapreduce编程实例python-使用Python语言写Hadoop MapReduce程序

    原标题:使用Python语言写Hadoop MapReduce程序 Python部落(python.freelycode.com)组织翻译,禁止转载,欢迎转发. 在本教程中,我将描述如何使用Pytho ...

  7. mapreduce编程实例python-Python模拟MapReduce的流程

    说一下开发环境,是在WIN7下面进行的. python环境是:python2.7 MapReduce的主要流程有: Map阶段->Shuffle阶段->Reduce阶段. 那么一下分别对应 ...

  8. mapreduce编程实例python-Python编写MapReduce作业的简单示例

    这篇文章主要为大家详细介绍了Python编写MapReduce作业的简单示例,具有一定的参考价值,可以用来参考一下. 对python这个高级语言感兴趣的小伙伴,下面一起跟随512笔记的小编两巴掌来看看 ...

  9. Hadoop那些事儿(四)---MapReduce编程实例(基础)

    前言 上一篇文章,以WordCount为例讲了一下MapReduce的代码结构及运行机制,这篇文章将通过几个简单的例子进一步认识MapReduce. 1.数据检索 问题描述 假设有很多条数据,我们从中 ...

最新文章

  1. 开源!北大研究生把《统计学习方法》书中全部算法都实现了!
  2. linux清空文件内容
  3. linux桌面文件夹改图标,Linux 给桌面程序设置个性化图标
  4. 修改java和mysql_关于mysql和java的数据修改
  5. 有赞统一接入层架构演进
  6. Cannot find package module @sap/cds/common
  7. 日本电影《摇摆》:男人之间的心灵碰撞
  8. react实现多行文本超出加省略号
  9. [翻译]:怎样从C/C++代码中对C#进行回调
  10. mysql 使用内置函数来进行模糊搜索(locate()等)
  11. 经典伴读_java8实战_一网打尽
  12. 线上展示3D可视化电子沙盘管理系统
  13. 最优化理论——最速下降法
  14. Unity Kinect添加自定义姿势识别
  15. 【技巧】屏蔽百度搜索热点和相关软件推荐等(提高注意力)
  16. 14 POJ3363 Annoying painting tool
  17. c++、python实现置换密码(栅栏技术,列置换)、替代密码(加法密码(Caesar密码),乘法密码)
  18. 大学计算机网络配置实验报告,北京理工大学-计算机网络实践-IP路由配置与路由协议分析实验报告.docx...
  19. 哈尔滨车牌摇号系统遭遇黑客攻击瘫痪
  20. matlab回归分析sst_线性回归(2)—— 模型评估

热门文章

  1. git 删除本地/远程分支
  2. Windows环境下搭建Shell环境
  3. python 执行alias_设置bash alias别名及取消
  4. 【大厂阿里程序员的薪资】简述阿里的薪酬体系和职级
  5. python用于硬件_python之计算机硬件知识
  6. 电话语音机器人使用心得
  7. 求职时,常见的招聘陷阱有哪些呢?
  8. 鸿蒙之境大司命,神都夜行录鸿蒙之境怎么打 神都夜行录鸿蒙之境通关攻略解析...
  9. (附源码)springboot社区居家养老互助服务管理平台 毕业设计 062027
  10. 在GitHub上搭建typora的图床