在《HDFS源码分析心跳汇报之数据块增量汇报》一文中,我们详细介绍了数据块增量汇报的内容,了解到它是时间间隔更长的正常数据块汇报周期内一个smaller的数据块汇报,它负责将DataNode上数据块的变化情况及时汇报给NameNode。那么,时间间隔更长的正常数据块汇报都做了些什么呢?本文,我们将开始研究下时间间隔更长的正常数据块汇报。

首先,看下正常数据块汇报是如何发起的?我们先看下BPServiceActor工作线程的offerService()方法:

  /*** Main loop for each BP thread. Run until shutdown,* forever calling remote NameNode functions.*/private void offerService() throws Exception {//// Now loop for a long time....//while (shouldRun()) {// 又是一个利用shouldRun()判断的while循环try {// 省略部分代码...// 调用blockReport()方法,进行数据块汇报,放返回来自名字节点NameNode的相关命令cmdsList<DatanodeCommand> cmds = blockReport();// 调用processCommand()方法处理来自名字节点NameNode的相关命令cmdsprocessCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));// 省略部分代码//// There is no work to do;  sleep until hearbeat timer elapses, // or work arrives, and then iterate again.// 计算等待时间waitTime:心跳时间间隔减去上次心跳后截至到现在已过去的时间long waitTime = dnConf.heartBeatInterval - (Time.now() - lastHeartbeat);synchronized(pendingIncrementalBRperStorage) {if (waitTime > 0 && !sendImmediateIBR) {// 如果等待时间大于0,且不是立即发送数据块增量汇报try {// 利用pendingIncrementalBRperStorage进行等待,并加synchronized关键字进行同步pendingIncrementalBRperStorage.wait(waitTime);} catch (InterruptedException ie) {LOG.warn("BPOfferService for " + this + " interrupted");}}} // synchronized} catch(RemoteException re) {
<pre name="code" class="java">       // 省略部分代码

} catch (IOException e) {

        // 省略部分代码

} } // while (shouldRun())          可以看出,在BPServiceActor工作线程offerService()方法的while循环内,数据块汇报blockReport()方法执行时,仅有下面的waitTime的等待时间,其他情况下都是立即执行的。那么等待时间waitTime是如何计算的呢?它就是心跳时间间隔减去上次心跳后截至到现在已过去的时间,并且,如果等待时间waitTime大于0,且不是立即发送数据块增量汇报(标志位sendImmediateIBR为false),那么才会利用pendingIncrementalBRperStorage进行等待,并加synchronized关键字进行同步。在这里,我们就可以大胆猜测,数据块汇报的时间间隔应该是大于心跳时间间隔的,并且两者之间的距离肯定不小。

