前言

最近在CSDN的首页上看到了hadoop十周年的文章,不禁感慨这真是一个伟大的系统啊.在这十年间,hadoop自身进行了许多演化和大的改变,而在其下,也孵化出了许多子项目,围绕着hadoop的生态圈现在变得越来越丰富了.所以作为一个出色的分布式系统,他有很多地方值得我们学习,最近本人在研究DataXceiver方面的代码,此篇文章算是这几天学习的一个总结吧.

为什么选择学习DataXceiver?

我们从大的层面往小说,你就知道他有多重要了.我们使用Hadoop系统,最看重的是什么,2个字,存储,存储的过程中,什么又是最看着的呢,那当然是数据了.而这些数据都是存在于各个DataNode之上的.所以掌握了解DataNode的读写操作原理就显得尤为重要了.而这个控制中心就在DataXceiver中.

DataXceiver的定义

DataXceiver是干什么用的呢,很多人只知DataNode,而不知另外一个很重要的线程DataXceiver.在Hadoop中对于DataXceiver中的注释解释如下:

/*** Thread for processing incoming/outgoing data stream.*/
class DataXceiver extends Receiver implements Runnable {...

中文大意为"处理输入/输出数据流的线程".我的个人理解就是数据流的处理中心. DataXceiver线程数的多少在一定程度上能反映出一个节点的忙碌程度.DataXceiver这个类中包含的变量和方法还是比较多的,我不大建议读者逐行的去详细的阅读内部的代码.我们去学习一个机制,原理的时候,主要去明白的是结构.比如我们现在要去学习DataXceiver这个类, 我们的目标是去了解这个类中主要做了哪些操作,上游被哪些对象调用,下游又调用了哪些类,具体的代码细节等碰到具体的问题时再去分析即可,否则可能会被里面复杂的逻辑绕晕,毕竟这是一个成熟的分布式的程序,不是一时半会能够立刻理解的.

DataXceiver的结构

为了我们更好的去理解这个"数据处理中心",我们需要去了解这个类的整体结构,在此之前不妨去了解一下其中的内部方法:

首先,这是一个线程服务,执行入口一定是run方法,执行run方法,就可以找到与这些方法相关的联系.

/*** Read/write data from/to the DataXceiverServer.*/@Overridepublic void run() {int opsProcessed = 0;Op op = null;...// We process requests in a loop, and stay around for a short timeout.// This optimistic behaviour allows the other end to reuse connections.// Setting keepalive timeout to 0 disable this behavior.do {updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));try {if (opsProcessed != 0) {assert dnConf.socketKeepaliveTimeout > 0;peer.setReadTimeout(dnConf.socketKeepaliveTimeout);} else {peer.setReadTimeout(dnConf.socketTimeout);}op = readOp();} catch (InterruptedIOException ignored) {// Time out while we wait for client rpcbreak;} catch (IOException err) {// Since we optimistically expect the next op, it's quite normal to get EOF here.if (opsProcessed > 0 &&(err instanceof EOFException || err instanceof ClosedChannelException)) {if (LOG.isDebugEnabled()) {LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");zhu}} else {incrDatanodeNetworkErrors();throw err;}break;}// restore normal timeoutif (opsProcessed != 0) {peer.setReadTimeout(dnConf.socketTimeout);}opStartTime = monotonicNow();processOp(op);++opsProcessed;} while ((peer != null) &&(!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));...

在run方法中间的主循环方法中,可以看到1个readOp,对应的1个processOp.Op对应的意思是操作码.readOp会从输入流中读取操作码:

/** Read an Op.  It also checks protocol version. */protected final Op readOp() throws IOException {final short version = in.readShort();if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {throw new IOException( "Version Mismatch (Expected: " +DataTransferProtocol.DATA_TRANSFER_VERSION  +", Received: " +  version + " )");}return Op.read(in);}

而processOp则会进行判断处理:

  /** Process op by the corresponding method. */protected final void processOp(Op op) throws IOException {switch(op) {case READ_BLOCK:opReadBlock();break;case WRITE_BLOCK:opWriteBlock(in);break;case REPLACE_BLOCK:opReplaceBlock(in);break;case COPY_BLOCK:opCopyBlock(in);break;case BLOCK_CHECKSUM:opBlockChecksum(in);break;case TRANSFER_BLOCK:opTransferBlock(in);break;case REQUEST_SHORT_CIRCUIT_FDS:opRequestShortCircuitFds(in);break;case RELEASE_SHORT_CIRCUIT_FDS:opReleaseShortCircuitFds(in);break;case REQUEST_SHORT_CIRCUIT_SHM:opRequestShortCircuitShm(in);break;default:throw new IOException("Unknown op " + op + " in data stream");}}

总共9种类型,对应着9种处理方法.到此,DataXceiver的基本结构慢慢清晰了,可以用下面的一张图展示:

左上方的Sender是什么意思在后面会解释,可以先忽略.

DataXceiver下游处理方法

从上一小节中的结构图已经看到了处理相应码的9个方法另加2个response回复方法.这个9个方法可以大致分为2大类方法:

1.普通读写block块操作方法.

划分到普通读写block块方法的有readBlock,writeBlock,transferBlock,copyBlock,replaceBlock,blockChecksum,剩下的待ShortCircuit的方法就是属于shortCircuit读相关的方法.下面对具体的这些方法做场景分析.

1.readBlock

方法名就已经体现了这个方法的操作了,自然是读取block信息操作,一般用于远程读或或者本地读操作.

2.writeBlock

写block块操作,将参数传入的数据块写入目标节点列表中.

3.transferBlock

传输指定副本到目标节点列表中,官方注释如下:

Transfer a replica to the datanode targets.

4.copyBlock

拷贝块信息数据,与readBlock原理类似,都用到了BlockSender.send方法.

5.replaceBlock

replaceBlock在DataXceiver中更接近的意思其实是moveBlock,此操作一般会在数据Balance的时候会做.

6.blockChecksum

从文件元信息头部读取校验和数据.

HDFS中的ShortCircuit读机制

这里要特地将shortCircuit读的几个方法单独分到一个模块中,因为shortCircuit读机制是HDFS在后面的版本中才引入的概念,可能有些人还不了解,这里给大家普及一下这方面的知识.

ShortCircuit的缘来

在早些时候,hadoop为了能让数据处理的更加的高效,都尽可能的让数据维持在本地,以此来避免大量的远程读操作,本地读的专业术语就是"Local Read".但是渐渐的到了后面,尽管本地读的比例确实提升了,但是好像还不是最优.因为虽说数据是在本地,但是每次客户端读取数据,还是需要走DataNode这一层,在其间还是会走网络通信的1块,能不能以类似于直接读取本地文件系统的方式去读本地的数据,而shortCircuit读就是源自于这个想法而诞生的.

shortCircuit本地读的实现

shortCircuit读俗称"短路读",后来采用了Linux操作系统中一种计数来实现这个功能,"Unix Domain Socket".他是一种进程间通信的方式,他很重要的一点是可以在进程间传递文件描述符,借此来进行进程间的通信.关于shortCircuit本地读的更细节的文章可以读此原文How Improved Short-Circuit Local Reads Bring Better Performance and Security to Hadoop.

shortCircuit机制

在HDFS中用的是short-circuit memory segments来实现数据的读操作.DfsClient客户端通过shortCircuit实现本地读的简要过程如下:

1.DfsClient客户端从DataNode请求shared memory segments共享内存片段.

2.ShortCircuitRegistry注册对象会产生并管理这些内存对象对象.

3.在本地读之前,DfsClient客户端会向DataNode请求需要的文件描述符,对应的就是requestShortCircuitFds方法.

4.block块在此期间的状态跟踪用的是slot表示.

5.如果一次本度读数据完成之后,相应的会执行释放操作.

给出源码中的官方解释:

/*** Manages client short-circuit memory segments on the DataNode.** DFSClients request shared memory segments from the DataNode.  The * ShortCircuitRegistry generates and manages these segments.  Each segment* has a randomly generated 128-bit ID which uniquely identifies it.  The* segments each contain several "slots."** Before performing a short-circuit read, DFSClients must request a pair of* file descriptors from the DataNode via the REQUEST_SHORT_CIRCUIT_FDS* operation.  As part of this operation, DFSClients pass the ID of the shared* memory segment they would like to use to communicate information about this* replica, as well as the slot number within that segment they would like to* use.  Slot allocation is always done by the client.** Slots are used to track the state of the block on the both the client and* datanode. When this DataNode mlocks a block, the corresponding slots for the* replicas are marked as "anchorable".  Anchorable blocks can be safely read* without verifying the checksum.  This means that BlockReaderLocal objects* using these replicas can skip checksumming.  It also means that we can do* zero-copy reads on these replicas (the ZCR interface has no way of* verifying checksums.)* * When a DN needs to munlock a block, it needs to first wait for the block to* be unanchored by clients doing a no-checksum read or a zero-copy read. The * DN also marks the block's slots as "unanchorable" to prevent additional * clients from initiating these operations in the future.* * The counterpart of this class on the client is {@link DfsClientShmManager}.*/

DataXceiver的上游调用

DataXceiver的上游调用其实就是Op操作码的输入方,通过寻找Op,XX的调用位置可以找到都是来自于同一个对象类,Sende.其中输入Op.COPY_BLOCK的例子:

@Overridepublic void copyBlock(final ExtendedBlock blk,final Token<BlockTokenIdentifier> blockToken) throws IOException {OpCopyBlockProto proto = OpCopyBlockProto.newBuilder().setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)).build();send(out, Op.COPY_BLOCK, proto);}

