分区(分区实战案例)、Combiner、Shuffer

  • 1 分区
  • 2 根据部门号建立分区
  • 3 Combiner
  • 4 Shuffer

手动反爬虫,禁止转载: 原博地址 https://blog.csdn.net/lys_828/article/details/118990176(CSDN博主:Be_melting)

 知识梳理不易,请尊重劳动成果,文章仅发布在CSDN网站上,在其他网站看到该博文均属于未经作者授权的恶意爬取信息

1 分区

在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个Reduce当中进行处理。例如:为了数据的统计,可以把一批类似的数据发送到同一个Reduce当中,在同一个Reduce当中统计相同类型的数据,就可以实现类似的数据分区和统计等。其实就是相同类型的数据,有共性的数据,送到一起去处理。

MapReduce默认是只有一个分区(一个分区就是对应一个输出文件),在之前写过的程序中p1.jar-p9.jar程序的运行,最后生成的文件文件都是part-r-00000的形式,也就是默认只有一个输出文件

接下来通过例子来理解分区

  • 如果不进行索引或者分区,那么在orcle数据库中进行数据查询就需要进行全表扫描,采用索引或者分区可以提高查询的性能,不同点在于索引不会破坏原有的表,只是相当于创建了一个查询目录,而分区是会改变表(这里就先介绍分区,之后再介绍索引)
  • 分区创建会有相对应的判断条件,满足筛选条件的查询只会扫描对应分区中的内容,比如要查询的2500-2800之间的信息,就只在分区2中进行扫描
  • 导入、导出、备份数据时候可以针对分区来进行操作,而不需要针对一整张表,如果某个分区发生了损坏,我们可以只针对这一个分区进行恢复,其它的分区不受影响
  • 分区表的类型有五种(以orcle数据库为例):范围分区、列表分区、hash分区、范围-hash分区、列表-hash分区
  • hash分区,就是对数据通过hash运算,如果得到的结果一致,就将对应的数据放置在同一个分区中,比如下面将数据分为四份,就可以除以4取余数(一种hash运算的方式,也可以选用其它的运算方法)
  • 分区的要求是尽量把数据打散进行存放,避免形成热块。每一个分区可以视为一个Server,如果当前分区的数据量过大,那么服务器处理这部分数据就会占用很大一部分的时间,该服务区的压力就会非常大,性能就会受到影响
  • 打散的效果是取决于采用的hash计算的方法

2 根据部门号建立分区

MapReduce是根据什么标准进行分区的?这个问题在敲代码之前应该先弄清楚。答案就是:根据Map的输出建立分区<k2,v2>

接下来的案例就是以员工的部门号建立分区,还是使用之前的员工表emp.csv文件中的数据,其中最后一个字段就是员工部门,共有三个分类,分别为10、20、30

首先创建一个名为part的package,然后把实现序列化的Employee.java文件复制粘贴在part下,接着建立一下分区的规则,新建一个Java Class文件命名为MyPartitor,还是不用自己写底层代码,直接继承Partitioner父类,然后重写里面的方法

