目录

  • 1.启动9870端口服务
  • 2.加载镜像文件和编辑日志
  • 3.创建RPC服务
  • 4.对NameNode启动资源检查
  • 5.DataNode心跳超时判断
  • 6.安全模式

以hadoop3.x版本为例

namenode启动时大致有6个步骤

  • 1)首先启动9870端口
  • 2)启动完端口之后开始加载镜像文件和编辑日志
  • 3)紧接着创建RPC服务
  • 4)然后开始对NameNode的资源进行检测
    • 检测当前磁盘空间是否能够启动NameNode
    • 要检查镜像文件和编辑日志的路径上能存储多少内容,默认大小是100M,如果当前磁盘空间小于100M,就要进行警告
  • 5)对DataNode的心跳进行校验
    • 心跳检测时如果DateNode超过10分钟30秒没有被检测到,就认为DateNode宕机
  • 6)判断是否进入安全模式
    • 默认block块加载到99.9%之后离开安全模式

下面手把手带着大家跟进NameNode启动源码

ctrl + n搜索namenode,找到namenode的main方法

1.启动9870端口服务

public static void main(String argv[]) throws Exception {NameNode namenode = createNameNode(argv, null);}

进入createNameNode方法,nameNode中new了一个Namenode对象

public static NameNode createNameNode(String argv[], Configuration conf) throws IOException {return new NameNode(conf);}}

进入Namenode,在里面进行了初始化

public NameNode(Configuration conf) throws IOException {this(conf, NamenodeRole.NAMENODE);}protected NameNode(Configuration conf, NamenodeRole role) throws IOException {initialize(getConf());
}

进入initialize(getConf()),初始化时启动了http服务

protected void initialize(Configuration conf) throws IOException {if (NamenodeRole.NAMENODE == role) {startHttpServer(conf);}loadNamesystem(conf);
}

进入startHttpServer(conf),启动http服务时,要获取端口号

private void startHttpServer(final Configuration conf) throws IOException {httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));httpServer.start();httpServer.setStartupProgress(startupProgress);}
  • 进入getHttpServerBindAddress(conf)

    • protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {InetSocketAddress bindAddress = getHttpServerAddress(conf);
      }
      
    • 进入getHttpServerAddress(conf)

    • protected InetSocketAddress getHttpServerAddress(Configuration conf) {return getHttpAddress(conf);
      }
      
    • 进入getHttpAddress(conf)

    • public static InetSocketAddress getHttpAddress(Configuration conf) {return  NetUtils.createSocketAddr(conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));}
      
    • 进入DFS_NAMENODE_HTTP_ADDRESS_DEFAULT

    • public static final String  DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
      
    • 进入DFS_NAMENODE_HTTP_PORT_DEFAULT

    • public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT =HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
      
    • 进入DFS_NAMENODE_HTTP_PORT_DEFAULT,就看到了9870端口

    • int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
      

调用httpServer.start()启动http服务,setupServlets(httpServer, conf)

void start() throws IOException {setupServlets(httpServer, conf);httpServer.start();}

进入setupServlets(httpServer, conf)setupServlets(httpServer, conf)会启动各种服务,例如image和fsck(磁盘刷写)

private static void setupServlets(HttpServer2 httpServer, Configuration conf) {httpServer.addInternalServlet("startupProgress",StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,true);httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,ImageServlet.class, true);}

2.加载镜像文件和编辑日志

回退到initialize方法,里面有一个loadNamesystem(conf)方法,用来加载镜像文件

protected void initialize(Configuration conf) throws IOException {if (NamenodeRole.NAMENODE == role) {startHttpServer(conf);}loadNamesystem(conf);
}

进入loadNamesystem(conf)方法,里面有一个loadFromDisk(conf)方法,从磁盘上加载镜像文件

protected void loadNamesystem(Configuration conf) throws IOException {this.namesystem = FSNamesystem.loadFromDisk(conf);}

接着往下走,进入loadFromDisk(conf)

  • 从磁盘加载镜像文件时,会先new 一个镜像文件new FSImage(namenode对应存储地址, 编辑日志对应存储地址)
  • 然后会加载镜像文件和编辑日志,loadFSImage(startOpt)
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {checkConfiguration(conf);FSImage fsImage = new FSImage(conf,FSNamesystem.getNamespaceDirs(conf),FSNamesystem.getNamespaceEditsDirs(conf));long loadStart = monotonicNow();try {namesystem.loadFSImage(startOpt);} }

