Mapper类

我们自定义MyMapper类并继承Mapper,

import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;/*** KEYIN:输入数据的k类型,默认偏移量* VALUEIN:该行内容,字符串,hadoop.io.Text下的Text* KEYOUT:输出数据的k类型,Text* VALUEOUT:输出数据的v类型,1* */
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String[] wordsArr = value.toString().split(" ");//按照空格切割value,得到一个数组//得到数组,开始遍历for (int i = 0; i < wordsArr.length; i++) {String word = wordsArr[i];//取出数组中的值Text outKey = new Text(word);//字符串IntWritable outValue = new IntWritable(1);//单词出现一次,就是1context.write(outKey, outValue);//向外输出}}}

map输出数据要经过聚合,调用比较器。优先调用自定义比较器,若未自定义,就调用默认的比较器(按照ASCII值比较)。聚合后的数据value为1 。

分区器不用管,于是此刻数据就是“K-V-P”,之后放到内存缓冲区、溢写。之后调用比较器,键K的比较器就是Text。

关于Text的比较,源码Comparator提供了比较方法,比较ASCII值,使用字典序。

@Overridepublic int compare(byte[] b1, int s1, int l1,byte[] b2, int s2, int l2) {int n1 = WritableUtils.decodeVIntSize(b1[s1]);int n2 = WritableUtils.decodeVIntSize(b2[s2]);return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);}

a和b的ASCII分别为65、66,所以a在前,b在后。

Reducer类

自定义MyReducer类继承Reducer,参数四个类型。上一步Mapper的输出作为Reducer的输入。
传进来的iter就是“假迭代器”。比较键(单词,比较ASCII值)经过比较之后单词都是一样的,经过迭代累加后,输出。

package com.husky.hadoop.wc;import java.io.IOException;
import java.util.Iterator;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> iter,Context context) throws IOException, InterruptedException {int sum = 0;for(IntWritable i : iter){sum+=i.get();}context.write(key, new IntWritable(sum));}
}

客户端

客户端先和RM联系,通过配置文件,知道集群内角色在哪。

