如何编写Hadoop调度器
1. 编写目的
在Hadoop中,调度器是一个可插拔的模块,用户可以根据自己的实际应用要求设计调度器,然后在配置文件中指定相应的调度器,这样,当Hadoop集群启动时,便会加载该调度器。当前Hadoop自带了几种调度器,分别是FIFO(默认调度器),Capacity Scheduler和FairScheduler,通常境况下,这些调度器很难满足公司复杂的应用需求,因而往往需要开发自己的调度器。本文介绍了Hadoop调度器的基本编写方法。
2. Hadoop调度框架
Hadoop的调度器是在JobTracker中加载和调用的,用户可以在配置文件mapred-site.xml中的mapred.jobtracker.taskScheduler属性中指定调度器。本节分析了Hadoop调度器的调度框架,实际上分析了两个重要类:TaskScheduler和JobTracker的关系。
(1) TaskScheduler
如果用户要编写自己的调度器,需要继承抽象类TaskScheduler,该类的接口如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
abstract class TaskScheduler implements Configurable {
protected Configuration conf; //配置文件
protected TaskTrackerManager taskTrackerManager; //一般会设为JobTracker
public Configuration getConf() {
return conf;
}
public void setConf(Configuration conf) {
this .conf = conf;
}
public synchronized void setTaskTrackerManager(
TaskTrackerManager taskTrackerManager) {
this .taskTrackerManager = taskTrackerManager;
}
public void start() throws IOException { //初始化函数,如加载配置文件等
// do nothing
}
public void terminate() throws IOException { //结束函数
// do nothing
}
//最重要的函数,为该taskTracker分配合适的task
public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker)
throws IOException;
//根据队列名字获job列表
public abstract Collection<JobInProgress> getJobs(String queueName);
}
|
(2) JobTracker
JobTracker是Hadoop最核心的组件,它监控整个集群中的作业运行情况并对资源进行管理和调度。
每个TaskTracker每个3s(默认值,可配置)通过heartbeat向JobTracker汇报自己管理的机器的一些基本信息,包括内存使用量,内存剩余量,正在运行的task,空闲的slot数目等,一旦JobTracker发现该TaskTracker出现了空闲的slot,便会调用调度器中的AssignTasks方法为该TaskTracker分配task。
下面分析JobTracker调用TaskScheduler的具体流程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
……
private final TaskScheduler taskScheduler; //声明调度器对象
……
public static JobTracker startTracker(JobConf conf, String identifier) {
…….
result = new JobTracker(conf, identifier);
result.taskScheduler.setTaskTrackerManager(result); //设置调度器的manager
……
}
//创建调度器
JobTracker(JobConf conf, String identifier) {
……
// Create the scheduler
Class<? extends TaskScheduler> schedulerClass
= conf.getClass( "mapred.jobtracker.taskScheduler" ,
JobQueueTaskScheduler. class , TaskScheduler. class );
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
…..
}
//run forever
public void offerService() {
……
taskScheduler.start(); //启动调度器
……
}
。。。。。
HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted,
boolean initialContact,
boolean acceptNewTasks,
short responseId) {
…….
// Check for new tasks to be executed on the tasktracker
if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
……
//使用调度器,为该taskTracker分配作业
tasks = taskScheduler.assignTasks(taskTrackerStatus);
……
}
}
|
从上面的分析可以知道,Scheduler和JobTracker之间会相互包含(实际上是组合模式),Scheduler中要包含JobTracker(实际上就是TaskTrackerManager)对象,以便获取整个Hadoop集群的一些信息,如slot总数,QueueManager对象,添加JobInProgressListener以便增加或删除job时,通知Scheduler;JobTracker中要包含Scheduler对象,以便可以对每个TaskTracker分配task。
3. 编写Hadoop调度器
假设我们要编写一个新的调度器,为MyHadoopScheduler,需要进行以下工作:
(1) 用户需要自己实现的类
@ MyHadoopSchedulerConf:配置文件管理类,读取你自己的配置文件,并保存到合适的数据结构中,一般而言,这个类应该支持动态加载配置文件。
@ MyHadoopSchedulerListener:编写自己的JobInProgressListener,并调用JobTracker的addJobInProgressListener(),将之加到系统的Listener队列中,以便系统中添加或删除job后,JobTracker可立刻告诉调度器。
@ MyHadoopScheduler:调度器的核心实现算法
(2) 用户要用到的系统类
@ JobTracker:JobTracker在startTracker函数中,会将MyHadoopScheduler的taskTrackerManager赋值为JobTracker对象,这样,在MyHadoopScheduler中,可调用Jobracker中的所有public方法和成员变量,常用的有:
$ getClusterStatus():获取集群的状态,如tasktracker列表,map slot总数,reduce slot总数,当前正在运行的map/reduce task总数等
$ getQueueManager():如果MyHadoopScheduler支持多队列,那么需要使用该方法获取QueueManager对象,通过该对象,会用可以获取系统的所有队列名称,每个队列的ACL(Access Control List),具体参考:http://hadoop.apache.org/common/docs/current/service_level_auth.html
$ killJob:可以调用该函数杀死某个job
$ killTask:如果调度器支持资源抢占,可调用该函数 杀死某个task以便进行资源抢占。
@ JobInprogress:用户向Hadoop中提交一个job后,Hadoop会为该job创建一个叫JobInProgress的对象,该对象中包含了job相关的基本信息,且它会伴随某个job的一生(与job共存亡)。该对象中包含的job信息有:该job包含的所有task的信息(如:正在运行的task列表,已经完成的task列表,尚未运行的task列表等),作业的优先级,作业的提交时间,开始运行时间,运行结束时间等信息。
在JobInprogress的task列表中,每个task以对象TaskInProgress的形式保存,该对象中包含了每个task的基本信息,包括:task要处理的数据split,task创建时间,task开始执行时间,task结束时间等信息。这些信息肯定会在调度器中使用。
@ JobConf
每个作业的运行参数和配置选项被保存到一个JobConf对象中,该对象包含了配置文件mapred-site.xml,core-site.xml和hdfs-site.xml设置的选项和该作业的特有属性(用户名,InputFormat,Mapper等),一般是以key/value的形式保存,比如:想获取当前用户名,可以这样:
1
2
3
4
5
|
JobConf conf;
…….
String username = conf.get( "user.name" );
|
用户也可以通过该对象传递一些自己定义的全局属性,如用户自己定义了一个属性叫mapred.job.deadline(作业的deadline时间),用户可以在提交作业时设定该值:
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt \
-D mapred.job.deadline=100000 \
input output
然后在调度器中这样获取该属性的值:
1
2
3
4
5
|
JobConf conf;
…….
int deadline=conf.getInt( "mapred.job.deadline" , -1); //获取mapred.job.deadline属性,如果没有设置,则返回-1
|
4. 总结
调度器是Hadoop的中枢,其重要性可想而知。用户如果要设计Hadoop调度器,需要对Hadoop的整个框架有比较深入的理解,同时需阅读一些很重要的类(如JobTracker和JobInprogress等)的源码,以便利用这些类完成你的调度算法。
Hadoop目前自带了三个比较常用的调度器,分别为JobQueueTaskScheduler (FIFO,但队列调度器),Capacity Scheduler(多队列多用户调度器)和Fair Scheduler(多队列多用户调度器),它们是你学习Hadoop调度器的最好资料。
5. 参考资料
(1) Hadoop-0.20.2源代码
原创文章,转载请注明: 转载自董的博客
本文链接地址: http://dongxicheng.org/mapreduce/how-to-write-hadoop-schedulers/
如何编写Hadoop调度器相关推荐
- 编写LitmusRT调度器插件
目录 背景 打桩 编译安装 引入TRACE模块来输出debug信息 为P-EDF定义每个CPU的状态 激活插件 模块测试 添加调度逻辑 帮助函数 demo_job_completion() demo_ ...
- Hadoop的调度器总结
随着MapReduce的流行,其开源实现Hadoop也变得越来越受推崇.在Hadoop系统中,有一个组件非常重要,那就是调度器,它的作用是将系统中空闲的资源按一定策略分配给作业.在Hadoop中,调度 ...
- Hadoop的调度器总结(转)
随着MapReduce的流行,其开源实现Hadoop也变得越来越受推崇.在Hadoop系统中,有一个组件非常重要,那就是调度器,它的作用是将系统中空闲的资源按一定策略分配给作业.在Hadoop中,调度 ...
- Hadoop资源调度器
hadoop调度器的作用是将系统中空闲的资源按一定策略分配给作业.调度器是一个可插拔的模块,用户可以根据自己的实际应用要求设计调度器.Hadoop中常见的调度器有三种,分别为: 1.基于队列的FIFO ...
- CDH 版本Hadoop Yarn配置容量调度器(默认是公平调度器)
配置中进行如下调整: 其实就是在在yarn-site.xm中添加yarn.resourcemanager.scheduler.class,修改成容量调度器: <property><n ...
- hadoop 2.9.2 yarn配置公平调度器
官网链接 任何文档都没有比官网提供的更权威.更准确 https://hadoop.apache.org/docs/r2.9.2/hadoop-yarn/hadoop-yarn-site/FairSch ...
- 配置hadoop 使用fair scheduler调度器
hadoop版本为cloudera hadoop cdh3u3 配置步骤为 1. 将$HADOOP_HOME/contrib/fairscheduler/hadoop-fairscheduler-0. ...
- 在win7上的eclipse向hadoop提交作业异常-权限/设置调度器
第一个问题,在win7上的eclipse向hadoop提交作业时,没有权限,异常信息如下: Java代码 Caused by: org.apache.hadoop.ipc.RemoteExcept ...
- 7种主流案例,告诉你调度器架构设计通用法则(干货!)
女主宣言 今天小编为大家转载一篇来自DBAplus社群的干货文章,希望能够帮助大家对关于调度器的理解.作者张晨,Strikingly数据平台工程师,算法.分布式系统和函数式编程爱好者.Shanghai ...
最新文章
- 几种民间排毒养颜祛痘DIY自制面膜 - 健康程序员,至尚生活!
- 文本输入框内实时检测输入的字数
- 汉字取首字母(第三节蓝桥杯决赛)
- 分辨率测试方法 ——TV line检测
- 【FPGA初级】4选一数据选择器的verilog实现(含testbench与波形)
- oracle清理磁盘空间
- RStudio入门使用常见问题(1)
- 有一种选择叫女程(2)
- 国外ERP产品点评 (转载)
- 华为鸿蒙概念机,华为Mate50Pro概念机,屏下镜头+一亿像素+麒麟9010,你喜欢吗?...
- 密码学入门(3):分组密码的模式
- M1卡性能简介及存取控制字节规则详解
- 【Java基础】NoClassDefFoundError 和 ClassNotFoundException的定义及其区别
- wpf使某个控件失去焦点_WPF的TextBox的焦点获取与失去焦点的死循环解决方案
- vue调用cordova 插件_Vue.js 使用cordova camera插件调取相机
- linux中安装无线网卡,linux中安装无线网卡
- 百胜系统连接服务器失败,MySQL服务器无法在百胜升级后启动
- 【安卓学习之微信抢红包】 微信抢红包 5 - 工具Android Monitor
- 只有程序员才能读懂的三国演义(二)
- 被骗几十万总结出来的Ddos***防护经验!(很值得分享 有意思)
热门文章
- 谷歌大脑科学家亲解 LSTM:一个关于“遗忘”与“记忆”的故事
- 外卖排序系统特征生产框架
- 这个德国山寨工厂靠抄袭干到240亿,让硅谷恨之入骨
- SpringBoot - 统一格式封装及高阶全局异常处理
- 白话Elasticsearch57-数据建模之实现悲观锁并发控制的三种方式(未成功)
- 实战SSM_O2O商铺_07【商铺注册】DAO层-新增与更新商铺
- Spring JDBC-Spring对事务管理的支持
- Python 循环控制语句-break/continue
- html5 滤色,深入理解CSS mix-blend-mode滤色screen混合模式
- Redis之跳跃表(面试重点容易考)