上次分析完JobTracker通过TaskScheduler如何把作业分配给TaskTracker,这次把目光 移动到TaskTracker上面。TaskTracker在这里其实是一个slave的从属关系。我在后面的分析会通过TaskTracker的执行流程,主要讲他的2个过程的分析1.作业启动执行2.与JobTracker的heatbeat的过程。2个过程都是非常的典型。

与JobTracker一样,TaskTracker也是作为一项服务所运行的,他也有自己的main函数入口。下面是一张全局的TaskTracker执行过程流程图:

jvmManager负责为每个Task分配一个java虚拟机环境让其执行,避免任务之间的干扰,TaskMemoryManager负责任务内存的监控,对于某些任务恶意消耗资源内存,会给予杀死此任务的处理。

1.TaskTracker任务启动

下面从main函数的入口开始分析一下TaskTracker的执行流程:

/*** Start the TaskTracker, point toward the indicated JobTracker* taskTracker同样也是一个服务程序,main函数开始执行*/public static void main(String argv[]) throws Exception {StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG);if (argv.length != 0) {System.out.println("usage: TaskTracker");System.exit(-1);}try {//初始化作业配置JobConf conf=new JobConf();// enable the server to track time spent waiting on locksReflectionUtils.setContentionTracing(conf.getBoolean("tasktracker.contention.tracking", false));//初始化度量统计系统DefaultMetricsSystem.initialize("TaskTracker");//根据作业配置初始化TaskTrackerTaskTracker tt = new TaskTracker(conf);//注册MBean,方便外界工具检测TaskTracker的状态MBeans.register("TaskTracker", "TaskTrackerInfo", tt);//执行TaskTracker服务主程序tt.run();} catch (Throwable e) {LOG.error("Can not start task tracker because "+StringUtils.stringifyException(e));System.exit(-1);}}

让后我们进入其中的执行主程序tt.run():

/*** The server retry loop.  * This while-loop attempts to connect to the JobTracker.  It only * loops when the old TaskTracker has gone bad (its state is* stale somehow) and we need to reinitialize everything.*/public void run() {try {getUserLogManager().start();//开启CleanUp清理线程startCleanupThreads();boolean denied = false;while (running && !shuttingDown && !denied) {boolean staleState = false;try {// This while-loop attempts reconnects if we get network errorswhile (running && !staleState && !shuttingDown && !denied) {try {//offerService()执行了核心的启动操作State osState = offerService();if (osState == State.STALE) {staleState = true;} else if (osState == State.DENIED) {denied = true;}......

我们可以看到,这里通过while操作,循环进行服务操作,如果拒绝服务,则会shutdown中断服务,服务的主要操作又在offerService方法中:

/*** Main service loop.  Will stay in this loop forever.* 主要的循环服务操作*/State offerService() throws Exception {.....// Send the heartbeat and process the jobtracker's directives//发送给JobTracker心跳包HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);// Note the time when the heartbeat returned, use this to decide when to send the// next heartbeat   lastHeartbeat = System.currentTimeMillis();....        //在这里获取了心跳回应中的action命令TaskTrackerAction[] actions = heartbeatResponse.getActions();if(LOG.isDebugEnabled()) {LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + heartbeatResponse.getResponseId() + " and " + ((actions != null) ? actions.length : 0) + " actions");}if (reinitTaskTracker(actions)) {return State.STALE;}// resetting heartbeat interval from the response.heartbeatInterval = heartbeatResponse.getHeartbeatInterval();justStarted = false;justInited = false;if (actions != null){ for(TaskTrackerAction action: actions) {if (action instanceof LaunchTaskAction) {//如果是执行Task任务指令,执行添加任务到任务队列中addToTaskQueue((LaunchTaskAction)action);} else if (action instanceof CommitTaskAction) {//如果是提交任务的指令,则执行后面的操作CommitTaskAction commitAction = (CommitTaskAction)action;if (!commitResponses.contains(commitAction.getTaskID())) {LOG.info("Received commit task action for " + commitAction.getTaskID());commitResponses.add(commitAction.getTaskID());}} else {//其他的指令一律添加到tasksToCleanup队列中等待被处理tasksToCleanup.put(action);}}}.....

在这里我省略了比较多的代码,把执行任务相关的核心操作保留了,在这里就开始执行了后面的和Task相关的很多操作了,当然这些任务都是通过收到JobTracker的心跳包Response来获得的,在通过获取里面的TaskTrackerAction命令来判断执行的。TaskTrackerAction里面包含了1枚举类,包括了以下的相关指令:

具体什么意思,看上面的英文解释就能理解了吧,上面代表了6种命令操作,我们侧重看第一个launch_task的命令执行,在上面的判断执行方法是addToTaskQueue();方法:

private void addToTaskQueue(LaunchTaskAction action) {//任务类型加入到任务待执行的容器中if (action.getTask().isMapTask()) {mapLauncher.addToTaskQueue(action);} else {reduceLauncher.addToTaskQueue(action);}}

这里的mapLauncher,reduceLauncher的类型是TaskLauncher,他是一个线程类:

class TaskLauncher extends Thread {private IntWritable numFreeSlots;private final int maxSlots;private List<TaskInProgress> tasksToLaunch;....

也就是说,待执行的map,Reduce任务都是添加到taskToLauch中的,

public void addToTaskQueue(LaunchTaskAction action) {//新建1个TIP,并加入tasksToLaunch列表synchronized (tasksToLaunch) {TaskInProgress tip = registerTask(action, this);tasksToLaunch.add(tip);//唤醒所有被tasksToLaunch wait的操作,说明此时有新的任务了tasksToLaunch.notifyAll();}}

加入之后唤醒相应的操作,这个就很好理解了,一定是在empty的时候被阻塞住了,

public void run() {while (!Thread.interrupted()) {try {TaskInProgress tip;Task task;synchronized (tasksToLaunch) {while (tasksToLaunch.isEmpty()) {tasksToLaunch.wait();}//get the TIPtip = tasksToLaunch.remove(0);task = tip.getTask();LOG.info("Trying to launch : " + tip.getTask().getTaskID() + " which needs " + task.getNumSlotsRequired() + " slots");}//wait for free slots to run.....//got a free slot. launch the taskstartNewTask(tip);

到了startNewTask就是开始所谓的任务了。到此为止,TaskTracker的任务执行这条路,我们算彻底打通了,相关时序图如下:

2.Heateat过程

下面我们看另外一个流程,心跳机制。此过程的实现同样的主要是在offerService的循环操作中。首先第一步,判断是否到了发送心跳包的时间,因为心跳包是隔周期性的时间发送的,所以这里必须会进行判读:

/*** Main service loop.  Will stay in this loop forever.* 主要的循环服务操作*/State offerService() throws Exception {long lastHeartbeat = System.currentTimeMillis();while (running && !shuttingDown) {try {long now = System.currentTimeMillis();// accelerate to account for multiple finished tasks up-front//判断上次心跳的时间+心跳等待时间是否已经到了当前时间,如果到了可以发送新的心跳包long remaining = (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;//如果还没到,时间有剩余,则要强行等待剩余的时间while (remaining > 0) {// sleeps for the wait time or // until there are *enough* empty slots to schedule taskssynchronized (finishedCount) {finishedCount.wait(remaining);// Recomputenow = System.currentTimeMillis();remaining = (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;if (remaining <= 0) {// Reset count finishedCount.set(0);break;}}}.....

假设已经到达了发送时间了,会执行后面的操作,检测版本号,TaskTracker和JobTracker的版本号必须一致:

.....// If the TaskTracker is just starting up:// 1. Verify the buildVersion// 2. Get the system directory & filesystemif(justInited) {//验证版本号,如果版本号不对,则返回拒绝状态String jobTrackerBV = jobClient.getBuildVersion();if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {String msg = "Shutting down. Incompatible buildVersion." +"\nJobTracker's: " + jobTrackerBV + "\nTaskTracker's: "+ VersionInfo.getBuildVersion();LOG.error(msg);try {jobClient.reportTaskTrackerError(taskTrackerName, null, msg);} catch(Exception e ) {LOG.info("Problem reporting to jobtracker: " + e);}return State.DENIED;}

如果通过上述2个验证,基本上就达到了发送的条件了,下面就准备发送操作了:

// Send the heartbeat and process the jobtracker's directives//发送给JobTracker心跳包HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

就是在上面这个方法中实现了发送的操作,此方法的返回值是JobTracker的心跳回复包,里面就包含着刚刚的TaskTrackerAction命令信息。我们进入transmitHeartBeat。之前分析过,心跳机制的有1个主要的作用就是汇报TaskTracker的资源使用情况和作业执行情况给JobTracker节点。以此可以让主节点可以进行资源调配。所以在上面的这个方法必不可少的操作是构建TaskTracker的Status状态信息。这个类包含的信息还比较多。下面是主要的此类的关系结构:

里面2大包含类ResourceStatus(TaskTracker资源使用情况),TaskTrackerHealthStatus(TaskTracker节点健康状况)。首先当然是新建一个Status了:

/*** Build and transmit the heart beat to the JobTracker* 将TaskTracker自身的状态信息发送给JobTracker,并获得一个心跳包的回应* @param now current time* @return false if the tracker was unknown* @throws IOException*/HeartbeatResponse transmitHeartBeat(long now) throws IOException {....// // Check if the last heartbeat got through... // if so then build the heartbeat information for the JobTracker;// else resend the previous status information.//if (status == null) {synchronized (this) {status = new TaskTrackerStatus(taskTrackerName, localHostname, httpPort, cloneAndResetRunningTaskStatuses(sendCounters), failures, maxMapSlots,maxReduceSlots); }

后面就是各种获取节点CPU,内存等基本信息,这里就不列举了,不过这里提一点,对于TaskTracker是否还能运行任务,在这里是通过TaskTracker是否达到了它的maxSlot上限作为1个标准。一般1个Reduce Task占据1个slot单元,1个Map Task同样占据1个Slot单元,如果1个TaskTracker结点拥有好多slot单元,那么他就可以运行很多Task。

//// Check if we should ask for a new Task// 检测TaskTracker是否需要一个新 Task任务//boolean askForNewTask;long localMinSpaceStart;synchronized (this) {//通过判断当前所占据的slots数量是否已经达到最大slot的数量作为标准askForNewTask = ((status.countOccupiedMapSlots() < maxMapSlots || status.countOccupiedReduceSlots() < maxReduceSlots) && acceptNewTasks); localMinSpaceStart = minSpaceStart;}

askForNewTask布尔类型就代表TaskTracker是否还能运行新的任务,封装好了这些Status信息之后,就要执行关键的发送步骤了:

    //// Xmit the heartbeat// 通过JobClient发送给JobTracker,并获得1个回复//HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted,justInited,askForNewTask, heartbeatResponseId);

是通过JobClient的方法发送的。得到的heartbeatResponse返回结果就是JobTracker结果了。至于里面JobClient具体怎么发送就不是本次分析的重点了,HeartBeat也分析完毕。同样看一下流程图:

总结

2个过程都是在offerService核心服务程序中执行的。了解完JobTracker和TaskTracker的工作原理,在聊了具体Task任务的执行的5个阶段,从微观Task细节的执行到宏观上作业调度的原理分析理解,的确对MapReduce计算模型的理解上升了许多的层次。

TaskTracker学习笔记相关推荐

  1. Hadoop学习笔记(1) ——菜鸟入门

     Hadoop学习笔记(1) --菜鸟入门 Hadoop是什么?先问一下百度吧: [百度百科]一个分布式系统基础架构,由Apache基金会所开发.用户能够在不了解分布式底层细节的情况下.开发分布式 ...

  2. Hadoop学习笔记(1)

    原文:http://www.cnblogs.com/zjfstudio/p/3859704.html Hadoop学习笔记(1) --菜鸟入门 Hadoop是什么?先问一下百度吧: [百度百科]一个分 ...

  3. Hadoop学习笔记—4.初识MapReduce

    一.神马是高大上的MapReduce MapReduce是Google的一项重要技术,它首先是一个 编程模型 ,用以进行大数据量的计算.对于大 数据量的计算,通常采用的处理手法就是并行计算.但对许多开 ...

  4. Hadoop学习笔记—10.Shuffle过程那点事儿

    Hadoop学习笔记-10.Shuffle过程那点事儿 一.回顾Reduce阶段三大步骤 在第四篇博文<初识MapReduce>中,我们认识了MapReduce的八大步骤,其中在Reduc ...

  5. 云计算学习笔记---异常处理---hadoop问题处理ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: java.lang.NullPoin

    云计算学习笔记---异常处理---hadoop问题处理ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: java.lang.NullPoin ...

  6. Hadoop学习笔记—13.分布式集群中节点的动态添加与下架

    Hadoop学习笔记-13.分布式集群中节点的动态添加与下架 开篇:在本笔记系列的第一篇中,我们介绍了如何搭建伪分布与分布模式的Hadoop集群.现在,我们来了解一下在一个Hadoop分布式集群中,如 ...

  7. 最全的Hive学习笔记

    最全的Hive学习笔记 1. 什么是hive 1.1. hive基本思想 1.2 为什么使用Hive 1.3. Hive的特点 2. hive的基本架构 3. hive安装 3.1. 最简安装:用内嵌 ...

  8. 读《从0开始学大数据》-- 学习笔记和感想随笔(一)

    主要记录阅读<从0开始学大数据>课程的学习笔记.课程系统性的介绍大数据的发展史.大数据系统的原理及架构.大数据生态体系中的主要产品.如何进行呢大数据开发实践.大数据平台开发及系统集成.使用 ...

  9. PyTorch 学习笔记(六):PyTorch hook 和关于 PyTorch backward 过程的理解 call

    您的位置 首页 PyTorch 学习笔记系列 PyTorch 学习笔记(六):PyTorch hook 和关于 PyTorch backward 过程的理解 发布: 2017年8月4日 7,195阅读 ...

最新文章

  1. java垃圾回收机制_笔记 | Java垃圾回收机制
  2. oracle42997,oracle与db2的比较
  3. IDEA 终于支持中文版和 JDK 直接下载了(太方便了)附新版介绍视频
  4. MyBatis学习总结(11)——MyBatis动态Sql语句
  5. Android Studio属性动画,Android开发-RecyclerView-AndroidStudio(六)属性动画(3)AddDuration
  6. 深度学习2.0-神经网络
  7. 前后端-SpringBoot-JPA的简单写法(配合前端vue)
  8. AWR6843芯片使用JFlash下载外部NorFlash
  9. 第三讲 地理空间数据的组成与特征
  10. 黑盒测试设计专题:正交试验
  11. azul zing_Azul垃圾收集器
  12. 微信第三方网页关闭当前页面回到微信对话窗口
  13. window.onload的作用
  14. ModuleNotFoundError: No module named ‘ale_py._ale_py‘
  15. 安卓WebView的那些坑
  16. Using the URLconf defined in test1.urls, Django tried these URL patterns, in this order: ^admin/解决
  17. 基于多特征的技术融合关系预测及其价值评估
  18. 网络红人百度百科怎么做_如何做网红搜狗好搜百科技巧分享
  19. 计算机网络实践之元气骑士公网异地联机(二) 两种方案可行性分析
  20. 聚(N-异丙基丙烯酰胺),POLY(N-ISOPROPYL ACRYLAMIDE),PNIPAM

热门文章

  1. gradle工程之间依赖
  2. Spark学习之路 (二)Spark2.3 HA集群的分布式安装
  3. varchar与char有什么区别?
  4. 12月8日 排序和字符数组
  5. 《当你的才华还撑不起你的梦想时》-特立独行的猫
  6. 代理IP是按照流量计费还是数量计费好?
  7. WEBPLAYER9电影资源站小偷程序由 宋飞飞发布
  8. 【洁洁送书第一期】Python高并发与高性能编程: 原理与实践
  9. 镁光ddr3布线规则_DDR3走线规则.pdf
  10. Generic Webhook Trigger 自动化构建jenkins