3.创建RPC服务

回退到initialize方法,加载完镜像文件之后会创建RPC服务,createRpcServer(conf),namenode是一个服务端,需要把服务开启,允许客户端后续对它进行访问

protected void initialize(Configuration conf) throws IOException {if (NamenodeRole.NAMENODE == role) {startHttpServer(conf);}loadNamesystem(conf);rpcServer = createRpcServer(conf);
}

进入到createRpcServer(conf)方法

protected NameNodeRpcServer createRpcServer(Configuration conf)throws IOException {return new NameNodeRpcServer(conf, this);}

进入到NameNodeRpcServer(conf, this),服务端会new一个RPC.Builder来产生服务,以此来供客户端访问

RPC.builder设置了相关协议、实例对象、对应的服务器地址、端口号

 public NameNodeRpcServer(Configuration conf, NameNode nn) throws IOException {  serviceRpcServer = new RPC.Builder(conf).setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class).setInstance(clientNNPbService).setBindAddress(bindHost).setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount).setVerbose(false).setSecretManager(namesystem.getDelegationTokenSecretManager()).build();}

4.对NameNode启动资源检查

startCommonServices(conf)方法

protected void initialize(Configuration conf) throws IOException {if (NamenodeRole.NAMENODE == role) {startHttpServer(conf);}loadNamesystem(conf);rpcServer = createRpcServer(conf);startCommonServices(conf);
}

进入startCommonServices(conf)方法

private void startCommonServices(Configuration conf) throws IOException {namesystem.startCommonServices(conf, haContext);
}
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {try {nnResourceChecker = new NameNodeResourceChecker(conf);checkAvailableResources();}

进入NameNodeResourceChecker(conf)

public NameNodeResourceChecker(Configuration conf) throws IOException {duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);}

DFS_NAMENODE_DU_RESERVED_DEFAULT的值默认是100M,启动namenode之后,要检查镜像文件和编辑日志的路径上能存储多少内容,默认大小是100M,,如果空间小于100M,就要进行警告

public static final long    DFS_NAMENODE_DU_RESERVED_DEFAULT = 1024 * 1024 * 100; // 100 MB

再回到startCommonServices方法里,NameNodeResourceChecker(conf)相当于定义好一个阈值,接下来的checkAvailableResources()才是真正的进行磁盘检测

void startCommonServices(Configuration conf, HAContext haContext) throws IOException {nnResourceChecker = new NameNodeResourceChecker(conf);checkAvailableResources();blockManager.activate(conf, completeBlocksTotal);
}

进入到checkAvailableResources()方法

void checkAvailableResources() {hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();}

hasAvailableDiskSpace()用来检查是否有存储空间

public boolean hasAvailableDiskSpace() {return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),minimumRedundantVolumes);}

areResourcesAvailable(volumes.values(), minimumRedundantVolumes)检测资源是否够用

