CC00047.hadoop——|HadoopMapReduce.V20|——|Hadoop.v20|MapReduce综合案例.v01|
### --- 需求~~~ 现在有一些订单的评论数据,需求,将订单按照好评与差评区分开来,
~~~ 将数据输出到不同的文件目录下,数据内容如下,其中数据第九个字段表示好评,
~~~ 中评,差评。0:好评,1:中评,2:差评。
~~~ 现需要根据好评,中评,差评把数据分类并输出到不同的目录中,并且要求按照时间顺序降序排列。
~~~ # 备注:现在有大量类似上面的小文件!
300 东西很不错,物流也很快 \N 1 106 131******33 0 2019-02-06 19:10:13
301 还行,洗完有点干,不知道怎么回事 \N 1 106 136******44 0 2019-03-2214:16:41
302 还可以吧,保质期短,感觉貌似更天然些 \N 1 106 134******34 0 2019-04-1013:40:06
303 还可以吧,保质期短,感觉貌似更天然些 \N 1 105 134******33 0 2019-01-1514:40:21
304 还没用,,不知道效果怎么样 \N 1 105 137******66 0 2019-02-28 18:55:43
305 刚收到,还没用,用后再追评!不过,听朋友说好用,才买的! \N 1 105 138******600 2019-03-13 19:10:09
306 一般,感觉用着不是很好,可能我头发太干了 \N 1 105 132******44 0 2019-04-09 10:35:49
307 非常好用,之前买了10支,这次又买了10支,不错,会继续支持! \N 1 103 131******330 2019-01-15 13:10:46
308 喜欢茶树油的 \N 1 103 135******33 0 2019-02-08 14:35:09
309 好像比其他的强一些,继续使用中 \N 1 103 133******99 0 2019-03-1419:55:36
310 感觉洗后头发很干净,头皮有一定改善。 \N 1 103 138******44 0 2019-04-0922:55:59
311 从出生到现在一直都是惠氏 现在宝宝两周半了 \N 1 157 那***情 0 2017-12-01 06:05:30
312 口感不错,孩子很喜欢。推荐。 \N 1 157 w***4 0 2017-12-12 08:35:06
313 价格优惠,日期新鲜,包装完好!发货速度快,非常喜欢!还有赠品! \N 1 157 j***00 2019-01-09 22:55:41
### --- 分析~~~ 自定义InputFormat合并小文件
~~~ 自定义分区根据评论等级把数据分区
~~~ 自定义OutputFormat把数据输出到多个目录
### --- 合并小文件
~~~ 创建项目:comment.step1
~~~ Mapperpackage com.yanqi.mr.comment.step1;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;//text:代表的是一个文件的path+名称,BytesWritable:一个文件的内容
public class MergeMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {@Overrideprotected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {context.write(key, value);}
}
### --- 自定义InputFormat
### --- MergeInputFormatpackage com.yanqi.mr.comment.step1;
//自定义inputformat读取多个小文件合并为一个SequenceFile文件//SequenceFile文件中以kv形式存储文件,key--》文件路径+文件名称,value-->文件的整个内容import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;
import java.util.List;//TextInputFormat中泛型是LongWritable:文本的偏移量, Text:一行文本内容;指明当前inputformat的输出数据类型
//自定义inputformat:key-->文件路径+名称,value-->整个文件内容
public class MergeInputFormat extends FileInputFormat<Text, BytesWritable> {//重写是否可切分@Overrideprotected boolean isSplitable(JobContext context, Path filename) {//对于当前需求,不需要把文件切分,保证一个切片就是一个文件return false;}@Overridepublic List<InputSplit> getSplits(JobContext job) throws IOException {//分片逻辑依然是原始的分片逻辑,一个文件一个maptask,jvm重用优化,uber模式,小文件任务优化?return super.getSplits(job);}//recordReader就是用来读取数据的对象@Overridepublic RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {MergeRecordReader recordReader = new MergeRecordReader();//调用recordReader的初始化方法recordReader.initialize(split, context);return recordReader;}
}
### --- MergeRecordReaderpackage com.yanqi.mr.comment.step1;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;//负责读取数据,一次读取整个文件内容,封装成kv输出
public class MergeRecordReader extends RecordReader<Text, BytesWritable> {private FileSplit split;//hadoop配置文件对象private Configuration conf;//定义key,value的成员变量private Text key = new Text();private BytesWritable value = new BytesWritable();//初始化方法,把切片以及上下文提升为全局@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {this.split = (FileSplit) split;conf = context.getConfiguration();}private Boolean flag = true;//用来读取数据的方法@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {//对于当前split来说只需要读取一次即可,因为一次就把整个文件全部读取了。if (flag) {//准备一个数组存放读取到的数据,数据大小是多少?byte[] content = new byte[(int) split.getLength()];final Path path = split.getPath();//获取切片的path信息final FileSystem fs = path.getFileSystem(conf);//获取到文件系统对象final FSDataInputStream fis = fs.open(path); //获取到输入流IOUtils.readFully(fis, content, 0, content.length); //读取数据并把数据放入byte[]//封装key和valuekey.set(path.toString());value.set(content, 0, content.length);IOUtils.closeStream(fis);//把再次读取的开关置为falseflag = false;return true;}return false;}//获取到key@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {return key;}//获取到value@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return value;}//获取进度@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}//关闭资源@Overridepublic void close() throws IOException {}
}
### --- Reducerpackage com.yanqi.mr.comment.step1;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class MergeReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {@Overrideprotected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {//输出value值(文件内容),只获取其中第一个即可(只有一个)context.write(key, values.iterator().next());}
}
### --- Driverpackage com.yanqi.mr.comment.step1;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import java.io.IOException;public class MergeDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1. 获取配置文件对象,获取job对象实例final Configuration conf = new Configuration();final Job job = Job.getInstance(conf, "MergeDriver");
// 2. 指定程序jar的本地路径job.setJarByClass(MergeDriver.class);
// 3. 指定Mapper/Reducer类job.setMapperClass(MergeMapper.class);
// job.setReducerClass(MergeReducer.class);
// 4. 指定Mapper输出的kv数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);
// 5. 指定最终输出的kv数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);//设置使用自定义InputFormat读取数据job.setInputFormatClass(MergeInputFormat.class);FileInputFormat.setInputPaths(job, new Path("E:\\merge\\merge-out")); //指定读取数据的原始路径//指定输出使用的outputformatjob.setOutputFormatClass(SequenceFileOutputFormat.class);//尽可能降低数据的量,减少磁盘空间的占用,网络间通信时数据量小可以节省时间//针对Sequencefile的压缩SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);//压缩类型:record压缩SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.RECORD);
// SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
// 7. 指定job输出结果路径FileOutputFormat.setOutputPath(job, new Path("E:\\merge\\merge-output")); //指定结果数据输出路径
// 8. 提交作业final boolean flag = job.waitForCompletion(true);//jvm退出:正常退出0,非0值则是错误退出System.exit(flag ? 0 : 1);}
}
### --- 编译打印输出~~~ 配置输入输出参数
~~~ 编译打印
~~~ 将多个小文件合并成一个文件
![](/assets/blank.gif)
![](/assets/blank.gif)
CC00047.hadoop——|HadoopMapReduce.V20|——|Hadoop.v20|MapReduce综合案例.v01|相关推荐
- CC00046.hadoop——|HadoopMapReduce.V19|——|Hadoop.v19|MapReduce数据压缩机制|
一.shuffle阶段数据的压缩机制 ### --- Hadoop当中支持的额压缩算法~~~ 数据压缩有两大好处,节约磁盘空间,加速数据在网络和磁盘上的传输!! ~~~ 我们可以使用bin/hadoo ...
- CC00063.hadoop——|HadoopMapReduce.V34|——|Hadoop.v34|NamenodeFullGC-FullGC的影响|
一.Namenode Full GC ~~~ [NamenodeFullGC-FullGC的影响] ~~~ [NamenodeFullGC-FullGC的日志分析] 二.JVM堆内存 ### --- ...
- CC00055.hadoop——|HadoopMapReduce.V27|——|Hadoop.v27|源码剖析|DataNode启动流程|
一.[源码剖析之DataNode启动流程] :DataNode 启动流程 ### --- datanode的Main Class是DataNode,先找到DataNode.main()public c ...
- CC00050.hadoop——|HadoopMapReduce.V23|——|Hadoop.v23|MR算法扩展|MergeSort归并排序|
一.[MR算法扩展之MergeSort归并排序][MR算法扩展之QuickSort快速排序]:Mergesort归并排序 二.合并 三.合并细节 ### --- 不断地将当前序列平均分割成 2个子序列 ...
- 【Hadoop】第三天 mapreduce的原理和编程
文章目录 MapReduce概述 MR过程各个角色的作用 作业提交 作业初始化 任务分配 任务执行 状态更新 作业完成 错误处理 MapReduce 原理 Mapper Reducer Worker ...
- 【Hadoop离线基础总结】MapReduce增强(上)
MapReduce增强 MapReduce的分区与reduceTask的数量 概述 MapReduce当中的分区:物以类聚,人以群分.相同key的数据,去往同一个reduce. ReduceTask的 ...
- Hadoop源代码分析(包mapreduce.lib.input)
接下来我们按照MapReduce过程中数据流动的顺序,来分解org.apache.hadoop.mapreduce.lib.*的相关内容,并介绍对应的基类的功能.首先是input部分,它实现了MapR ...
- Hadoop 4、Hadoop MapReduce的工作原理
一.MapReduce的概念 MapReduce是hadoop的核心组件之一,hadoop要分布式包括两部分,一是分布式文件系统hdfs,一部是分布式计算框就是mapreduce,两者缺一不可,也就是 ...
- Hadoop集群搭建及MapReduce应用
一.Hadoop集群的搭建与配置 1.节点准备 集群规划: 主机名 IP 安装的软件 运行的进程 weekend 01 192.168.1.60 jdk.hadoop NameNode.DFSZKFa ...
- Hadoop学习笔记—4.初识MapReduce
一.神马是高大上的MapReduce MapReduce是Google的一项重要技术,它首先是一个 编程模型 ,用以进行大数据量的计算.对于大 数据量的计算,通常采用的处理手法就是并行计算.但对许多开 ...
最新文章
- 第一次使用Android Studio时你应该知道的一切配置
- 刚发现博客园又遇到了问题
- Adnroid提高效率之资源文件改名
- 七、操作系统——动态分区分配算法(详解)
- sql 列求和_图解面试题:累计求和问题如何分析?
- boost知识点查阅
- TortoiseSVN的安装与使用
- [译] 你的站点如你所想的移动友好吗?
- 前端H5 使用百度统计进行埋点
- 计算机的表白隐藏功能,微信还有这个功能?隐藏代码还能表白!教你高级告白手段...
- 稳压二极管工作原理及参数详解
- openGauss长沙Meetup | 共建数据库可信开源社区
- 和差化积公式详细推导
- 贝塞尔插值曲线绘制软件设计
- 【zz】陈硕:当析构函数遇到多线程──C++ 中线程安全的对象回调
- 绘制鱼骨图,卡壳了,找到了别人的文章,先好好学习理论知识。
- 帝国cms7.2 linux伪静态,帝国CMS7.0IIS伪静态设置教程
- 《计算机网络 自顶向下方法》(第7版)答案(第二章)(二)
- 工业视觉_57:霍夫(Hough)直线识别,交点与夹角
- 混音师的混音之道|公开我学习混音的方法,真正的捷径|MZD Studios