一、MapReduce综合案例:MR综合案例
### --- 需求~~~     现在有一些订单的评论数据,需求,将订单按照好评与差评区分开来,
~~~     将数据输出到不同的文件目录下,数据内容如下,其中数据第九个字段表示好评,
~~~     中评,差评。0:好评,1:中评,2:差评。
~~~     现需要根据好评,中评,差评把数据分类并输出到不同的目录中,并且要求按照时间顺序降序排列。
~~~     # 备注:现在有大量类似上面的小文件!

300 东西很不错,物流也很快 \N 1 106 131******33 0 2019-02-06 19:10:13
301 还行,洗完有点干,不知道怎么回事 \N 1 106 136******44 0 2019-03-2214:16:41
302 还可以吧,保质期短,感觉貌似更天然些 \N 1 106 134******34 0 2019-04-1013:40:06
303 还可以吧,保质期短,感觉貌似更天然些 \N 1 105 134******33 0 2019-01-1514:40:21
304 还没用,,不知道效果怎么样 \N 1 105 137******66 0 2019-02-28 18:55:43
305 刚收到,还没用,用后再追评!不过,听朋友说好用,才买的! \N 1 105 138******600 2019-03-13 19:10:09
306 一般,感觉用着不是很好,可能我头发太干了 \N 1 105 132******44 0 2019-04-09 10:35:49
307 非常好用,之前买了10支,这次又买了10支,不错,会继续支持! \N 1 103 131******330 2019-01-15 13:10:46
308 喜欢茶树油的 \N 1 103 135******33 0 2019-02-08 14:35:09
309 好像比其他的强一些,继续使用中 \N 1 103 133******99 0 2019-03-1419:55:36
310 感觉洗后头发很干净,头皮有一定改善。 \N 1 103 138******44 0 2019-04-0922:55:59
311 从出生到现在一直都是惠氏 现在宝宝两周半了 \N 1 157 那***情 0 2017-12-01 06:05:30
312 口感不错,孩子很喜欢。推荐。 \N 1 157 w***4 0 2017-12-12 08:35:06
313 价格优惠,日期新鲜,包装完好!发货速度快,非常喜欢!还有赠品! \N 1 157 j***00 2019-01-09 22:55:41

二、分析
### --- 分析~~~     自定义InputFormat合并小文件
~~~     自定义分区根据评论等级把数据分区
~~~     自定义OutputFormat把数据输出到多个目录

三、开发步骤
### --- 合并小文件
~~~     创建项目:comment.step1
~~~     Mapperpackage com.yanqi.mr.comment.step1;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;//text:代表的是一个文件的path+名称,BytesWritable:一个文件的内容
public class MergeMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {@Overrideprotected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {context.write(key, value);}
}

### --- 自定义InputFormat
### --- MergeInputFormatpackage com.yanqi.mr.comment.step1;
//自定义inputformat读取多个小文件合并为一个SequenceFile文件//SequenceFile文件中以kv形式存储文件,key--》文件路径+文件名称,value-->文件的整个内容import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;
import java.util.List;//TextInputFormat中泛型是LongWritable:文本的偏移量, Text:一行文本内容;指明当前inputformat的输出数据类型
//自定义inputformat:key-->文件路径+名称,value-->整个文件内容
public class MergeInputFormat extends FileInputFormat<Text, BytesWritable> {//重写是否可切分@Overrideprotected boolean isSplitable(JobContext context, Path filename) {//对于当前需求,不需要把文件切分,保证一个切片就是一个文件return false;}@Overridepublic List<InputSplit> getSplits(JobContext job) throws IOException {//分片逻辑依然是原始的分片逻辑,一个文件一个maptask,jvm重用优化,uber模式,小文件任务优化?return super.getSplits(job);}//recordReader就是用来读取数据的对象@Overridepublic RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {MergeRecordReader recordReader = new MergeRecordReader();//调用recordReader的初始化方法recordReader.initialize(split, context);return recordReader;}
}