package part;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
//                                            k2 部门号  v2员工对象
public class MyPartitor extends Partitioner<IntWritable, Employee>{@Overridepublic int getPartition(IntWritable k2, Employee v2, int numTsk) {// 建立对应的分区//numTsk 表示的就是分区的个数//得到分区号int deptno = v2.getDeptno();if (deptno ==10) {//放入到一号分区return 1%numTsk;}else if (deptno ==20) {//放入到二号分区return 2%numTsk;}else {//放入到三号分区return 3%numTsk;}}
}

接着就要书写Map和Reduce来实现我们制定的分区的逻辑,先处理Map程序,在之前的序列化的Map程序的基础上进行(只需要修改的就是k2,代表着部门号)

package part;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//                                           k1     v1         k2 部门号            v2 员工对象
public class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {@Overrideprotected void map(LongWritable k1, Text v1, Context context)throws IOException, InterruptedException {//数据:7369,SMITH,CLERK,7902,1980/12/17,800,0,20String data = v1.toString();//分词String[] words = data.split(",");//创建员工对象Employee e = new Employee();//设置员工号e.setEmpno(Integer.parseInt(words[0]));//设置员工姓名e.setEname(words[1]);//职位e.setJob(words[2]);//老板e.setMgr(Integer.parseInt(words[3]));//入职日期e.setHiredate(words[4]);//薪水e.setSal(Integer.parseInt(words[5]));//奖金e.setComm(Integer.parseInt(words[6]));//部门号e.setDeptno(Integer.parseInt(words[7]));//输出k2 部门号    v2 员工对象context.write(new IntWritable(e.getDeptno()), e);   }
}

Map程序完成后,就是要进行Reduce程序设计,新建一个Java Class程序命名为EmployeeReducer,整个程序的代码如下

package part;import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;public class EmployeeReducer extends Reducer<IntWritable, Employee, IntWritable, Employee>{@Overrideprotected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)throws IOException, InterruptedException {for (Employee e:v3) {context.write(k3, e);}}
}

然后就是差一个执行的主程序,还是将原来的运行主程序复制过来,修改一下其中部分的参数(一共修改了三行代码,添加了setReducerClass,然后在k2,v2之后添加了分区规则并指定了分区的个数)

package part;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class EmployeeMain {public static void main(String[] args) throws Exception {// (1)创建任务Job,并且制定任务的入口Job job = Job.getInstance(new Configuration());job.setJarByClass(EmployeeMain.class); //指定为当前程序//(2)指定任务的Map,Map的输出类型job.setMapperClass(EmployeeMapper.class); job.setMapOutputKeyClass(IntWritable.class);//k2 员工号job.setMapOutputValueClass(Employee.class);//v2  Employee对象//指定分区规则job.setPartitionerClass(MyPartitor.class);//分区的个数job.setNumReduceTasks(3);//(3)指定任务的Reduce,Reduce的输出类型job.setReducerClass(EmployeeReducer.class);job.setOutputKeyClass(IntWritable.class);//k4job.setOutputValueClass(Employee.class);//v4//(4)指定任务的输入和输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//(5)执行任务job.waitForCompletion(true); //表示执行的时候打印日志}
}

至此框架中的内容全部整理完毕,输出为p10.jar文件后上传到hadoop上运行(注意查看其中Reduce的操作,以前就是直接100%,这次变成了三步,而且是均分为三步)

核实一下生成的文件信息内容,最终三个文件和最初设置的三个分区对应上了,查看每个文件中的信息也符合分区存放数据的要求,至此关于MapReduce的分区的知识点就梳理完毕了

3 Combiner

集群上的可用带宽限制了MapReduce作业的数量,因此尽量避免map和reduce任务之间的数据传输是有利的。Hadoop允许用户针对map任务的输出指定一个combiner (就像.mapper,reducer) 。 combiner函数的输出作为reduce函数的输入。由于combiner术语优化方案,所以Hadoop无法确定对map任务输出记录调用多少次combiner (如果需要) 。换言之,不管调用多次combiner, reducer的输出结果都是一样的

可能上面说的有点抽象,下面根据图示进行理解

  • Combiner是一种特殊的Reducer,需要继承Reducer的父类
  • Combiner是运行在Map端。先对Map输出的结果,进行本地的合合

有Combiner和没有Combiner两种方式进行对比,可以发现没有Combiner情况下,Map的输出结果会直接通过网络传送给Reduce端,而有了Combiner后,显示在同服务器的本地上进行结果的初步合并,然后在把处理的结果送至Reduce端,这样减少了数据的传输,也就降低了功耗,提高了性能

之前的WordConut程序进行改造,注意Combiner的本质就是对于数据进行求和(就是在中间的部分添加一行代码,然后再打包为p11.jar)

将打包的jar包上传至hadoop上运行

核实一下输出的结果,应该是和原来的结果保持一致

但是还有注意地方:

  • (1)有些情况下Combiner是不可以使用的,前面提到的Combiner本质就是进行求和,比如在求平均值的情况下,这个功能就不能使用了
  • (2)不管有没有Combiner不能改变最后运行结果
  • (3)不管有没有Combiner,都不应该改变原有的处理逻辑(案例:倒排索引)

4 Shuffer

Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方。要想理解MapReduce,Shuffle是必须要了解的。

什么是Shuffer:

Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。而在MapReduce中, shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端接收处理。其在MapReduce中所处的工作阶段是map输出后到reduce接收前,具体可以分为map端和reduce端前后两个部分。在shuffle之前,也就是在map阶段, MapReduce会对要处理的数据进行分片(split)操作,为每一个分片分配一个MapTask任务。接下来map()函数会对每一个分片中的每一行数据进行处理得到键值对(key,value) ,其中key为偏移量, value为一行的内容。此时得到的键值对又叫做“中间结果” 。此后便进入shuffle阶段,由此可以看出shuffle阶段的作用是处理“中间结果”

整个运行过程按照图解进行分析如下:

  • (1)数据导入会按照128M进行切片
  • (2)进入Map阶段,Map的输出并不是直接送到Reduce中,而是先将结果送到本地的缓冲区(内存为100M),如果达到了80%,就进行溢写(将内存中的数据写成一个一个的文件)
  • (3)然后再进行输出,就可以进行分区或者排序的操作
  • (4)将80M的多个小文件进行合并(由于每次超过80M就进行溢写,所以会存在很多小文件)
  • (5)Combiner对本地输出进行聚合
  • (6)Map端至此才结束,进入到Reduce端,最后数据处理完毕后输出到HDFS中

    以下是官方给的示例图

【MapReduce】分区(分区实战案例)、Combiner、Shuffer相关推荐

  1. Partition分区的使用案例

    Partition分区的使用案例: 将统计结果按照条件输出到不同文件中(分区) 文章目录 1)需求 2)需求分析 3)编程实现 1.创建Partitioner类 2.创建Bean类 3.创建Mappe ...

  2. Impala内存优化实战案例

    Impala内存优化实战案例 李珂 畅游DT时代 2016-03-25 文章来源:中国联通网研院网优网管部--IT技术研究团队 作者:李珂 一. 引言 Hadoop生态中的NoSQL数据分析三剑客Hi ...

  3. Linux 运维自动化之Cobbler实战案例

    大纲 一.前言 二.Cobbler 工作原理详解 三.Cobbler 常用命令汇总 四.Cobbler 各种目录说明 五.自定义Kickstart文件详解 六.Cobbler 实战案例安装CentOS ...

  4. 服务器制作raid,实战案例——服务器制作raid磁盘阵列并管理

    3.3 实战案例--服务器制作raid磁盘阵列并管理 3.3.1 案例目标 (1)掌握RAID的构建,挂载和使用. (2)掌握RAID的基础运维. Raid磁盘冗余 RAID LEVEL1:N块相同空 ...

  5. 2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD

    目录 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 注意: 代码实现 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 对上述案例的结果 ...

  6. 【大数据AI人工智能】大数据处理实战案例汇总

    大数据处理实战案例汇总 本文总结了一系列大数据处理相关的实战案例,让你一目了然地了解大数据处理技术. 文章目录 大数据处理实战案例汇总 1. 谷歌搜索引擎 2. Facebook实时动态 3. Net ...

  7. 2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数

    目录 SparkStreaming实战案例四 窗口函数 需求 代码实现 SparkStreaming实战案例四 窗口函数 需求 使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据! ...

  8. 2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey

    目录 SparkStreaming实战案例二 UpdateStateByKey 需求 1.updateStateByKey 2.mapWithState 代码实现 SparkStreaming实战案例 ...

  9. 2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount

    目录 SparkStreaming实战案例一 WordCount 需求 准备工作 代码实现 第一种方式:构建SparkConf对象 第二种方式:构建SparkContext对象 完整代码如下所示: 应 ...