static boolean areResourcesAvailable(Collection<? extends CheckableNameNodeResource> resources,int minimumRedundantResources) {if (!resource.isResourceAvailable()) {// Short circuit - a required resource is not available.return false;}
}

isResourceAvailable是一个接口,ctrl + alt + B就找到方法的实现类,实现类里面会判断配置信息是否够用,如果不够用,会打印出来提示信息,提示磁盘空间不足

public class NameNodeResourceChecker {public boolean isResourceAvailable() {if (availableSpace < duReserved) {LOG.warn("Space available on volume '" + volume + "' is "+ availableSpace +", which is below the configured reserved amount " + duReserved);return false;} else {return true;}}
}

5.DataNode心跳超时判断

回到startCommonServices方法里,磁盘检测完毕之后,有个blockManager.activate(conf, completeBlocksTotal)

void startCommonServices(Configuration conf, HAContext haContext) throws IOException {nnResourceChecker = new NameNodeResourceChecker(conf);checkAvailableResources();blockManager.activate(conf, completeBlocksTotal);
}

进入activate(conf, completeBlocksTotal),先看datanodeManager.activate(conf),是用来管理datanode和namenode通信的manager

public void activate(Configuration conf, long blockTotal) {datanodeManager.activate(conf);bmSafeMode.activate(blockTotal);
}

进入datanodeManager.activate(conf),其中的heartbeatManager是心跳管理者,用来管理心跳信息

void activate(final Configuration conf) {heartbeatManager.activate();
}

进入到heartbeatManager.activate(),里面会启动心跳机制的线程,启动线程需要看一下run()方法

void activate() {heartbeatThread.start();
}

查找到run()方法,在run方法里面会进行心跳检查heartbeatCheck()

public void run() {if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {heartbeatCheck();}
}

进入到heartbeatCheck()dm.isDatanodeDead(d)会判断datanode是否还活着

void heartbeatCheck() {synchronized(this) {for (DatanodeDescriptor d : datanodes) {if (dead == null && dm.isDatanodeDead(d)) {stats.incrExpiredHeartbeats();dead = d;}}}

进入dm.isDatanodeDead(d),检测死掉的标准是上次获取的时间如果小于 当前时间 减去 一个固定的 阈值,就说明namenode死掉,这个heartbeatExpireInterval的值是10分钟30秒

boolean isDatanodeDead(DatanodeDescriptor node) {return (node.getLastUpdateMonotonic() <(monotonicNow() - heartbeatExpireInterval));}

其中heartbeatRecheckInterval是五分钟,heartbeatIntervalSeconds的值是3,10乘以1000是10秒,10秒乘以3是30秒。

this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds;

6.安全模式

再往上回退到blockManager.activate(conf, completeBlocksTotal),心跳超时判断之后紧接着执行安全模式,bmSafeMode.activate(blockTotal)

public void activate(Configuration conf, long blockTotal) {datanodeManager.activate(conf);bmSafeMode.activate(blockTotal);
}

进入到bmSafeMode.activate(blockTotal),看一下是如何管理安全模式的,其中setBlockTotal(total)是用来设置阈值,表示有多少的块启动之后datanode才能启动

void activate(long total) {setBlockTotal(total);if (areThresholdsMet()) {boolean exitResult = leaveSafeMode(false);Preconditions.checkState(exitResult, "Failed to leave safe mode.");} else {// enter safe modestatus = BMSafeModeStatus.PENDING_THRESHOLD;initializeReplQueuesIfNecessary();reportStatus("STATE* Safe mode ON.", true);lastStatusReport = monotonicNow();}
}
void setBlockTotal(long total) {assert namesystem.hasWriteLock();synchronized (this) {this.blockTotal = total;this.blockThreshold = (long) (total * threshold);}}

其中threshold=0.999f,意思就是,所有的块当中,假设1000个块,如果有1个块没启动,是允许datanode正常启动的,也就是说99.9%的块都启动了之后,namenode就可以启动

this.blockThreshold = (long) (total * threshold);

回退到bmSafeMode.activate(blockTotal),设置完datanode启动的阈值之后,紧接着会判断是进入安全模式还是离开安全模式

void activate(long total) {setBlockTotal(total);if (areThresholdsMet()) {boolean exitResult = leaveSafeMode(false);Preconditions.checkState(exitResult, "Failed to leave safe mode.");} else {// enter safe modestatus = BMSafeModeStatus.PENDING_THRESHOLD;initializeReplQueuesIfNecessary();reportStatus("STATE* Safe mode ON.", true);lastStatusReport = monotonicNow();}
}

进入到areThresholdsMet(),会判断blockSafe >= blockThreshold,表示正常启动的块是否大于对应的阈值,

private boolean areThresholdsMet() {synchronized (this) {return blockSafe >= blockThreshold && datanodeNum >= datanodeThreshold;}
}

Hadoop的NameNode在启动时都做了哪些事情?源码解析相关推荐

  1. android 输入法如何启动流程_android输入法02:openwnn源码解析01—输入流程

    android 输入法 02:openwnn 源码解析 01-输入流程 之后要开始 android 日文输入法的测试,因此现在开始研究 android 输入法.之前两 篇文章已经对 android 自 ...

  2. 构造函数中new在执行时都做了哪些事情?

    构造函数是一种特殊的函数,主要用于初始化对象,即为对象成员变量赋初始值,它总与new一起使用,我们可以把对象中的一些公共的属性和方法抽离出来,然后封装到这个函数里面 new在执行时会做四件事情: 1. ...

  3. 使用U盘传数据时操作系统做了什么(源码分析)

    一.背景 学习linux文件系统时考虑一个具体的问题:我们经常会用U盘传输东西到计算机中.当我们把U盘插入到一台计算机上,并且从U盘中复制文件到计算机里,然后卸载U盘,将U盘拔出.操作系统在这一连串过 ...

  4. 【Spring框架】 ☞ 项目启动时执行特定处理及ApplicationListener源码分析

    1.背景 在一些业务场景中,在容器启动完成后,需要处理一些诸如:kafka业务注册,数据处理,初始化缓存等的操作. 本文重点介绍如何在服务启动中,或启动完成时执行相关处理. 2.针对上述场景,有如下实 ...

  5. eureka client客户端启动时都做了哪些事

  6. 计算机启动时都发生了什么?

    计算机启动时都发生了什么? 介绍下基本输入输出系统 1.BIOS(Basic I/O system) BIOS 也就是基本输入输出系统 – 同时也是计算机启动时加载的第一个软件 它的位置: 计算机主板 ...

  7. 【RocketMQ|源码分析】namesrv启动停止过程都做了什么

    简介 namesrv在是RocketMQ中一个十分重要的组件,它相当于是Kafka中的zookeeper,Spring Cloud Alibaba中的nacos.它的主要作用是为消息生产者和消息消费者 ...

  8. JVM SandBox源码解析(一):启动时初始化、启动时加载模块、ModuleHttpServlet进行Http路由

    前言 上篇JVM SandBox实现原理详解文章中,主要解析了JVM SandBox的核心实现原理,并且对SandBoxClassLoader和ModuleClassLoader做了源码解析,也解释了 ...

  9. c语言程序执行完main函数后,一个c程序在执行main函数之前和main之后都做了那些事情啊该如何解决...

    当前位置:我的异常网» C语言 » 一个c程序在执行main函数之前和main之后都做了那些 一个c程序在执行main函数之前和main之后都做了那些事情啊该如何解决 www.myexceptions ...

最新文章

  1. jquery 插件开发的作用域及基础
  2. NBT:用16S及18S rRNA全长进行微生物多样性研究
  3. 使用Python+Qt时解决QTreeWidget中的内容超出边界后自动隐藏的问题
  4. Android之TextView文字绘制流程
  5. linux lighttpd php,Ubuntu下搭建Lighttpd+PHP+MySQL环境
  6. 每日一笑 | 男朋友整天沉迷游戏怎么办...?
  7. 用于大型事件处理的Akka Java
  8. 基于GCN的推荐该怎么搞?
  9. 【Spring 5】响应式Web框架实战(上) 1
  10. TED如何掌控你的时间(第二天)
  11. 自拟计算机作文100字,三年级自拟作文100字
  12. Java基础Day05
  13. javascript原生移动云编程1 - 十分钟做出跨平台原生App
  14. 基于Vue.js活动倒计时组件
  15. 直播实录|百度大脑EasyDL·NVIDIA专场 部署专家
  16. 基于scrapy框架爬取新浪体育部分板块内容
  17. [顶][转]东北菜大集
  18. mongo数据库取并集
  19. 技术合同属于什么类别的合同
  20. 魔众EDM邮件营销系统 v1.0.0 专业的EDM邮件营销系统

热门文章

  1. 中国历史朝代及皇帝简介
  2. php 炒粉,一碗粉定终身?黄屋屯炒粉还有着这样的浪漫故事,钦州人口口相传...
  3. C++ 模板与泛型详解
  4. 新手小白设计干货|使用ps制作一张简单海报
  5. java编写分数加减法_JAVA 分数加减法
  6. ZSTU OJ-4454 招兵买马
  7. Linux下empress数据库,Linux命令compress使用“Lempress-Ziv”编码压缩数据文件
  8. ArrayList, LinkedList, Vector - dudu:史上最详解
  9. Deep Learning for Modulation Classification: Signal Features in Performance Analysis解读
  10. 基于MSP432控制的红外循迹爬坡小车设计报告