剩余的8个方法均与DataXceiver中的相对应.现在就可以很好的解释上图中Sender存在的原因了,大家可以仔细理解一下.Sender对象虽然是操作码的直接传入类,但不是方法最初始的调用方,我们需要从这个点往上寻找,找到最开始的触发者.为了节省篇幅,直接给出结果:

最后Dispatcher类是用在Balancer操作中的.如上图所显示的,真正读写数据的发起方就是我们经常碰到的DfsClient,DfsOutputStream,BlcokReader这些对象类.这样的话,DataXceiver的上游以及下游处理就打通了.

DataXceiver与DataXceiverServer

提到DataXceiver,就不得不提DataXceiverServer.在DataXceiverServer会保存记录每次新启动的DataXceiver线程.在他的主循环方法中,会进行DataXceiver的创建

@Overridepublic void run() {Peer peer = null;while (datanode.shouldRun && !datanode.shutdownForUpgrade) {try {peer = peerServer.accept();// Make sure the xceiver count is not exceededint curXceiverCount = datanode.getXceiverCount();if (curXceiverCount > maxXceiverCount) {throw new IOException("Xceiver count " + curXceiverCount+ " exceeds the limit of concurrent xcievers: "+ maxXceiverCount);}new Daemon(datanode.threadGroup,DataXceiver.create(peer, datanode, this)).start();} catch (SocketTimeoutException ignored) {

随之会加入DataXceiverServer的2个map对象中:

  /*** Read/write data from/to the DataXceiverServer.*/@Overridepublic void run() {int opsProcessed = 0;Op op = null;try {dataXceiverServer.addPeer(peer, Thread.currentThread(), this);...
synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)throws IOException {if (closed) {throw new IOException("Server closed.");}peers.put(peer, t);peersXceiver.put(peer, xceiver);}

所以DataXceiver与DataXceiverServer的关系图可用下面的关系结构表示:

补充

添加一下额外的补充,最近阅读了DataXceiver的源码,发现里面的代码比较乱,多处异常日志级别输出不准确,都是INFO级别,不利于发现异常日志记录,于是向社区提交Issue, HDFS-9727.

相关链接

Issue链接: https://issues.apache.org/jira/browse/HDFS-9727

Github patch链接: https://github.com/linyiqun/open-source-patch/tree/master/hdfs/HDFS-9727

DataNode数据处理中心DataXceiver相关推荐

  1. 数据中心网络故障处理_数据处理中心或数据中心

    数据中心网络故障处理 数据处理中心或数据中心 (Data Processing Center Or Data Center) A Data Center or Data Processing Cent ...

  2. HDFS精华文章汇总

    前言 自2015年下半年起,笔者开始写关于Hadoop的文章(主要集中在HDFS),包括源码分析类的,问题分析解决又或者是内部机制剖析.这些文章目前汇总数量已经达到70+篇.这些文章对于笔者来说是一个 ...

  3. 面对不同用户,数据中心如何将服务做到极致

    数据中心是一个面向特定人群提供服务的数据处理中心,当然,向互联网的数据中心往往受众面要广一些,几乎所有人都可以访问,并享受到优质服务.一个数据中心对外提供的服务质量直接决定了其未来发展的潜力,所以每个 ...

  4. 骄阳似火 细数史上数据中心火灾 如何才能重蹈覆辙?

    骄阳似火的6月,北京亦庄某数据中心机房柴油机发生一起起火事故,所幸并未出现去年多家金融机构设备大规模宕机.本次起火事件,缘起亦庄地区发布的30天限电令,当地不少数据中心开始使用柴油发电机进行带载. 亦 ...

  5. 天猫双11期间,阿里数据中心将由机器人完成巡检

    11月1日,阿里巴巴集团宣布,天猫双11期间,华北数据中心的巡检工作将由智能运维机器人"阿里巴巴天巡"完成. 天巡是目前互联网数据中心精度最高的智能运维机器人.届时,天巡将不分昼夜 ...

  6. 数据中心发电机组的选择及控制

    随着互联网技术的广泛应用,世界各地兴建了越来越多的数据中心.数据中心不仅要提供大量的数据信息存储空间,更要保证其中存储数据的安全.在保证数据中心安全的各环节中,电源供应是非常重要的一环.除了通常有两路 ...

  7. 数据中心监控管理系统设计(之一)

    数据中心监控管理系统设计 3.1引言 数据中心经历了四个功能阶段的发展和演进,从早期的"数据存储中心"阶段,经过"数据处理中心"和"数据应用中心&qu ...

  8. 高程数据处理_珠峰长高了吗?新高程怎么算出来的?揭秘

    珠穆朗玛峰是世界上海拔最高的山峰,被称为世界第三极,是亚洲的水塔.12月8日,珠穆朗玛峰最新高程公布,为8848.86米.珠峰长高了吗?怎么测算出来的?和以往的测量相比,"新"在哪 ...

  9. 苹果将投资10亿美元扩容位于美国雷诺的数据中心

    援引当地媒体Reno-Gazette Journal报道,位于美国内华达州西部城雷诺(Reno)的苹果数据中心将大规模扩张.近日雷诺市政厅对该苹果扩容项目进行了讨论,该媒体记者 Anjeanette ...

最新文章

  1. 如何才能知道TCP网络端口是否打开?
  2. flowable工作流 流程变量_Activiti工作流的应用示例
  3. Java笔记(十九) 反射
  4. 逻辑表达式——黑纸白纸
  5. 量子计算机怎么编程,量子计算机编程(一)——QPU编程
  6. 人工智能ai 学习_人工智能中强化学习的要点
  7. 大数据技术之 Kafka (第 1 章 Kafka 概述)
  8. linux导出表数据dmp,导出表的部分数据到dmp文件中
  9. 我的世界基岩版种子和java版种子_我的世界:对萌新最友好的种子,基岩版通用,对老玩家也很适合!...
  10. “差不多先生”姚劲波和不再神奇的58同城
  11. Strom 消息可靠性保障机制和Ack原理
  12. ansys轴对称模型之二维模型
  13. SQLException: Invalid value for getInt() - ‘XXX‘
  14. 爱因斯坦广义相对论: 质量/能量物体是如何影响时空的?
  15. 人工智能项目的伦理审查
  16. 排球分组循环交叉编排_第九届“理工杯”学生排球比赛正式拉开帷幕
  17. python实践3——利用爬虫爬取“广州各大行业微信群二维码信息”及存入数据到MySQL数据库
  18. 开源开放 | 中药说明书实体识别数据集TCM-NER
  19. 视频教程-Angular+Django前后端分离实战项目开发教程-AngularJS
  20. 接了个私活,甲方竟然让我教他写代码!

热门文章

  1. 小学计算机年考试题目,(完整版)小学四年级信息技术期末考试试题及答案
  2. 奥特能平台加持 别克Electra E5三电技术解析
  3. pandas中的read_csv参数详解
  4. 阿里云 OSS 静态网站托管
  5. 【面经】广联达-C++软件开发工程师
  6. 毕业论文计算机专业3500字,关于计算机专业论文开题报告格式
  7. 大数据中的反欺诈,平台与羊毛党的攻防恶战
  8. 实验报告总结_精选化学教学工作总结范文(精选5篇)
  9. system pause 的头文件
  10. Windows 实验总结