### --- MergeRecordReaderpackage com.yanqi.mr.comment.step1;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;//负责读取数据,一次读取整个文件内容,封装成kv输出
public class MergeRecordReader extends RecordReader<Text, BytesWritable> {private FileSplit split;//hadoop配置文件对象private Configuration conf;//定义key,value的成员变量private Text key = new Text();private BytesWritable value = new BytesWritable();//初始化方法,把切片以及上下文提升为全局@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {this.split = (FileSplit) split;conf = context.getConfiguration();}private Boolean flag = true;//用来读取数据的方法@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {//对于当前split来说只需要读取一次即可,因为一次就把整个文件全部读取了。if (flag) {//准备一个数组存放读取到的数据,数据大小是多少?byte[] content = new byte[(int) split.getLength()];final Path path = split.getPath();//获取切片的path信息final FileSystem fs = path.getFileSystem(conf);//获取到文件系统对象final FSDataInputStream fis = fs.open(path); //获取到输入流IOUtils.readFully(fis, content, 0, content.length); //读取数据并把数据放入byte[]//封装key和valuekey.set(path.toString());value.set(content, 0, content.length);IOUtils.closeStream(fis);//把再次读取的开关置为falseflag = false;return true;}return false;}//获取到key@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {return key;}//获取到value@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return value;}//获取进度@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}//关闭资源@Overridepublic void close() throws IOException {}
}

### --- Reducerpackage com.yanqi.mr.comment.step1;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class MergeReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {@Overrideprotected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {//输出value值(文件内容),只获取其中第一个即可(只有一个)context.write(key, values.iterator().next());}
}

### --- Driverpackage com.yanqi.mr.comment.step1;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import java.io.IOException;public class MergeDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//        1. 获取配置文件对象,获取job对象实例final Configuration conf = new Configuration();final Job job = Job.getInstance(conf, "MergeDriver");
//        2. 指定程序jar的本地路径job.setJarByClass(MergeDriver.class);
//        3. 指定Mapper/Reducer类job.setMapperClass(MergeMapper.class);
//        job.setReducerClass(MergeReducer.class);
//        4. 指定Mapper输出的kv数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);
//        5. 指定最终输出的kv数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);//设置使用自定义InputFormat读取数据job.setInputFormatClass(MergeInputFormat.class);FileInputFormat.setInputPaths(job, new Path("E:\\merge\\merge-out")); //指定读取数据的原始路径//指定输出使用的outputformatjob.setOutputFormatClass(SequenceFileOutputFormat.class);//尽可能降低数据的量,减少磁盘空间的占用,网络间通信时数据量小可以节省时间//针对Sequencefile的压缩SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);//压缩类型:record压缩SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.RECORD);
//        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
//        7. 指定job输出结果路径FileOutputFormat.setOutputPath(job, new Path("E:\\merge\\merge-output")); //指定结果数据输出路径
//        8. 提交作业final boolean flag = job.waitForCompletion(true);//jvm退出:正常退出0,非0值则是错误退出System.exit(flag ? 0 : 1);}
}

二、编译打印输出
### --- 编译打印输出~~~     配置输入输出参数
~~~     编译打印
~~~     将多个小文件合并成一个文件

