一 MapTask个数的决定因素

首先,我们需要明确以下几点:

1Map Task个数不能通过配置文件指定

2Map Task个数是在进行文件的切分时动态计算的

3FileInputFormat负责切分文件进行split操作

1.1分析源码:

intmaps = writeSplits(job, submitJobDir);

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,

Path jobSubmitDir) throws IOException,

InterruptedException,ClassNotFoundException {

JobConf jConf = (JobConf)job.getConfiguration();

int maps;

//判断是否采用新的API,现在我们应该都是新的

if (jConf.getUseNewMapper()) {

maps = writeNewSplits(job, jobSubmitDir);

} else {

maps = writeOldSplits(jConf, jobSubmitDir);

}

return maps;

}

private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,

InterruptedException,ClassNotFoundException {

Configuration conf = job.getConfiguration();

//创建FileInputFormat

InputFormat<?, ?> input =

ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

//调用FileInputFormat#getSplits

List<InputSplit> splits = input.getSplits(job);

T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

//对split数组元素进行排序,最大的是第一个

Arrays.sort(array, new SplitComparator());

//创建Split文件,这些个文件会存在提交路径的临时目录

JobSplitWriter.createSplitFiles(jobSubmitDir, conf,

jobSubmitDir.getFileSystem(conf), array);

return array.length;

}

public List<InputSplit> getSplits(JobContext job) throws IOException {

StopWatch sw = new StopWatch().start();

//根据mapreduce.input.fileinputformat.split.minsize配置和1取最大的

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

//根据mapreduce.input.fileinputformat.split.maxsize取最大的

long maxSize = getMaxSplitSize(job);

// generate splits

List<InputSplit> splits = new ArrayList<InputSplit>();

List<FileStatus> files = listStatus(job);

for (FileStatus file: files) {

//获取文件路径

Path path = file.getPath();

//获取文件大小

long length = file.getLen();

if (length != 0) {

BlockLocation[] blkLocations;

//从本地获取文件数据块位置

if (file instanceof LocatedFileStatus) {

blkLocations = ((LocatedFileStatus) file).getBlockLocations();

} else {//非本地文件,远程调用获取文件数据块信息

FileSystem fs = path.getFileSystem(job.getConfiguration());

blkLocations = fs.getFileBlockLocations(file, 0, length);

}

if (isSplitable(job, path)) {

//获取文件数据块大小,默认128M

long blockSize = file.getBlockSize();

//计算InputSplit大小

long splitSize = computeSplitSize(blockSize, minSize, maxSize);

//将bytesRemaining(剩余未分片字节数)设置为整个文件的长度

long bytesRemaining = length;

/*

* 若剩余值bytesRemaining > 1.1 * splitSize,则继续对文件进行逻辑切分

* 若小于这个值,则作为一个InputSplit

*/

while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

//计算文件的数据块的索引,只是计算InputSplit的起始位置是否位于某一块中

int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

//然后将计算的索引位置作为参数计算切分的split文件,然后添加到split数组

splits.add(makeSplit(path, length-bytesRemaining, splitSize,

blkLocations[blkIndex].getHosts(),

blkLocations[blkIndex].getCachedHosts()));

/*

* 剩余字节数-splitSize,相当于下一次从这儿开始计算

* 我们也可以推断出起始位置为0,splitSize,2*splitSize,3*splitSize 等

*/

bytesRemaining -= splitSize;

}

//如果block中剩下的一小段数据量小于splitSize,还是认为它是独立的分片

if (bytesRemaining != 0) {

int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,

blkLocations[blkIndex].getHosts(),

blkLocations[blkIndex].getCachedHosts()));

}

} else { // not splitable

splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),

blkLocations[0].getCachedHosts()));

}

} else {

//Create empty hosts array for zero lengthfiles

splits.add(makeSplit(path, 0, length, new String[0]));

}

}

// Save the number of input files for metrics/loadgen

