Hadoop的NameNode在启动时都做了哪些事情?源码解析
目录
- 1.启动9870端口服务
- 2.加载镜像文件和编辑日志
- 3.创建RPC服务
- 4.对NameNode启动资源检查
- 5.DataNode心跳超时判断
- 6.安全模式
以hadoop3.x版本为例
namenode启动时大致有6个步骤
- 1)首先启动9870端口
- 2)启动完端口之后开始加载镜像文件和编辑日志
- 3)紧接着创建RPC服务
- 4)然后开始对NameNode的资源进行检测
- 检测当前磁盘空间是否能够启动NameNode
- 要检查镜像文件和编辑日志的路径上能存储多少内容,默认大小是100M,如果当前磁盘空间小于100M,就要进行警告
- 5)对DataNode的心跳进行校验
- 心跳检测时如果DateNode超过10分钟30秒没有被检测到,就认为DateNode宕机
- 6)判断是否进入安全模式
- 默认block块加载到
99.9%
之后离开安全模式
- 默认block块加载到
下面手把手带着大家跟进NameNode启动源码
ctrl + n搜索namenode,找到namenode的main方法
1.启动9870端口服务
public static void main(String argv[]) throws Exception {NameNode namenode = createNameNode(argv, null);}
进入createNameNode方法,nameNode中new了一个Namenode对象
public static NameNode createNameNode(String argv[], Configuration conf) throws IOException {return new NameNode(conf);}}
进入Namenode,在里面进行了初始化
public NameNode(Configuration conf) throws IOException {this(conf, NamenodeRole.NAMENODE);}protected NameNode(Configuration conf, NamenodeRole role) throws IOException {initialize(getConf());
}
进入initialize(getConf())
,初始化时启动了http服务
protected void initialize(Configuration conf) throws IOException {if (NamenodeRole.NAMENODE == role) {startHttpServer(conf);}loadNamesystem(conf);
}
进入startHttpServer(conf)
,启动http服务时,要获取端口号
private void startHttpServer(final Configuration conf) throws IOException {httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));httpServer.start();httpServer.setStartupProgress(startupProgress);}
进入
getHttpServerBindAddress(conf)
protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {InetSocketAddress bindAddress = getHttpServerAddress(conf); }
进入
getHttpServerAddress(conf)
protected InetSocketAddress getHttpServerAddress(Configuration conf) {return getHttpAddress(conf); }
进入
getHttpAddress(conf)
public static InetSocketAddress getHttpAddress(Configuration conf) {return NetUtils.createSocketAddr(conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));}
进入
DFS_NAMENODE_HTTP_ADDRESS_DEFAULT
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
进入
DFS_NAMENODE_HTTP_PORT_DEFAULT
public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT =HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
进入
DFS_NAMENODE_HTTP_PORT_DEFAULT
,就看到了9870端口int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
调用httpServer.start()
启动http服务,setupServlets(httpServer, conf)
void start() throws IOException {setupServlets(httpServer, conf);httpServer.start();}
进入setupServlets(httpServer, conf)
,setupServlets(httpServer, conf)
会启动各种服务,例如image和fsck(磁盘刷写)
private static void setupServlets(HttpServer2 httpServer, Configuration conf) {httpServer.addInternalServlet("startupProgress",StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,true);httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,ImageServlet.class, true);}
2.加载镜像文件和编辑日志
回退到initialize
方法,里面有一个loadNamesystem(conf)
方法,用来加载镜像文件
protected void initialize(Configuration conf) throws IOException {if (NamenodeRole.NAMENODE == role) {startHttpServer(conf);}loadNamesystem(conf);
}
进入loadNamesystem(conf)
方法,里面有一个loadFromDisk(conf)
方法,从磁盘上加载镜像文件
protected void loadNamesystem(Configuration conf) throws IOException {this.namesystem = FSNamesystem.loadFromDisk(conf);}
接着往下走,进入loadFromDisk(conf)
- 从磁盘加载镜像文件时,会先new 一个镜像文件
new FSImage(namenode对应存储地址, 编辑日志对应存储地址)
- 然后会加载镜像文件和编辑日志,
loadFSImage(startOpt)
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {checkConfiguration(conf);FSImage fsImage = new FSImage(conf,FSNamesystem.getNamespaceDirs(conf),FSNamesystem.getNamespaceEditsDirs(conf));long loadStart = monotonicNow();try {namesystem.loadFSImage(startOpt);} }
3.创建RPC服务
回退到initialize
方法,加载完镜像文件之后会创建RPC服务,createRpcServer(conf)
,namenode是一个服务端,需要把服务开启,允许客户端后续对它进行访问
protected void initialize(Configuration conf) throws IOException {if (NamenodeRole.NAMENODE == role) {startHttpServer(conf);}loadNamesystem(conf);rpcServer = createRpcServer(conf);
}
进入到createRpcServer(conf)
方法
protected NameNodeRpcServer createRpcServer(Configuration conf)throws IOException {return new NameNodeRpcServer(conf, this);}
进入到NameNodeRpcServer(conf, this)
,服务端会new一个RPC.Builder来产生服务,以此来供客户端访问
RPC.builder设置了相关协议、实例对象、对应的服务器地址、端口号
public NameNodeRpcServer(Configuration conf, NameNode nn) throws IOException { serviceRpcServer = new RPC.Builder(conf).setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class).setInstance(clientNNPbService).setBindAddress(bindHost).setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount).setVerbose(false).setSecretManager(namesystem.getDelegationTokenSecretManager()).build();}
4.对NameNode启动资源检查
startCommonServices(conf)
方法
protected void initialize(Configuration conf) throws IOException {if (NamenodeRole.NAMENODE == role) {startHttpServer(conf);}loadNamesystem(conf);rpcServer = createRpcServer(conf);startCommonServices(conf);
}
进入startCommonServices(conf)
方法
private void startCommonServices(Configuration conf) throws IOException {namesystem.startCommonServices(conf, haContext);
}
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {try {nnResourceChecker = new NameNodeResourceChecker(conf);checkAvailableResources();}
进入NameNodeResourceChecker(conf)
,
public NameNodeResourceChecker(Configuration conf) throws IOException {duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);}
DFS_NAMENODE_DU_RESERVED_DEFAULT
的值默认是100M,启动namenode之后,要检查镜像文件和编辑日志的路径上能存储多少内容,默认大小是100M,,如果空间小于100M,就要进行警告
public static final long DFS_NAMENODE_DU_RESERVED_DEFAULT = 1024 * 1024 * 100; // 100 MB
再回到startCommonServices
方法里,NameNodeResourceChecker(conf)
相当于定义好一个阈值,接下来的checkAvailableResources()
才是真正的进行磁盘检测
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {nnResourceChecker = new NameNodeResourceChecker(conf);checkAvailableResources();blockManager.activate(conf, completeBlocksTotal);
}
进入到checkAvailableResources()
方法
void checkAvailableResources() {hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();}
hasAvailableDiskSpace()
用来检查是否有存储空间
public boolean hasAvailableDiskSpace() {return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),minimumRedundantVolumes);}
areResourcesAvailable(volumes.values(), minimumRedundantVolumes)
检测资源是否够用
static boolean areResourcesAvailable(Collection<? extends CheckableNameNodeResource> resources,int minimumRedundantResources) {if (!resource.isResourceAvailable()) {// Short circuit - a required resource is not available.return false;}
}
isResourceAvailable
是一个接口,ctrl + alt + B就找到方法的实现类,实现类里面会判断配置信息是否够用,如果不够用,会打印出来提示信息,提示磁盘空间不足
public class NameNodeResourceChecker {public boolean isResourceAvailable() {if (availableSpace < duReserved) {LOG.warn("Space available on volume '" + volume + "' is "+ availableSpace +", which is below the configured reserved amount " + duReserved);return false;} else {return true;}}
}
5.DataNode心跳超时判断
回到startCommonServices
方法里,磁盘检测完毕之后,有个blockManager.activate(conf, completeBlocksTotal)
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {nnResourceChecker = new NameNodeResourceChecker(conf);checkAvailableResources();blockManager.activate(conf, completeBlocksTotal);
}
进入activate(conf, completeBlocksTotal)
,先看datanodeManager.activate(conf)
,是用来管理datanode和namenode通信的manager
public void activate(Configuration conf, long blockTotal) {datanodeManager.activate(conf);bmSafeMode.activate(blockTotal);
}
进入datanodeManager.activate(conf)
,其中的heartbeatManager
是心跳管理者,用来管理心跳信息
void activate(final Configuration conf) {heartbeatManager.activate();
}
进入到heartbeatManager.activate()
,里面会启动心跳机制的线程,启动线程需要看一下run()
方法
void activate() {heartbeatThread.start();
}
查找到run()
方法,在run方法里面会进行心跳检查heartbeatCheck()
public void run() {if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {heartbeatCheck();}
}
进入到heartbeatCheck()
,dm.isDatanodeDead(d)
会判断datanode是否还活着
void heartbeatCheck() {synchronized(this) {for (DatanodeDescriptor d : datanodes) {if (dead == null && dm.isDatanodeDead(d)) {stats.incrExpiredHeartbeats();dead = d;}}}
进入dm.isDatanodeDead(d)
,检测死掉的标准是上次获取的时间如果小于 当前时间 减去 一个固定的 阈值,就说明namenode死掉,这个heartbeatExpireInterval
的值是10分钟30秒
boolean isDatanodeDead(DatanodeDescriptor node) {return (node.getLastUpdateMonotonic() <(monotonicNow() - heartbeatExpireInterval));}
其中heartbeatRecheckInterval
是五分钟,heartbeatIntervalSeconds
的值是3,10乘以1000是10秒,10秒乘以3是30秒。
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds;
6.安全模式
再往上回退到blockManager.activate(conf, completeBlocksTotal)
,心跳超时判断之后紧接着执行安全模式,bmSafeMode.activate(blockTotal)
public void activate(Configuration conf, long blockTotal) {datanodeManager.activate(conf);bmSafeMode.activate(blockTotal);
}
进入到bmSafeMode.activate(blockTotal)
,看一下是如何管理安全模式的,其中setBlockTotal(total)
是用来设置阈值,表示有多少的块启动之后datanode才能启动
void activate(long total) {setBlockTotal(total);if (areThresholdsMet()) {boolean exitResult = leaveSafeMode(false);Preconditions.checkState(exitResult, "Failed to leave safe mode.");} else {// enter safe modestatus = BMSafeModeStatus.PENDING_THRESHOLD;initializeReplQueuesIfNecessary();reportStatus("STATE* Safe mode ON.", true);lastStatusReport = monotonicNow();}
}
void setBlockTotal(long total) {assert namesystem.hasWriteLock();synchronized (this) {this.blockTotal = total;this.blockThreshold = (long) (total * threshold);}}
其中threshold
=0.999f
,意思就是,所有的块当中,假设1000个块,如果有1个块没启动,是允许datanode正常启动的,也就是说99.9%的块都启动了之后,namenode就可以启动
this.blockThreshold = (long) (total * threshold);
回退到bmSafeMode.activate(blockTotal)
,设置完datanode启动的阈值之后,紧接着会判断是进入安全模式还是离开安全模式
void activate(long total) {setBlockTotal(total);if (areThresholdsMet()) {boolean exitResult = leaveSafeMode(false);Preconditions.checkState(exitResult, "Failed to leave safe mode.");} else {// enter safe modestatus = BMSafeModeStatus.PENDING_THRESHOLD;initializeReplQueuesIfNecessary();reportStatus("STATE* Safe mode ON.", true);lastStatusReport = monotonicNow();}
}
进入到areThresholdsMet()
,会判断blockSafe >= blockThreshold
,表示正常启动的块是否大于对应的阈值,
private boolean areThresholdsMet() {synchronized (this) {return blockSafe >= blockThreshold && datanodeNum >= datanodeThreshold;}
}
Hadoop的NameNode在启动时都做了哪些事情?源码解析相关推荐
- android 输入法如何启动流程_android输入法02:openwnn源码解析01—输入流程
android 输入法 02:openwnn 源码解析 01-输入流程 之后要开始 android 日文输入法的测试,因此现在开始研究 android 输入法.之前两 篇文章已经对 android 自 ...
- 构造函数中new在执行时都做了哪些事情?
构造函数是一种特殊的函数,主要用于初始化对象,即为对象成员变量赋初始值,它总与new一起使用,我们可以把对象中的一些公共的属性和方法抽离出来,然后封装到这个函数里面 new在执行时会做四件事情: 1. ...
- 使用U盘传数据时操作系统做了什么(源码分析)
一.背景 学习linux文件系统时考虑一个具体的问题:我们经常会用U盘传输东西到计算机中.当我们把U盘插入到一台计算机上,并且从U盘中复制文件到计算机里,然后卸载U盘,将U盘拔出.操作系统在这一连串过 ...
- 【Spring框架】 ☞ 项目启动时执行特定处理及ApplicationListener源码分析
1.背景 在一些业务场景中,在容器启动完成后,需要处理一些诸如:kafka业务注册,数据处理,初始化缓存等的操作. 本文重点介绍如何在服务启动中,或启动完成时执行相关处理. 2.针对上述场景,有如下实 ...
- eureka client客户端启动时都做了哪些事
- 计算机启动时都发生了什么?
计算机启动时都发生了什么? 介绍下基本输入输出系统 1.BIOS(Basic I/O system) BIOS 也就是基本输入输出系统 – 同时也是计算机启动时加载的第一个软件 它的位置: 计算机主板 ...
- 【RocketMQ|源码分析】namesrv启动停止过程都做了什么
简介 namesrv在是RocketMQ中一个十分重要的组件,它相当于是Kafka中的zookeeper,Spring Cloud Alibaba中的nacos.它的主要作用是为消息生产者和消息消费者 ...
- JVM SandBox源码解析(一):启动时初始化、启动时加载模块、ModuleHttpServlet进行Http路由
前言 上篇JVM SandBox实现原理详解文章中,主要解析了JVM SandBox的核心实现原理,并且对SandBoxClassLoader和ModuleClassLoader做了源码解析,也解释了 ...
- c语言程序执行完main函数后,一个c程序在执行main函数之前和main之后都做了那些事情啊该如何解决...
当前位置:我的异常网» C语言 » 一个c程序在执行main函数之前和main之后都做了那些 一个c程序在执行main函数之前和main之后都做了那些事情啊该如何解决 www.myexceptions ...
最新文章
- jquery 插件开发的作用域及基础
- NBT:用16S及18S rRNA全长进行微生物多样性研究
- 使用Python+Qt时解决QTreeWidget中的内容超出边界后自动隐藏的问题
- Android之TextView文字绘制流程
- linux lighttpd php,Ubuntu下搭建Lighttpd+PHP+MySQL环境
- 每日一笑 | 男朋友整天沉迷游戏怎么办...?
- 用于大型事件处理的Akka Java
- 基于GCN的推荐该怎么搞?
- 【Spring 5】响应式Web框架实战(上) 1
- TED如何掌控你的时间(第二天)
- 自拟计算机作文100字,三年级自拟作文100字
- Java基础Day05
- javascript原生移动云编程1 - 十分钟做出跨平台原生App
- 基于Vue.js活动倒计时组件
- 直播实录|百度大脑EasyDL·NVIDIA专场 部署专家
- 基于scrapy框架爬取新浪体育部分板块内容
- [顶][转]东北菜大集
- mongo数据库取并集
- 技术合同属于什么类别的合同
- 魔众EDM邮件营销系统 v1.0.0 专业的EDM邮件营销系统
热门文章
- 中国历史朝代及皇帝简介
- php 炒粉,一碗粉定终身?黄屋屯炒粉还有着这样的浪漫故事,钦州人口口相传...
- C++ 模板与泛型详解
- 新手小白设计干货|使用ps制作一张简单海报
- java编写分数加减法_JAVA 分数加减法
- ZSTU OJ-4454 招兵买马
- Linux下empress数据库,Linux命令compress使用“Lempress-Ziv”编码压缩数据文件
- ArrayList, LinkedList, Vector - dudu:史上最详解
- Deep Learning for Modulation Classification: Signal Features in Performance Analysis解读
- 基于MSP432控制的红外循迹爬坡小车设计报告