那么,我们开始研究实现正常数据块汇报的blockReport()方法吧,代码如下:

  /*** Report the list blocks to the Namenode* @return DatanodeCommands returned by the NN. May be null.* @throws IOException*/List<DatanodeCommand> blockReport() throws IOException {// send block report if timer has expired.// 到期就发送数据块汇报// 取当前开始时间startTimefinal long startTime = now();// 如果当前时间startTime减去上次数据块汇报时间小于数据节点配置的数据块汇报时间间隔的话,直接返回null,// 数据节点配置的数据块汇报时间间隔取参数dfs.blockreport.intervalMsec,参数未配置的话默认为6小时if (startTime - lastBlockReport <= dnConf.blockReportInterval) {return null;}// 构造数据节点命令ArrayList列表cmds,存储数据块汇报返回的命令DatanodeCommandArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();// Flush any block information that precedes the block report. Otherwise// we have a chance that we will miss the delHint information// or we will report an RBW replica after the BlockReport already reports// a FINALIZED one.// 调用reportReceivedDeletedBlocks()方法发送数据块增量汇报reportReceivedDeletedBlocks();// 记录上次数据块增量汇报时间lastDeletedReportlastDeletedReport = startTime;// 设置数据块汇报起始时间brCreateStartTime为当前时间long brCreateStartTime = now();// 从数据节点DataNode根据线程对应块池ID获取数据块汇报集合perVolumeBlockLists,// key为数据节点存储DatanodeStorage,value为数据节点存储所包含的Long类数据块数组BlockListAsLongsMap<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());// Convert the reports to the format expected by the NN.int i = 0;int totalBlockCount = 0;// 创建数据块汇报数组StorageBlockReport,大小为上述perVolumeBlockLists的大小StorageBlockReport reports[] =new StorageBlockReport[perVolumeBlockLists.size()];// 遍历perVolumeBlockListsfor(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {// 取出value:BlockListAsLongsBlockListAsLongs blockList = kvPair.getValue();// 将BlockListAsLongs封装成StorageBlockReport加入数据块汇报数组reports,// StorageBlockReport包含数据节点存储DatanodeStorage和其上数据块数组reports[i++] = new StorageBlockReport(kvPair.getKey(), blockList.getBlockListAsLongs());// 累加数据块数目totalBlockCounttotalBlockCount += blockList.getNumberOfBlocks();}// Send the reports to the NN.int numReportsSent;long brSendStartTime = now();// 根据数据块总数目判断是否需要多次发送消息if (totalBlockCount < dnConf.blockReportSplitThreshold) {// 如果数据块总数目在split阈值之下,则将所有的数据块汇报信息放在一个消息中发送// split阈值取参数dfs.blockreport.split.threshold,参数未配置的话默认为1000*1000// Below split threshold, send all reports in a single message.// 发送的数据块汇报消息数numReportsSent设置为1numReportsSent = 1;// 通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息DatanodeCommand cmd =bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);// 将数据块汇报后返回的命令cmd加入到命令列表cmdsif (cmd != null) {cmds.add(cmd);}} else {// Send one block report per message.// 发送的数据块汇报消息数numReportsSent设置为1numReportsSent = i;// 遍历reports,取出每个StorageBlockReportfor (StorageBlockReport report : reports) {StorageBlockReport singleReport[] = { report };// 通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息DatanodeCommand cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), singleReport);// 将数据块汇报后返回的命令cmd加入到命令列表cmdsif (cmd != null) {cmds.add(cmd);}}}// Log the block report processing stats from Datanode perspective// 计算数据块汇报耗时并记录在日志Log、数据节点Metrics指标体系中long brSendCost = now() - brSendStartTime;long brCreateCost = brSendStartTime - brCreateStartTime;dn.getMetrics().addBlockReport(brSendCost);LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount +" blocks total. Took " + brCreateCost +" msec to generate and " + brSendCost +" msecs for RPC and NN processing. " +" Got back commands " +(cmds.size() == 0 ? "none" : Joiner.on("; ").join(cmds)));// 调用scheduleNextBlockReport()方法,调度下一次数据块汇报scheduleNextBlockReport(startTime);// 返回命令cmdsreturn cmds.size() == 0 ? null : cmds;}

数据块汇报的blockReport()方法处理流程大体如下:

1、取当前开始时间startTime;

2、如果当前时间startTime减去上次数据块汇报时间小于数据节点配置的数据块汇报时间间隔的话,直接返回null:

数据节点配置的数据块汇报时间间隔取参数dfs.blockreport.intervalMsec,参数未配置的话默认为6小时;

3、构造数据节点命令ArrayList列表cmds,存储数据块汇报返回的命令DatanodeCommand;

4、调用reportReceivedDeletedBlocks()方法发送数据块增量汇报;

5、记录上次数据块增量汇报时间lastDeletedReport;

6、设置数据块汇报起始时间brCreateStartTime为当前时间;

7、从数据节点DataNode根据线程对应块池ID获取数据块汇报集合perVolumeBlockLists:

key为数据节点存储DatanodeStorage,value为数据节点存储所包含的Long类数据块数组BlockListAsLongs;

8、创建数据块汇报数组StorageBlockReport,大小为上述perVolumeBlockLists的大小;

9、取出value:BlockListAsLongs:

9.1、取出value:BlockListAsLongs;

9.2、将BlockListAsLongs封装成StorageBlockReport加入数据块汇报数组reports,StorageBlockReport包含数据节点存储DatanodeStorage和其上数据块数组;

9.3、累加数据块数目totalBlockCount;

10、根据数据块总数目判断是否需要多次发送消息:

10.1、如果数据块总数目在split阈值之下,则将所有的数据块汇报信息放在一个消息中发送(split阈值取参数dfs.blockreport.split.threshold,参数未配置的话默认为1000*1000):

10.1.1、发送的数据块汇报消息数numReportsSent设置为1;

10.1.2、通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息;

10.1.3、将数据块汇报后返回的命令cmd加入到命令列表cmds;

10.2、如果数据块总数目在split阈值之上,将数据块汇报按照DatanodeStorage分多个消息来发送:

10.2.1、发送的数据块汇报消息数numReportsSent设置为i,即DatanodeStorage数目;

10.2.2、遍历reports,取出每个StorageBlockReport:

10.2.2.1、通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息;

10.2.2.2、将数据块汇报后返回的命令cmd加入到命令列表cmds;

11、计算数据块汇报耗时并记录在日志Log、数据节点Metrics指标体系中;

12、调用scheduleNextBlockReport()方法,调度下一次数据块汇报;

13、返回命令cmds。

HDFS源码分析心跳汇报之数据块汇报相关推荐

  1. HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程

    在<HDFS源码分析心跳汇报之数据结构初始化>一文中,我们了解到HDFS心跳相关的BlockPoolManager.BPOfferService.BPServiceActor三者之间的关系 ...

  2. HDFS源码分析心跳汇报之数据结构初始化

    在<HDFS源码分析心跳汇报之整体结构>一文中,我们详细了解了HDFS中关于心跳的整体结构,知道了BlockPoolManager.BPOfferService和BPServiceActo ...

  3. HDFS源码分析DataXceiver之整体流程

    在<HDFS源码分析之DataXceiverServer>一文中,我们了解到在DataNode中,有一个后台工作的线程DataXceiverServer.它被用于接收来自客户端或其他数据节 ...

  4. Spark 源码分析之ShuffleMapTask内存数据Spill和合并

    Spark 源码分析之ShuffleMapTask内存数据Spill和合并 更多资源分享 SPARK 源码分析技术分享(视频汇总套装视频): https://www.bilibili.com/vide ...

  5. hdfs源码分析第二弹

    以写文件为例,串联整个流程的源码: FSDataOutputStream out = fs.create(outFile); 1. DistributedFileSystem 继承并实现了FileSy ...

  6. v70.05 鸿蒙内核源码分析(管道文件) | 如何降低数据流动成本 | 百篇博客分析OpenHarmony源码

    子曰:"其身正,不令而行:其身不正,虽令不从." <论语>:子路篇 百篇博客系列篇.本篇为: v70.xx 鸿蒙内核源码分析(管道文件篇) | 如何降低数据流动成本 文 ...

  7. Hadoop源码分析笔记(十一):数据节点--数据节点整体运行

    数据节点整体运行 数据节点通过数据节点存储和文件系统数据集,管理着保存在Linux文件系统上的数据块,通过流式接口提供数据块的读.写.替换.复制和校验信息等功能.建立在上述基础上的数据节点,还需要维护 ...

  8. Launcher3源码分析(LauncherModel加载数据)

    LauncherModel继承BroadcastReceiver,显然是一个广播接收者.在上一篇Launcher的启动中讲到桌面数据的加载工作是在LauncherModel中执行的,那么它是如何加载数 ...

  9. hdfs源码分析第一弹

    1. hdfs定义 HDFS is the primary distributed storage used by Hadoop applications. A HDFS cluster primar ...

  10. Launcher3源码分析 — 将Workspace的数据与界面绑定

    在数据从数据库加载到内存之后,接下来的工作就是把这些数据与launcher的UI视图绑定.绑定的过程在LauncherModel.bindWorkspace()方法中完成,在这个方法中会调用回调接口C ...

最新文章

  1. linux实现时间服务器视频,linux实现时间同步有哪几种方法
  2. P3605 [USACO17JAN]Promotion Counting晋升者计数
  3. php错误以及常用笔记
  4. 深圳店匠笔试题-4.01
  5. java计数器策略模式_java设计模式(二十一)--策略模式
  6. 自动提示_EXCEL2013版突然打不开,自动修复提示1907错误
  7. GP学习(四)—Running a geoprocessing tool using background geoprocessing
  8. python第三方库文件传输助手_Python中的第三方模块(itchat)
  9. 虚拟机讲只读文件变为可读可写文件_Linux虚拟机文件系统突然变成只读
  10. 21天 Jenkins打卡-Day1 环境准备
  11. dax powerbi 生成表函数_如何用Power BI对数据建模?
  12. [转载] 面试常见问题总结
  13. 论文笔记_S2D.63_2020-ICRA_LiStereo:从雷达和双目立体图像生成稠密深度图
  14. 面试题 MySQL的慢查询、如何监控、如何排查?
  15. Pandas+Pyecharts | 医院药品销售数据可视化
  16. p12解析流程_解析P12证书 | 学步园
  17. 编码:隐匿在计算机软硬件背后的语言
  18. mysql日期转季节_JavaScript_day02_15_查看一年四季变化案例说明
  19. @修改用户(user)信息
  20. 注册华为云用户: 访问官网 https://huaweicloud.com/ 注册华为云用户(需手机号验证) 登录并完成实名认证 为账号充值不少于100元(不用时可提现

热门文章

  1. 【Hibernate】【简单使用+四大对象】
  2. 阿里云搭建大数据平台(4):Yarn配置以及日志聚合以及优化
  3. 饭店小 姐的幽默 超搞笑
  4. 使用python将字符拼成图画
  5. Unity物理引擎中的碰撞、角色控制器、Cloth组件(布料)、关节 Joint
  6. Java8新特性笔记--波哥带你学JAVA--接口中的新增
  7. linux 摘挂节点,一种令克摘挂装置的使用方法与流程
  8. 根据图片完整路径,获得图片的宽和高,判断是横版还是竖版图片
  9. win10系统CH340模块连接上电脑后端口显示叹号而无法使用
  10. 机器人走路动作_CSS3 很棒的机器人行走(步行)动画模拟