//设置mapreduce.input.fileinputformat.numinputfile值为输入文件数量

job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

sw.stop();

if (LOG.isDebugEnabled()) {

LOG.debug("Total# of splits generated by getSplits: " + splits.size()

+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));

}

return splits;

}

public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,

Configuration conf, FileSystem fs, T[] splits)

throws IOException, InterruptedException {

/*

* 创建切片文件,并获取FSDataOutputStream对应路径jobSubmitDir

* 届时就会生成${jobSubmitDir}/job.split文件

* jobSubmitDir:参数yarn.app.mapreduce.am.staging-dir

* 指定的路径

*/

FSDataOutputStream out = createFile(fs,

JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);

//将切片数据写入切片文件,并得到切片元数据信息数组

SplitMetaInfo[] info = writeNewSplits(conf, splits, out);

out.close();

//将切片元数据信息写入切片元数据信息文件

writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),

new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,

info);

}

private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,

Configuration job)  throws IOException {

FSDataOutputStream out = FileSystem.create(fs, splitFile,

new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

//获取副本数,默认是10

int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);

fs.setReplication(splitFile, (short)replication);

//写入切片头信息

writeSplitHeader(out);

return out;

}

#遍历输入的文件

#获取文件数据块的位置以及文件数据块的大小(默认128m)

#计算分片的尺寸大小splitSize

#对文件数据块进行分片

#创建切片文件,写入头信息,文件位置位于提交job的路径

#将分片信息写入分片文件,并将得到的切片元数据信息写入切片元数据信息文件

1.2Map任务的决定因素

我们知道,map的个数是intmaps = writeSplits(job, submitJobDir);

这里产生的,也就是取决于切片数量。

那么切片数量又是由什么决定的呢?

>如果splitSize== blockSize(128M),那么只有一个切片

也就是一个Map 任务

>如果minSize超过blockSize,那么根据计算splitSize算法,会取128M和minSize中最大的,所以会减少分片数量,也就是会减少MapTask数量

>如果maxSize< blockSize,那么会选择之间比较小的然后跟minSize比较取较大者,那么这样这会增加分片数量,从而增加Map Task

总结:决定因素

# mapreduce.input.fileinputformat.split.minsize

# mapreduce.input.fileinputformat.split.maxsize

# blockSize

二 ReduceTask的决定因素

reduce在运行时往往需要从相关map端复制数据到reduce节点来处理,因此相比于map任务。reduce节点资源是相对比较缺少的,同时相对运行较慢,正确的reduce任务的个数应该是0.95或者1.75 *(节点数* mapred.tasktracker.tasks.maximum参数值)。如果任务数是节点个数的0.95倍,那么所有的reduce任务能够在 map任务的输出传输结束后同时开始运行。如果任务数是节点个数的1.75倍,那么高速的节点会在完成他们第一批reduce任务计算之后开始计算第二批 reduce任务,这样的情况更有利于负载均衡。同时需要注意增加reduce的数量虽然会增加系统的资源开销,但是可以改善负载匀衡,降低任务失败带来的负面影响。同样,Reduce任务也能够与 map任务一样,通过设定JobConf的conf.setNumReduceTasks(intnum)方法来增加任务个数。

