MapReduce—第一个WordCount程序
Mapper类
我们自定义MyMapper类并继承Mapper,
import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;/*** KEYIN:输入数据的k类型,默认偏移量* VALUEIN:该行内容,字符串,hadoop.io.Text下的Text* KEYOUT:输出数据的k类型,Text* VALUEOUT:输出数据的v类型,1* */
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String[] wordsArr = value.toString().split(" ");//按照空格切割value,得到一个数组//得到数组,开始遍历for (int i = 0; i < wordsArr.length; i++) {String word = wordsArr[i];//取出数组中的值Text outKey = new Text(word);//字符串IntWritable outValue = new IntWritable(1);//单词出现一次,就是1context.write(outKey, outValue);//向外输出}}}
map输出数据要经过聚合,调用比较器。优先调用自定义比较器,若未自定义,就调用默认的比较器(按照ASCII值比较)。聚合后的数据value为1 。
分区器不用管,于是此刻数据就是“K-V-P”,之后放到内存缓冲区、溢写。之后调用比较器,键K的比较器就是Text。
关于Text的比较,源码Comparator提供了比较方法,比较ASCII值,使用字典序。
@Overridepublic int compare(byte[] b1, int s1, int l1,byte[] b2, int s2, int l2) {int n1 = WritableUtils.decodeVIntSize(b1[s1]);int n2 = WritableUtils.decodeVIntSize(b2[s2]);return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);}
a和b的ASCII分别为65、66,所以a在前,b在后。
Reducer类
自定义MyReducer类继承Reducer,参数四个类型。上一步Mapper的输出作为Reducer的输入。
传进来的iter就是“假迭代器”。比较键(单词,比较ASCII值)经过比较之后单词都是一样的,经过迭代累加后,输出。
package com.husky.hadoop.wc;import java.io.IOException;
import java.util.Iterator;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> iter,Context context) throws IOException, InterruptedException {int sum = 0;for(IntWritable i : iter){sum+=i.get();}context.write(key, new IntWritable(sum));}
}
客户端
客户端先和RM联系,通过配置文件,知道集群内角色在哪。
package com.husky.hadoop.wc;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class MyWC {public static void main(String[] args) throws Exception {客户端自动读取配置文件,并把配置信息加载到conf对象中Configuration conf = new Configuration(true);//jobJob job = Job.getInstance(conf);FileSystem fs = FileSystem.get(conf);//必须要配置的,入口类job.setJarByClass(MyWC.class);//设置job namejob.setJobName("wc");//设置Mapper和Reducerjob.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);//设置输出的K-V类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置reduce的数量,默认1job.setNumReduceTasks(1);//设置计算输入数据,path就是hdfs上的文件路径FileInputFormat.addInputPath(job, new Path("/user/root/test.txt"));//设置计算输出目录,最后的计算结果要在这该目录中//该目录必须不存在,否则计算容易出错。submitter.submitJobInternal(Job.this, cluster)//方法中调用了checkSpecs(job)方法来检查该目录是否存在。Path outPath = new Path("/output/wc/");if (fs.exists(outPath)) { //如果目录存在就删除fs.delete(outPath,true);}FileOutputFormat.setOutputPath(job, outPath);//开始执行boolean f = job.waitForCompletion(true);if (f) {System.out.println("MapReduce程序执行成功!");}}}
点击查看Client提交Job作业相关流程的源码分析!
执行
把编写的这几个类,export成一个jar包,并上传。执行以下命令:
#最后的参数为包名+类名
hadoop jar ./MyWC.jar com.husky.hadoop.wc.MyWC
在执行的过程中,jps会出现YarnChild和MRAppMaster进程。
执行结束,结果文件就已经按照字典序排好顺序了,且会打印出我们的输出信息:
根据为我们提供的插件查看结果:
注意
笔者在进行以上操作时,遇见了以下问题:
这里省略了一万行报错信息...
Note: System times on machines may be out of sync. Check system time and time zones.
报错信息提示我的集群时间并不同步,导致我的task跑不起来。
解决方案:
安装ntpdate工具
yum -y install ntp ntpdate
设置系统时间与网络时间同步
ntpdate cn.pool.ntp.org
最后
WordCount案例是最简单的入门案例,我们需要和MR整个流程对应起来,这能帮助我们理解掌握Shrffle机制原理。所以,这个小案例,最好能做到手写并口述流程。
MapReduce—第一个WordCount程序相关推荐
- Hadoop官网的一个WordCount程序
下面是Hadoop官网的一个WordCount程序: package org.myorg;import java.io.IOException; import java.util.*;import o ...
- 一个wordcount程序轻松玩转MapReduce编程模型
可以毫不夸张的说,几乎开发中绝大部分的MR程序都是基于wordcount编程模型而来,或者说用wordcount变化而来(改变的主要是业务方面的逻辑).所以,熟练掌握wordcount编程模型,是掌握 ...
- Flink基础系列8-Flink on yarn运行wordcount程序
文章目录 环境介绍 一.Maven配置 二.Java代码编写 三.Maven打包并上传 四.运行jar文件 五.运行其它的class文件 参考 环境介绍 测试服务器CDH 6.3.1版本安装Flink ...
- Spark在Yarn上运行Wordcount程序
过往记忆专注于Hadoop.Spark.Hive.Flume.Hbase,QQ交流群:138615359 首页 Hadoop Spark Hive Hbase Flume 面试题 面试题 数据结构 算 ...
- MapReduce的工作原理,详细解释WordCount程序
本篇文章主要说两部分:简单介绍MapReduce的工作原理:详细解释WordCount程序. MapReduce的工作原理 在<Hadoop in action>一书中,对MapReduc ...
- MapReduce流程(WordCount案例实现)
文章目录 1 MapReduce概述 设计构思 实例进程 实例进程分类 完整执行过程 总结 2 MapReduce编程规范 Map阶段2个步骤 Shuffle阶段4个步骤 Reduce阶段2个步骤 3 ...
- 2、运行WordCount程序
转载:http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html 单词计数是最简单也是最能体现MapReduce思想的程序之一,可以 ...
- spark学习11(Wordcount程序-本地测试)
wordcount程序 文件wordcount.txt hello wujiadong hello spark hello hadoop hello python 程序示例 package wujia ...
- spark:开发本地测试的wordcount程序
1.使用Java开发本地测试的wordcount程序-1 2.使用Scala开发本地测试的wordcount程序-1 测试文件上传: hadoop fs -put wordcount.txt /wor ...
最新文章
- 虚拟化部署之灵活应用Hyper-V快照
- 做支付遇到的HttpClient大坑(一)
- FastDFS测试图片上传
- 代码review工具:Review Board
- 你只差这两步 | 将Sentinel 控制台应用于生产环境
- Linux下ftp的安装配置
- Unity2018新功能抢鲜 | C# Job System Ⅱ
- 一种基于annotation的Spring-mvc权限控制方法
- 恐怖logo效果展示AE模板
- 安装马上6的问题解决
- mysql2008 精简版_精简版 SqlServer2008 的安装和使用
- 单片机控制步进电机程序c语言正反转停止,51单片机步进电机正反转停止实验-C51源代码...
- JavaWeb - 国家语言代码表
- 双向链表DoublyLinkedList类
- 【Pytorch】pack_padded_sequence与pad_packed_sequence实战详解
- 目标检测模型组件构成
- LeetCode Word Abbreviation
- PR剪辑-电子相册学习笔记
- plsql取消文件备份
- kali linux 软件包密钥管理
热门文章
- python内置函数有哪些_Python集合有哪些内置函数可以使用,这些内置函数有什么功能...
- Android源码解析(一)动画篇-- Animator属性动画系统
- 【IntelliJ IDEA】tomcat启动,打印日志乱码问题
- 何新生—你是哪国人?
- ubuntu 16.04 更新后搜狗输入法无法输入中文的问题
- Ubuntu14.04重启网卡不生效
- mysql之优化小技巧
- forfiles命令批量删除N天前文件
- 将页面多个下拉框的值以字符串拼接方式存放至数据库一个字段中
- JTAG与STC,DEBUGWIRE区别