更新版的主要是解决上一篇文章里面比较不合理的地方

上一篇文章中在Reduce类里面重写了cleanup方法用于进行第二次排序,虽然可以实现要求,但是比较不科学

在本文章中将分两个两个MapReduce任务来执行。

补充概念:在Hadoop中,每个MapReduce任务都被初始化为一个job,每个job又可分为两个阶段:map阶段和reduce阶段。这两个阶段分别用两个函数来表示。Map函数接收一个<key,value>形式的输入,然后同样产生一个<key,value>形式的中间输出,Hadoop会负责将所有具有相同中间key值的value集合在一起传递给reduce函数,reduce函数接收一个如<key,(list of values)>形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是<key,value>形式。

TradeBean类:

package com.wqs.myWritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class TradeBean implements WritableComparable<TradeBean>{private String name;private int income;private int pay;private int profit;public TradeBean() {super();// TODO 自动生成的构造函数存根}public TradeBean(String name, int income, int pay, int profit) {super();this.name = name;this.income = income;this.pay = pay;this.profit = profit;}@Overridepublic void readFields(DataInput in) throws IOException {name = in.readUTF();income = in.readInt();pay = in.readInt();profit = in.readInt();}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(name);out.writeInt(income);out.writeInt(pay);out.writeInt(profit);}@Overridepublic int compareTo(TradeBean tradeBean) {if(this.profit > tradeBean.getProfit()) return -1;else if(this.profit < tradeBean.getProfit()) return 1;else if(this.income > tradeBean.getIncome()) return -1;else if(this.income < tradeBean.getIncome()) return -1;else return 0;}@Overridepublic String toString() {return name + " " + income + " " + pay + " " + profit;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getIncome() {return income;}public void setIncome(int income) {this.income = income;}public int getPay() {return pay;}public void setPay(int pay) {this.pay = pay;}public int getProfit() {return profit;}public void setProfit(int profit) {this.profit = profit;}}

Map类:

package com.wqs.myWritableComparable;import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class Map extends Mapper<Object, Text, Text, TradeBean>{private TradeBean bean = new TradeBean();private Text name = new Text();@Overrideprotected void map(Object key, Text value, Context context) throws IOException, InterruptedException{String line = value.toString();String[] temp = line.split(" ");name.set(temp[0]);bean.setName(temp[0]);bean.setIncome(Integer.valueOf(temp[1]));bean.setPay(Integer.valueOf(temp[2]));bean.setProfit(0);context.write(name, bean);}
}

Reduce类:

package com.wqs.myWritableComparable;import java.io.IOException;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class Reduce extends Reducer<Text, TradeBean, TradeBean, NullWritable>{@Overrideprotected void reduce(Text k2, Iterable<TradeBean> vs2, Context context) throws IOException, InterruptedException {String name = null;int income = 0;int pay = 0;int profit = 0;for (TradeBean tradeBean : vs2) {income += tradeBean.getIncome();pay += tradeBean.getPay();}name = k2.toString();profit = income - pay;context.write(new TradeBean(name, income, pay, profit), NullWritable.get());}
}

Map2类:

package com.wqs.myWritableComparable;import java.io.IOException;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class Map2 extends Mapper<Object, Text, TradeBean, NullWritable> {private TradeBean bean = new TradeBean();@Overrideprotected void map(Object key, Text value, Context context) throws IOException, InterruptedException{String line = value.toString();String[] temp = line.split(" ");bean.setName(temp[0]);bean.setIncome(Integer.valueOf(temp[1]));bean.setPay(Integer.valueOf(temp[2]));bean.setProfit(Integer.valueOf(temp[3]));context.write(bean, NullWritable.get());}
}

Reduce2类:

package com.wqs.myWritableComparable;import java.io.IOException;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;public class Reduce2 extends Reducer<TradeBean, NullWritable, TradeBean, NullWritable> {@Overrideprotected void reduce(TradeBean k2, Iterable<NullWritable> vs2, Context context) throws IOException, InterruptedException {context.write(k2, NullWritable.get());}
}

Main类:

package com.wqs.myWritableComparable;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class Main {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();System.setProperty("hadoop.home.dir", "E:/hadoop-2.7.7");args = new String[] { "/demo03/in/", "/demo03/out", "/demo03/out2" };String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length != 3){System.err.println("Usage:InvertedIndex");System.exit(2);}Job job = Job.getInstance();job.setJarByClass(Main.class);job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TradeBean.class);job.setOutputKeyClass(TradeBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.addInputPath(job, new Path("hdfs://192.168.222.128:9000" + args[0]));FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.222.128:9000" + args[1]));Job job2 = Job.getInstance();job2.setJarByClass(Main.class);job2.setMapperClass(Map2.class);job2.setReducerClass(Reduce2.class);job2.setMapOutputKeyClass(TradeBean.class);job2.setMapOutputValueClass(NullWritable.class);job2.setOutputKeyClass(TradeBean.class);job2.setOutputValueClass(NullWritable.class);FileInputFormat.addInputPath(job2, new Path("hdfs://192.168.222.128:9000" + args[1]));FileOutputFormat.setOutputPath(job2, new Path("hdfs://192.168.222.128:9000" + args[2]));//等待job执行完毕之后再执行job2if (job.waitForCompletion(true)) {System.exit(job2.waitForCompletion(true) ? 0 : 1);}}
}

更新:MapReduce编程之自定义序列化类及自定义排序2相关推荐

  1. SpringSecurity权限管理框架系列(七)-SpringSecurity自定义配置类中自定义Filter的使用详解

    1.Filter请求过滤器 filter请求过滤器可以帮助我们进行HttpServletRequest请求和HttpServletResponse响应的过滤 在自定义的Filter过滤器中我们可以对我 ...

  2. Remoting系列专题---自定义序列化类

    最近项目开发中的传输数据是围绕Remoting而召开的,所以想把所有的数据实体都定义统一的格式,于是就写了一个基于DataTable的基类BaseModal,其他数据实体全部继承于它.此BaseMod ...

  3. 【Kafka】Kafka 使用传统的 avro API 自定义序列化类和反序列化类

    1.概述 参考:https://www.jianshu.com/p/4f724c7c497d 做的自己的实验. stock.avsc文件生成相应的实体类,请参考 2.项目结构 3.定义 schema ...

  4. flume1.9自定义hbaseSink(实际是自定义序列化类)

    关于版本 由于flume1.9以下不支持hbase2.0以上,所以必须用1.9的版本; flume1.9 hbase2.0.0 实现方式 实现HBase2EventSerializer接口,注意是2; ...

  5. 4 MapReduce编程框架

    MapReduce编程框架 第 1 节 MapReduce思想 MapReduce思想在⽣活中处可见.我们或多或少都曾接触过这种思想.MapReduce的思想核⼼是分而治之,充分利用了并⾏处理的优势. ...

  6. Hadoop的MR编程实现partition、sort和自定义outputformat

    Partitioner public abstract class Partitioner<KEY, VALUE> (key.hashCode() & Integer.MAX_VA ...

  7. MapReduce编程框架

    1.MapReduce思想 MapReduce思想在生活中处处可见.我们或多或少都曾接触过这种思想.MapReduce的思想核心是分而治之,充分利用了并行处理的优势.即使是发布过论文实现分布式计算的谷 ...

  8. 获取类权限定名_自定义认证类、权限类

    自定义认证类,自定义权限类 一 自定义认证类 思路: 实现了自己确定身份的方式 1 定制自己的token串格式 2 应用drf-token里面的方法来实现token解密 3 然后写写确定身份的逻辑 u ...

  9. MapReduce编程——输入类FileInputFormat(切片)及其4个实现类(kv)的用法

    一个完整的MapReduce程序包括四个阶段:Map Task阶段.Shuffle阶段.Reduce Task阶段 InputFormat是一个抽象类,用于获取Input输入数据,并将其切分和打成&l ...

最新文章

  1. python 将列和索引的值变换_【编辑小组成长日记】Python学习第二期
  2. 初学算法-快速排序与线性时间选择(Deterministic Selection)的C++实现
  3. javascript之ua与urlSchema
  4. Kubernetes + .NET Core 的落地实践
  5. neo4j导入两个文件_Neo4j:找到两个纬度/经度之间的中点
  6. char p[]与char *p的区别
  7. 前端基础-html-水平线标签
  8. php 数据相加,PHP数组合并之array_merge和数组相加
  9. 生物信息学 | GEO介绍与安装
  10. 音视频聊天开发: 1 视频采集
  11. MPB:遗传发育所白洋组-​高通量分离培养和鉴定植物根系细菌
  12. 微信开发遇到的那些坑
  13. dnf超时空漩涡副本路线流程图_DNF超时空漩涡怎么打 队伍配置攻坚路线兵营boss攻略...
  14. android 源代码 毛笔,Android-毛笔的探索与开发
  15. 用聊天记录当证据 对方改了微信号怎么证明他是他
  16. springmvc防xss脚本注入攻击,springmvc过滤html和js标签,html和js标签转义
  17. 门户通专访爱思网创始人韩笑:SNS网站必然走向实用化!
  18. #BJTUOJ 铁憨憨骑士的小队分配(图论缩点+思维)
  19. 用ThoughtWorks.QRCode生成二维码时出现“索引超出了数组界限”的错误
  20. 远程工作:从300到300万,一个远程外包团队的发展历程和经验【转载】

热门文章

  1. ROS wiki绝对是相关的功能包的介绍最完整的网站。
  2. 什么是电力系统安全I区,安全II,安全III区
  3. 屏幕录像软件哪个好用?
  4. ipv6地址技术详解
  5. 现在很多的APP都有“附近的人“功能,这是哪个知识实现的呢!
  6. 论文阅读-《结合分析直接照明和随机阴影》
  7. 数据结构知识点思维导图(绪论)
  8. 银联宣布:无卡时代来了!有何神奇之处
  9. 手机号,身份证,银行卡号数据脱敏
  10. 春招快来了,杭电研究生学长聊聊踩过的坑,助你成功上岸