需求:

1949-10-01 14:21:02  34c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c

统计每一年的每一个月中的气温最高的前三个,且一年的数据结果输出到一个文件。

需求分析:

分组这里还会用到比较器,要按照年、月分组,因此这个自定义KEY比较复杂了就,包含了年、月、温度。

案例实现:

在MR过程中Key往往用于分组或排序,当hadoop内置的key键的数据类型不能满足需求时,就需要自定义key了。接下来马上先定义一个键。

1.自定义Key

我们以前自定义的Mapper类中碰见最多的键是Text,来参考一下:public class Text extends BinaryComparable implements WritableComparable。他的类实现了一个接口WritableComparable。根据上面的注释:

<p>Example:</p>* <p><blockquote><pre>*     public class MyWritableComparable implements WritableComparable<MyWritableComparable> {*       // Some data*       private int counter;*       private long timestamp;*       *       public void write(DataOutput out) throws IOException {*         out.writeInt(counter);*         out.writeLong(timestamp);*       }*       *       public void readFields(DataInput in) throws IOException {*         counter = in.readInt();*         timestamp = in.readLong();*       }*       *       public int compareTo(MyWritableComparable o) {*         int thisValue = this.value;*         int thatValue = o.value;*         return (thisValue &lt; thatValue ? -1 : (thisValue==thatValue ? 0 : 1));*       }**       public int hashCode() {*         final int prime = 31;*         int result = 1;*         result = prime * result + counter;*         result = prime * result + (int) (timestamp ^ (timestamp &gt;&gt;&gt; 32));*         return result*       }*     }* </pre></blockquote></p>

这里要提两个接口:Writable接口和WritableComparable接口。在MR最终实现Writable接口的类可以是值,而实现WritableComparable接口的类可以是键,也可以是值。我自己可以定义一个MyWritableComparable来实现这个接口。这个接口继承了两个接口,writable接口定义了序列化和反序列化,compareable就负责比较。

spill to disk的过程中,调用快排算法的时候会调用比较器,优先调用户自定义比较器,其次才是KEY自己的比较器。把“温度”放到KEY里面去比较,比较合适(根据需求)。并且,有可能会用到分组来做聚合,分组得用“年-月”来分组,聚合的时候把数据按照分组聚合。

package com.husky.hadoop.weather;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class MyKey implements WritableComparable<MyKey>{private int year;private int month;private double temperature;public MyKey() {super();}public MyKey(int year, int month, double temperature) {super();this.year = year;this.month = month;this.temperature = temperature;}public int getYear() {return year;}public void setYear(int year) {this.year = year;}public int getMonth() {return month;}public void setMonth(int month) {this.month = month;}public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature = temperature;}/*** 把对象写到流里面去,就是序列化和反序列化* */@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(year);out.writeInt(month);out.writeDouble(temperature);}/*** 把对象从输入流里面读出来* */@Overridepublic void readFields(DataInput in) throws IOException {this.year=in.readInt();this.month=in.readInt();this.temperature=in.readDouble();}/*** 当前key的比较方法,在排序时调用。返回0、正数、负数* 不能只比较温度,必须得在年月相同的情况下,再去比较温度* */@Overridepublic int compareTo(MyKey o) {int r1 = Integer.compare(this.getYear(), o.getYear());if (r1==0) {int r2 = Integer.compare(this.getMonth(), o.getMonth());if (r2==0) {//降序排序return -Double.compare(this.getTemperature(), o.getTemperature());}return r2;}return r1;}}

2.Mapper类

map是先读取一行数据,1949-10-01 14:21:02 34c ——> MyKey(1949,10,36):Text

K-V默认用的是偏移量和读取的一条记录,用到的是FileInputFormat,但是我们可以换掉。用KeyValueTextInputFormat,它用到的是KeyValueLineRecordReader:

public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {@Overrideprotected boolean isSplitable(JobContext context, Path file) {final CompressionCodec codec =new CompressionCodecFactory(context.getConfiguration()).getCodec(file);if (null == codec) {return true;}return codec instanceof SplittableCompressionCodec;}public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,TaskAttemptContext context) throws IOException {context.setStatus(genericSplit.toString());return new KeyValueLineRecordReader(context.getConfiguration());}}

看一下KeyValueLineRecordReader的一个关键方法setKeyValue:

 public static void setKeyValue(Text key, Text value, byte[] line,int lineLen, int pos) {if (pos == -1) {key.set(line, 0, lineLen);value.set("");} else {key.set(line, 0, pos);value.set(line, pos + 1, lineLen - pos - 1);}}

pos==-1表示没有制表符,就把整个一行都作为key,把value set(“”)。否则,制表符前的key(1949-10-01 14:21:02),后面为value(34c)。所以KEYIN就是Text

package com.husky.hadoop.weather;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class WeatherMapper extends Mapper<Text, Text, MyKey, Text>{static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd");//得到时间对象@Overrideprotected void map(Text key, Text value, Context context)throws IOException, InterruptedException {//KEY:1949-10-01 14:21:02 ;VALUE:36Ctry {//根据时间对象去把年、月取出来Date date = sdf.parse(key.toString());Calendar c = Calendar.getInstance();//下面继续拿年和月c.setTime(date);int year = c.get(Calendar.YEAR);//拿出yearint month = c.get(Calendar.MONTH);//拿出month//把34C切割,拿出34double temperature = Double.parseDouble(value.toString().substring(0, value.toString().length()-1));MyKey outkey = new MyKey(year,month,temperature);Text outvalue = new Text(key+"\t"+value);context.write(outkey, outvalue);} catch (ParseException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}

3.自定义分区类

一条记录经过Map之后,K-V要打上P的标签明确自己未来要去哪个分区。很明显这里不能根据value的值来确定分区号,必须得根据year来确定分区号:

package com.husky.hadoop.weather;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
//从Mapper出来的数据类型就是<MyKey,Text>
public class MyPartitioner extends Partitioner<MyKey, Text>{@Overridepublic int getPartition(MyKey key, Text value, int numPartitions) {// TODO Auto-generated method stubreturn key.getYear()%numPartitions;//分区的数量就是reduce的数量,这里是3}}

4.自定义分组比较器

根据年月分组,此时已经排好序了,因此温度就不用再考虑了。只需要管年月并进行分组就可以了。自定义分组比较器一定有构造方法,并提供比较方法

package com.husky.hadoop.weather;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class MyGroupCompareTo extends WritableComparator{//构造方法public MyGroupCompareTo(){super(MyKey.class,true);//指定类,告诉它用哪个类比较,true表示是否构造当前对象}public int compare(WritableComparable a,WritableComparable b){//类型强转MyKey k1 = (MyKey)a;MyKey k2 = (MyKey)b;//先比较年,后比较月,得到结果直接returnint r1 = Integer.compare(k1.getYear(), k2.getYear());if (r1==0) {return Integer.compare(k1.getMonth(), k2.getMonth());//返回0}return r1;//返回非0}}

5.自定义Reducer类

数据经过分组,就可以流入Reducer中了。Reducer的输入,就是Mapper的输出,所以KEYIN为MyKey,VALUEIN为Text。反观Reducer的输出就很自由了,KEYOUT可以是Text,也可以是NullWritable,VALUEOUT也是如此。我们传入Reducer的是一整坨数据“1949-10-01 14:21:02 34c”

package com.husky.hadoop.weather;import java.io.IOException;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WeatherReducer extends Reducer<MyKey, Text, Text, NullWritable>{@Overrideprotected void reduce(MyKey key, Iterable<Text> iter, Context context)throws IOException, InterruptedException {int num = 0;for(Text value : iter){if (num>=3) {break;}//key:1950-10-02 12:21:02  41c;value为null。反过来也行,我想咋滴就咋滴context.write(value, NullWritable.get());//输出num++;}}
}

6.Client客户端类

在提交Job任务之前,需要对分组比较器、输入格式化类、分区器作出设置

package com.husky.hadoop.weather;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import com.husky.hadoop.wc.MyMapper;
import com.husky.hadoop.wc.MyReducer;
import com.husky.hadoop.wc.MyWC;public class RunJob {public static void main(String[] args) {客户端自动读取配置文件,并把配置信息加载到conf对象中Configuration conf = new Configuration(true);try {//jobJob job = Job.getInstance(conf);FileSystem fs = FileSystem.get(conf);//必须要配置的,入口类job.setJarByClass(RunJob.class);//设置job namejob.setJobName("weather");//设置Mapper和Reducerjob.setMapperClass(WeatherMapper.class);job.setReducerClass(WeatherReducer.class);//设置分组比较器job.setGroupingComparatorClass(MyGroupCompareTo.class);//弃用FileInputFormat,改用KeyValueTextInputFormatjob.setInputFormatClass(KeyValueTextInputFormat.class);//指定自定义分区类job.setPartitionerClass(MyPartitioner.class);//设置输出的K-V类型job.setOutputKeyClass(MyKey.class);job.setOutputValueClass(Text.class);//设置reduce的数量,默认1job.setNumReduceTasks(3);//设置计算输入数据,path就是hdfs上的文件路径FileInputFormat.addInputPath(job, new Path("/input/weather"));//设置计算输出目录,最后的计算结果要在这该目录中Path outPath = new Path("/output/weather/");//该目录必须不存在,否则计算容易出错if (fs.exists(outPath)) { //如果目录存在就删除fs.delete(outPath,true);}FileOutputFormat.setOutputPath(job, outPath);//开始执行boolean f = job.waitForCompletion(true);if (f) {System.out.println("MapReduce程序执行成功!");}} catch (Exception e) {// TODO: handle exception}}}

执行结果

根据年来定义的分区数量,其中一个的结果为:

1950-01-01 11:21:02  32c
1950-10-02 12:21:02 41c
1950-10-01 12:21:02 37c
1950-10-03 12:21:02 27c

MapReduce天气案例相关推荐

  1. 大数据 - MapReduce编程案例 -BH3

    MapReduce编程案例 用mapreduce解决问题的关键是确定key,只有key相同的结果才会到同一个reduce中进行处理 默认分区使用HashPartitoner,hashCode%redu ...

  2. MapReduce经典案例总结

    MapReduce经典案例总结 首先得搭好hadoop环境,windows搭好单机环境 1.根据手机号统计上网流量,统计上行流量.下行流量和总流量 数据如下,文件 flow.log,上传到hadoop ...

  3. hadoop之mapreduce教程+案例学习(二)

    第3章 MapReduce框架原理 目录 第3章 MapReduce框架原理 3.1 InputFormat数据输入 3.1.1 切片与MapTask并行度决定机制 3.1.2 Job提交流程源码和切 ...

  4. hadoop之mapreduce教程+案例学习(一)

    第1章 MapReduce概述 目录 第1章 MapReduce概述 1.1 MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发"基于Hadoop的数据分析 ...

  5. Vue9.2天气案例_监视属性

    <!DOCTYPE html>> <html><head><title>天气案例_监视属性</title><meta chars ...

  6. 【赵强老师】MapReduce编程案例之求工资总额

    先看视频. [赵强老师]MapReduce编程案例之求工资总额 Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上, ...

  7. MapReducer——MapReduce编程案例:求部门的工资总额(2)

    MapReduce编程案例:求部门的工资总额 1.员工表  SQL:select deptno,sum(sal) from emp group by deptno; 2.分析数据处理的过程 3.开发程 ...

  8. MapReduce基本案例

    MapReduce基本案例 案例1. 单词统计 案例2.序列化重写 案例1. 单词统计 对文件里的单词进行计数 输入数据 ss ss cls cls jiao banzhang xue hadoop ...

  9. MapReduce(深入)---案例之用户上行流量 下行流量 总流量倒序 按省份分区

    1. MapReduce的输入和输出 MapReduce执行流程图 详细图解如下 maptask通过自带的TextInputFormat将数据按照一行一行的读取 , 用每一行的起始偏移量作为k , 每 ...

最新文章

  1. ETL学习总结(2)——ETL数据集成工具之kettle、sqoop、datax、streamSets 比较
  2. 使用Maven插件对项目进行打包
  3. linux系统工程师修改打开文件数限制代码教程。服务器运维技术
  4. c# foreach循环二维数组
  5. jpype了解,获取,安装
  6. 软件开发培训要学多久?怎么学?软件开发培训班多少钱?
  7. 疫情推动下的云联络中心终于引起了销售行业的重视。
  8. 'E:\AndroidSDK\platform-tools\adb.exe start-server' failed -- run manually if necessary
  9. cyusb3014的slavefifo程序的解读
  10. [初级理论]给老婆做测试培训-02
  11. 团员大会如何写组织学习计算机知识,团支部召开接收新团员的支部大会的会议记录怎么写?...
  12. VuePress构建一个文档管理网站
  13. 【算法详解】splay的初步了解
  14. 2012年移动互联网小心倒春寒
  15. ipynb 文件转 py
  16. linux逻辑卷管理(LVM )
  17. 某个牛人做的WINDOWS系统文件详解
  18. 深圳大数据培训:好程序员大数据学习路线之hive 存储格式
  19. Django连接MySQL对数据进行网页展示
  20. vb.net 文本转语音

热门文章

  1. python画笔初始位置_turtle绘画-移动落笔点(改变初始原点)
  2. 【vue插件篇】vue-form-check 表单验证
  3. C4D中重点、难点分析
  4. Node-流(Stream)三二事-1.0.1版本
  5. phpcurl 请求Chunked-Encoded data 遇到的一个问题
  6. 将用户添加至sudoers列表
  7. 东芝发布15nm SG5固态硬盘 容量高达1TB
  8. Postgres-XL部署记录(一)
  9. QCon演讲速递:异步处理在分布式系统中的优化作用
  10. stm32内部的CAN总线