CC00047.hadoop——|HadoopMapReduce.V20|——|Hadoop.v20|MapReduce综合案例.v01|相关推荐

  1. CC00046.hadoop——|HadoopMapReduce.V19|——|Hadoop.v19|MapReduce数据压缩机制|

    一.shuffle阶段数据的压缩机制 ### --- Hadoop当中支持的额压缩算法~~~ 数据压缩有两大好处,节约磁盘空间,加速数据在网络和磁盘上的传输!! ~~~ 我们可以使用bin/hadoo ...

  2. CC00063.hadoop——|HadoopMapReduce.V34|——|Hadoop.v34|NamenodeFullGC-FullGC的影响|

    一.Namenode Full GC ~~~ [NamenodeFullGC-FullGC的影响] ~~~ [NamenodeFullGC-FullGC的日志分析] 二.JVM堆内存 ### --- ...

  3. CC00055.hadoop——|HadoopMapReduce.V27|——|Hadoop.v27|源码剖析|DataNode启动流程|

    一.[源码剖析之DataNode启动流程] :DataNode 启动流程 ### --- datanode的Main Class是DataNode,先找到DataNode.main()public c ...

  4. CC00050.hadoop——|HadoopMapReduce.V23|——|Hadoop.v23|MR算法扩展|MergeSort归并排序|

    一.[MR算法扩展之MergeSort归并排序][MR算法扩展之QuickSort快速排序]:Mergesort归并排序 二.合并 三.合并细节 ### --- 不断地将当前序列平均分割成 2个子序列 ...

  5. 【Hadoop】第三天 mapreduce的原理和编程

    文章目录 MapReduce概述 MR过程各个角色的作用 作业提交 作业初始化 任务分配 任务执行 状态更新 作业完成 错误处理 MapReduce 原理 Mapper Reducer Worker ...

  6. 【Hadoop离线基础总结】MapReduce增强(上)

    MapReduce增强 MapReduce的分区与reduceTask的数量 概述 MapReduce当中的分区:物以类聚,人以群分.相同key的数据,去往同一个reduce. ReduceTask的 ...

  7. Hadoop源代码分析(包mapreduce.lib.input)

    接下来我们按照MapReduce过程中数据流动的顺序,来分解org.apache.hadoop.mapreduce.lib.*的相关内容,并介绍对应的基类的功能.首先是input部分,它实现了MapR ...

  8. Hadoop 4、Hadoop MapReduce的工作原理

    一.MapReduce的概念 MapReduce是hadoop的核心组件之一,hadoop要分布式包括两部分,一是分布式文件系统hdfs,一部是分布式计算框就是mapreduce,两者缺一不可,也就是 ...

  9. Hadoop集群搭建及MapReduce应用

    一.Hadoop集群的搭建与配置 1.节点准备 集群规划: 主机名 IP 安装的软件 运行的进程 weekend 01 192.168.1.60 jdk.hadoop NameNode.DFSZKFa ...

  10. Hadoop学习笔记—4.初识MapReduce

    一.神马是高大上的MapReduce MapReduce是Google的一项重要技术,它首先是一个 编程模型 ,用以进行大数据量的计算.对于大 数据量的计算,通常采用的处理手法就是并行计算.但对许多开 ...

最新文章

  1. 第一次使用Android Studio时你应该知道的一切配置
  2. 刚发现博客园又遇到了问题
  3. Adnroid提高效率之资源文件改名
  4. 七、操作系统——动态分区分配算法(详解)
  5. sql 列求和_图解面试题:累计求和问题如何分析?
  6. boost知识点查阅
  7. TortoiseSVN的安装与使用
  8. [译] 你的站点如你所想的移动友好吗?
  9. 前端H5 使用百度统计进行埋点
  10. 计算机的表白隐藏功能,微信还有这个功能?隐藏代码还能表白!教你高级告白手段...
  11. 稳压二极管工作原理及参数详解
  12. openGauss长沙Meetup | 共建数据库可信开源社区
  13. 和差化积公式详细推导
  14. 贝塞尔插值曲线绘制软件设计
  15. 【zz】陈硕:当析构函数遇到多线程──C++ 中线程安全的对象回调
  16. 绘制鱼骨图,卡壳了,找到了别人的文章,先好好学习理论知识。
  17. 帝国cms7.2 linux伪静态,帝国CMS7.0IIS伪静态设置教程
  18. 《计算机网络 自顶向下方法》(第7版)答案(第二章)(二)
  19. 工业视觉_57:霍夫(Hough)直线识别,交点与夹角
  20. 混音师的混音之道|公开我学习混音的方法,真正的捷径|MZD Studios

热门文章

  1. 毕设文案 赠源码28265-django 电子档管理系统
  2. 漫山遍野的多边形元素,正在入侵每一份设计稿
  3. 读研投小论文感想与总结(一)
  4. 有什么方法可以把酷我音乐转换音频格式
  5. 02.MM主数据-供应商主数据
  6. 大学生毕业工作构想论文(原创)
  7. es建立索引后的排序返回结果_#研发解决方案介绍#基于ES的搜索+筛选+排序解决方案...
  8. 2020年3月全国计算机等级考试真题(C语言二级)
  9. 云服务器操作系统可以改吗,云服务器的操作系统可以改吗
  10. 【设计模式】设计模式入门知识