map任务和reduce任务个数如何计算相关推荐

  1. 彻底明白Hadoop map和reduce的个数决定因素

    Hadoop map和reduce的个数设置,困扰了很多学习Hadoop的成员,为什么设置了配置参数就是不生效那?Hadoop Map和Reduce个数,到底跟什么有关系.首先他的参数很多,而且可能随 ...

  2. 初学者python笔记(map()函数、reduce()函数、filter()函数、匿名函数)

    文章目录 一.匿名函数 二.map()函数 三.reduce()函数 四.filter()函数 五.三大函数总结 本篇文章内容有Python中的匿名函数和map()函数.reduce()函数.filt ...

  3. hive如何确定map数量和reduce数量?

    因为Hive底层就是MR,所以问题实际是MR如何确定map数量和reduce数量. map数量 map数量 逻辑如下 map数量=split数量 split数量=文件大小/split size spl ...

  4. Python中的map()函数和reduce()函数的用法

    Python中的map()函数和reduce()函数的用法 这篇文章主要介绍了Python中的map()函数和reduce()函数的用法,代码基于Python2.x版本,需要的朋友可以参考下  

  5. hadoop 分片与分块,map task和reduce task的理解

    分块:Block HDFS存储系统中,引入了文件系统的分块概念(block),块是存储的最小单位,HDFS定义其大小为64MB.与单磁盘文件系统相似,存储在 HDFS上的文件均存储为多个块,不同的是, ...

  6. python中的map,feilter,和reduce函数

    python中的map,feilter,和reduce函数 map() map()的原型是map(function, iterable, -) 参数 function: 传的是一个函数名,可以是pyt ...

  7. Python Map, Filter and Reduce

    所属网站分类: python基础 > 函数 作者:慧雅 原文链接: http://www.pythonheidong.com/blog/article/21/ 来源:python黑洞网 www. ...

  8. CNN中feature map、卷积核、卷积核个数、filter、channel的概念解释,以及CNN 学习过程中卷积核更新的理解

    feature map.卷积核.卷积核个数.filter.channel的概念解释 feather map的理解 在cnn的每个卷积层,数据都是以三维形式存在的.你可以把它看成许多个二维图片叠在一起( ...

  9. Hive 设置map 和 reduce 的个数

    一.    控制hive任务中的map数: 1.    通常情况下,作业会通过input的目录产生一个或者多个map任务.  主要的决定因素有: input的文件总个数,input的文件大小,集群设置 ...

最新文章

  1. Sumblime Text 2 常用插件以及安装方法
  2. html简单网页代码表白_表白网页代码,不会代码也可以操作,告别单身
  3. 防止QQ密码被盗的五个绝招
  4. numpy——ravel()和flatten()
  5. 【空间数据库】ArcSDE 10.7+SQLEXPRESS+ArcServer 10.7.ecp企业级数据库环境搭建
  6. Datax-web 使用Python3 执行脚本
  7. Docker下redis与springboot
  8. LINQ to SQL的不足
  9. 12月19日绝地求生服务器维护公告,绝地求生12月19日更新到几点 绝地求生正式服更新维护公告...
  10. GitLab CI/CD实践
  11. Java代码动态分析JProfiler 13 for Mac
  12. JavaScript 闭包理解
  13. 常用的台式计算机,台式电脑常见简单故障排除
  14. 加权平均数的例子_EXCEL 加权平均数的计算
  15. [BUUCTF][Zer0pts2020]Can you guess it?
  16. 【MySQL面试】MyISAM和InnoDB的不同
  17. 理解 rb_tree
  18. 转战 Typora Mackdown 文档编辑器
  19. 4.1 使用旋转和镜像工具制作图标 [Illustrator CC教程]
  20. winform 窗体无法显示报错“未声明或从未赋值“解决办法

热门文章

  1. java 多线程取一条记录_java多线程从队列中取出数据执行
  2. php框架大全图解_PHP框架汇总 - 鱼煎的个人空间 - OSCHINA - 中文开源技术交流社区...
  3. redis 备份导出rdb_Redis持久化知识点—RDB+AOF ,你了解多少
  4. 当年叱咤风云的框架Struts2,你可知Struts2内功如何修炼之体系结构
  5. PostgreSQL修改pgsql提示符
  6. 搭建CentOS7.6容器镜像
  7. 处理多维特征的输出(糖尿病数据)
  8. java初反射_java中的反射机制
  9. 调用软键盘_Android 支持拖动、缩放的自定义软键盘
  10. delphi2007很卡_提升Delphi编程效率必须使用的快捷键(Delphi2007版本)