package com.husky.hadoop.wc;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class MyWC {public static void main(String[] args) throws Exception {客户端自动读取配置文件,并把配置信息加载到conf对象中Configuration conf = new Configuration(true);//jobJob job = Job.getInstance(conf);FileSystem fs = FileSystem.get(conf);//必须要配置的,入口类job.setJarByClass(MyWC.class);//设置job namejob.setJobName("wc");//设置Mapper和Reducerjob.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);//设置输出的K-V类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置reduce的数量,默认1job.setNumReduceTasks(1);//设置计算输入数据,path就是hdfs上的文件路径FileInputFormat.addInputPath(job, new Path("/user/root/test.txt"));//设置计算输出目录,最后的计算结果要在这该目录中//该目录必须不存在,否则计算容易出错。submitter.submitJobInternal(Job.this, cluster)//方法中调用了checkSpecs(job)方法来检查该目录是否存在。Path outPath = new Path("/output/wc/");if (fs.exists(outPath)) { //如果目录存在就删除fs.delete(outPath,true);}FileOutputFormat.setOutputPath(job, outPath);//开始执行boolean f = job.waitForCompletion(true);if (f) {System.out.println("MapReduce程序执行成功!");}}}

点击查看Client提交Job作业相关流程的源码分析!

执行

把编写的这几个类,export成一个jar包,并上传。执行以下命令:

#最后的参数为包名+类名
hadoop jar ./MyWC.jar com.husky.hadoop.wc.MyWC

在执行的过程中,jps会出现YarnChild和MRAppMaster进程。

执行结束,结果文件就已经按照字典序排好顺序了,且会打印出我们的输出信息:
根据为我们提供的插件查看结果:

注意

笔者在进行以上操作时,遇见了以下问题:

这里省略了一万行报错信息...
Note: System times on machines may be out of sync. Check system time and time zones.

报错信息提示我的集群时间并不同步,导致我的task跑不起来。

解决方案:

安装ntpdate工具

yum -y install ntp ntpdate

设置系统时间与网络时间同步

ntpdate cn.pool.ntp.org

最后

WordCount案例是最简单的入门案例,我们需要和MR整个流程对应起来,这能帮助我们理解掌握Shrffle机制原理。所以,这个小案例,最好能做到手写并口述流程。

MapReduce—第一个WordCount程序相关推荐

  1. Hadoop官网的一个WordCount程序

    下面是Hadoop官网的一个WordCount程序: package org.myorg;import java.io.IOException; import java.util.*;import o ...

  2. 一个wordcount程序轻松玩转MapReduce编程模型

    可以毫不夸张的说,几乎开发中绝大部分的MR程序都是基于wordcount编程模型而来,或者说用wordcount变化而来(改变的主要是业务方面的逻辑).所以,熟练掌握wordcount编程模型,是掌握 ...

  3. Flink基础系列8-Flink on yarn运行wordcount程序

    文章目录 环境介绍 一.Maven配置 二.Java代码编写 三.Maven打包并上传 四.运行jar文件 五.运行其它的class文件 参考 环境介绍 测试服务器CDH 6.3.1版本安装Flink ...

  4. Spark在Yarn上运行Wordcount程序

    过往记忆专注于Hadoop.Spark.Hive.Flume.Hbase,QQ交流群:138615359 首页 Hadoop Spark Hive Hbase Flume 面试题 面试题 数据结构 算 ...

  5. MapReduce的工作原理,详细解释WordCount程序

    本篇文章主要说两部分:简单介绍MapReduce的工作原理:详细解释WordCount程序. MapReduce的工作原理 在<Hadoop in action>一书中,对MapReduc ...

  6. MapReduce流程(WordCount案例实现)

    文章目录 1 MapReduce概述 设计构思 实例进程 实例进程分类 完整执行过程 总结 2 MapReduce编程规范 Map阶段2个步骤 Shuffle阶段4个步骤 Reduce阶段2个步骤 3 ...

  7. 2、运行WordCount程序

    转载:http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html 单词计数是最简单也是最能体现MapReduce思想的程序之一,可以 ...

  8. spark学习11(Wordcount程序-本地测试)

    wordcount程序 文件wordcount.txt hello wujiadong hello spark hello hadoop hello python 程序示例 package wujia ...

  9. spark:开发本地测试的wordcount程序

    1.使用Java开发本地测试的wordcount程序-1 2.使用Scala开发本地测试的wordcount程序-1 测试文件上传: hadoop fs -put wordcount.txt /wor ...

最新文章

  1. 虚拟化部署之灵活应用Hyper-V快照
  2. 做支付遇到的HttpClient大坑(一)
  3. FastDFS测试图片上传
  4. 代码review工具:Review Board
  5. 你只差这两步 | 将Sentinel 控制台应用于生产环境
  6. Linux下ftp的安装配置
  7. Unity2018新功能抢鲜 | C# Job System Ⅱ
  8. 一种基于annotation的Spring-mvc权限控制方法
  9. 恐怖logo效果展示AE模板
  10. 安装马上6的问题解决
  11. mysql2008 精简版_精简版 SqlServer2008 的安装和使用
  12. 单片机控制步进电机程序c语言正反转停止,51单片机步进电机正反转停止实验-C51源代码...
  13. JavaWeb - 国家语言代码表
  14. 双向链表DoublyLinkedList类
  15. 【Pytorch】pack_padded_sequence与pad_packed_sequence实战详解
  16. 目标检测模型组件构成
  17. LeetCode Word Abbreviation
  18. PR剪辑-电子相册学习笔记
  19. plsql取消文件备份
  20. kali linux 软件包密钥管理

热门文章

  1. python内置函数有哪些_Python集合有哪些内置函数可以使用,这些内置函数有什么功能...
  2. Android源码解析(一)动画篇-- Animator属性动画系统
  3. 【IntelliJ IDEA】tomcat启动,打印日志乱码问题
  4. 何新生—你是哪国人?
  5. ubuntu 16.04 更新后搜狗输入法无法输入中文的问题
  6. Ubuntu14.04重启网卡不生效
  7. mysql之优化小技巧
  8. forfiles命令批量删除N天前文件
  9. 将页面多个下拉框的值以字符串拼接方式存放至数据库一个字段中
  10. JTAG与STC,DEBUGWIRE区别