最新文章

  1. 手机支架3d打印模型_3D打印工艺模型制造
  2. apu和atom处理器两者的定位
  3. 潘越云《面朝海子》:诗里的人都会终成眷属
  4. 在实际使用中 mysql所支持的触发器有_计算机二级考试MySQL数据库每日一练 12月26日...
  5. 深度学习-线性回归基础-02
  6. 数据分析与挖掘实战-中医证型关联规则挖掘
  7. IIS企业案例系列之七:发布Exchange OWA之SSL桥接模式
  8. 解决安卓字体偏移:页面整体缩放
  9. 场景化封装,一站式使用,普惠AI集成 ——阿里云发布智能媒体管理产品
  10. Oracle中insert into select和select into的区别
  11. Linux进程列表巧用,Linux下的进程分析–PS
  12. 解决.bashrc文件每次打开终端都需要source的问题
  13. EPPlus 读写 Excel 资料收集
  14. linux部署was找不到8879端口,WAS8.0与IHS集群安装与配置指导手册
  15. 在三维坐标中给出三个点,求三个点所在平面的圆心和圆心坐标
  16. mc冒险者传说java_我的世界冒险者传说1.9
  17. 为什么快捷指令无法将媒体转换为文本_刘星文:快捷指令这个神器,让你的 iPhone 好用到飞起...
  18. 两招快速教会你们PDF怎么转图片jpg格式
  19. vijos 1540 月亮之眼 并查集
  20. 1月16日服务器例行维护更新公告,1月16日9—11点例行更新维护公告

热门文章

  1. 智慧校园:02 Slide侧边栏开发
  2. IApp调用教程【IApp资源共享!】
  3. 2017年4月观影分享
  4. 威盾php加密家解密,威盾解密
  5. 全民k歌爬虫 by--Python
  6. 【解决方案】汽车修理厂如何实现远程监控?EasyCVR视频结构化平台助力智能维修安防
  7. 速看!你要的车间管理大屏方案来了~
  8. Beyond Compare4提示过期解决办法
  9. LD 文件:规则详解
  10. 车载c语言,车载考试系统