自定义分组
NameGroup

package test;import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;public class NameGroup  implements RawComparator<ConsumeWritable>{public int compare(ConsumeWritable o1, ConsumeWritable o2) {return o1.getName().compareTo(o2.getName());}/*** 封装key1:zhangsan,135.00  b1=12个字节   key2:yuti,11032 b2=8个字节   * 将组合key转为二进制数组* 比较两个对象在二进制层面* b1 第一个CosumeWritable对象转成的字节数据* s1代表从b1的第几个字节比较* l1代表b1的长度* compareBytes(b1,s1,l1-4(比较字节个数))*   */public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return WritableComparator.compareBytes(b1, 0, l1-4, b2, 0, l2-4);}}

ConsumeWritable

package test;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class ConsumeWritable implements WritableComparable<ConsumeWritable>{private  String name;private float money;public ConsumeWritable() {}public ConsumeWritable(String name, float money) {super();this.name = name;this.money = money;}//从源码中的获得public void  set(String  name,float money){this.name=name;this.money=money;}public String getName() {return name;}public void setName(String name) {this.name = name;}public float getMoney() {return money;}public void setMoney(float money) {this.money = money;}//序列化public void write(DataOutput out) throws IOException {out.writeUTF(name);out.writeFloat(money);}//反序列化public void readFields(DataInput in) throws IOException {name=in.readUTF();money=in.readFloat();}public int compareTo(ConsumeWritable o) {//第一次比较int compareTo = this.getName().compareTo(o.getName());if (compareTo !=0) {return compareTo;}//第二次比较  注意:普通的数据类型是没有compaerTo方法 所以要转换为他的包装类return Float.valueOf(this.getMoney()).compareTo(Float.valueOf(o.getMoney()));}//比较对象两个对象,需要重写equals和hashcode()方法@Overridepublic int hashCode() {final int prime = 31;int result = 1;result = prime * result + Float.floatToIntBits(money);result = prime * result + ((name == null) ? 0 : name.hashCode());return result;}@Overridepublic boolean equals(Object obj) {if (this == obj)return true;if (obj == null)return false;if (getClass() != obj.getClass())return false;ConsumeWritable other = (ConsumeWritable) obj;if (Float.floatToIntBits(money) != Float.floatToIntBits(other.money))return false;if (name == null) {if (other.name != null)return false;} else if (!name.equals(other.name))return false;return true;}@Overridepublic String toString() {return name + "," + money;}}

主要程序:

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*** 主要思想:根据shuffle阶段排序是根据key来排序的* @author Administrator**/
public class SecondSortMapReduce extends Configured implements Tool{//map映射public static class  SecondSortMapper extends Mapper<LongWritable, Text, ConsumeWritable, FloatWritable>{private ConsumeWritable mapOutPutKey = new ConsumeWritable();private FloatWritable mapOutPutValue= new FloatWritable();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//把读取出来的内容 装换为String  类型String  line = value.toString();//通过制表符分割String[] split = line.split("\t");mapOutPutKey.set(split[0], Float.valueOf(split[1]));mapOutPutValue.set(Float.parseFloat(split[1]));System.err.print("key: "+mapOutPutKey.toString());System.err.print("->value: "+mapOutPutValue+"\n");context.write(mapOutPutKey, mapOutPutValue);}}//分区   参数是map输出的public static class MyPartitoner  extends Partitioner<ConsumeWritable, FloatWritable>{@Overridepublic int getPartition(ConsumeWritable key, FloatWritable value,int numPartitions) {//根据hashpatitioner源码的得到return (key.getName().hashCode() & Integer.MAX_VALUE) % numPartitions;}}public static class SecondSortReducer extends Reducer<ConsumeWritable, FloatWritable, Text, FloatWritable>{private  Text  OutPutKey =new Text();private  FloatWritable OutPutValue = new FloatWritable();@Overrideprotected void reduce(ConsumeWritable key,Iterable<FloatWritable> values,Context context )throws IOException, InterruptedException {System.out.print("key:"+key.toString()+"["+"value:");OutPutKey.set(key.getName());for (FloatWritable floatWritable : values) {System.out.print(floatWritable+",");OutPutValue.set(floatWritable.get());context.write(OutPutKey, OutPutValue);}System.out.println("]"+"\n");}}public int run(String[] args) throws Exception {// 1.创建Configuration对象,获取配置文件Configuration conf = new Configuration();// 2.构建MapReduce Job对象Job job = Job.getInstance(conf, this.getClass().getSimpleName());job.setJarByClass(getClass());// 3.输入目录/文件(input) -》 map -》 reduce -》输出路径 (output)// 3.1 设置输入文件所在目录Path inPath = new Path(args[0]);FileInputFormat.setInputPaths(job, inPath);// 3.2 设置Map输出信息job.setMapperClass(SecondSortMapper.class);job.setMapOutputKeyClass(ConsumeWritable.class);job.setMapOutputValueClass(FloatWritable.class);//自定义分区job.setPartitionerClass(MyPartitoner.class);//自定义分组job.setGroupingComparatorClass(NameGroup.class);// 3.3设置reduce的输出信息job.setReducerClass(SecondSortReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FloatWritable.class);// 3.4 设置输出路径Path outPath = new Path(args[1]);FileSystem fs = outPath.getFileSystem(conf);if (fs.exists(outPath)) {fs.delete(outPath, true);}FileOutputFormat.setOutputPath(job, outPath);// 提交job/*** 可以详细显示任务的进度信息 job.submit()这种方式是做不到的*/boolean isSuccessed = job.waitForCompletion(true);// job.submit(); 不推荐return isSuccessed ? 0 : 1;}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();args = new String[] { "hdfs://hive01:8020/input/ceshi.txt", "hdfs://hive01:8020/outputtest1"};int status = ToolRunner.run(conf, new SecondSortMapReduce(), args);System.exit(status);}}

MapReduce二次排序分区,分组优化相关推荐

  1. 详细讲解MapReduce二次排序过程

    2019独角兽企业重金招聘Python工程师标准>>> 我在15年处理大数据的时候还都是使用MapReduce, 随着时间的推移, 计算工具的发展, 内存越来越便宜, 计算方式也有了 ...

  2. Hadoop学习笔记—11.MapReduce中的排序和分组

    Hadoop学习笔记-11.MapReduce中的排序和分组 一.写在之前的 1.1 回顾Map阶段四大步骤 首先,我们回顾一下在MapReduce中,排序和分组在哪里被执行: 从上图中可以清楚地看出 ...

  3. MapReduce二次排序

    2019独角兽企业重金招聘Python工程师标准>>> 默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候需要对Key排序的同时还需要对Value进行排序,这时候就要用 ...

  4. hadoop之MapReduce自定义二次排序流程实例详解

    一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求 ...

  5. Hadoop Mapreduce分区、分组、二次排序过程详解

    2019独角兽企业重金招聘Python工程师标准>>> 1.MapReduce中数据流动    (1)最简单的过程:  map - reduce    (2)定制了partition ...

  6. Hadoop Mapreduce分区、分组、二次排序过程详解[转]

    徐海蛟 教学用途 1.MapReduce中数据流动 (1)最简单的过程: map - reduce (2)定制了partitioner以将map的结果送往指定reducer的过程: map - par ...

  7. 大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

       前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分 ...

  8. MapReduce自定义二次排序流程

    每一条记录开始是进入到map函数进行处理,处理完了之后立马就入自定义分区函数中对其进行分区,当所有输入数据经过map函数和分区函数处理完之后,就调用自定义二次排序函数对其进行排序. MapReduce ...

  9. Mapreduce的排序、全排序以及二次排序

    一:背景 Hadoop中虽然有自动排序和分组,由于自带的排序是按照Key进行排序的,有些时候,我们希望同时对Key和Value进行排序.自带的排序功能就无法满足我们了,还好Hadoop提供了一些组件可 ...

最新文章

  1. 关于java调用Dll文件的异常 %1 不是有效的 Win32 应用程序。
  2. 机器学习算法的基础知识
  3. C++ this指针初步使用,与链式编程
  4. autojs 按下状态_AutoJs4.1.0实战教程---最后惊喜的一篇
  5. JavaTPoint Python 中文教程【翻译完成】
  6. javascript实现页面滚屏效果
  7. python3.7 matplotlib增加坐标说明_python matplotlib:如何在极坐标图中的轴和刻度标签之间插入更多空格?...
  8. log4j教程_Log4j教程
  9. 批处理批量更改文件名并排序
  10. emply() php,thinkphp3.2.3 分页代码分享
  11. MYSQL have_innodb DISABLED无法创建innodb类型的表
  12. 常用验证码功能实现大全
  13. 三调数据库标准和输出表格
  14. PHP之 通过银行卡账号匹配银行名称
  15. Instant Contiki
  16. Veracrypt加密,配合WD Security西数移动硬盘加密
  17. 用matlab语言实现下面的分段函数,分段函数的运算与可视化的MATLAB实现
  18. python 报错'tuple' object does not support item assignment
  19. Docker上部署SpringBoot项目并推送镜像到Docker Hub上---以MacOS为例
  20. 计算机的英语作文模板,高中英语作文模板 第243期:My Computer 我的电脑

热门文章

  1. 千兆以太网一致性测试
  2. 西电软工操作系统实验:编译Ubuntu18.04新内核并添加系统调用(含代码以及详细分析)
  3. android美团底部栏实现,仿美团APP的底部滑动菜单实现
  4. 云转码:express-ffmpeg 免费开源云转码切片平台
  5. VMware虚拟机装win7系统时遇到vmware tools无法安装的问题
  6. WMI Provider Host可以关闭吗?
  7. oppo手机android 版本号,oppo手机怎么升级到安卓11
  8. 蓝宝石 590 GME D5 8G可以刷的vbios,黑苹果免驱
  9. 服务器被黑,服务器管理工具 .msc不能用,msc打不开
  10. 2022-2028年中国婴童藻油软胶囊行业市场深度监测及投资前景预测报告