背景

本文基于SPARK 3.2.1
用来更好的理解spark shuffle中的点点滴滴

分析

  1. 我们直接从SortShuffleManager着手,因为这是个shuffle的纽带:
override def registerShuffle[K, V, C](shuffleId: Int,dependency: ShuffleDependency[K, V, C]): ShuffleHandle {
...override def getReader[K, C](handle: ShuffleHandle,startMapIndex: Int,endMapIndex: Int,startPartition: Int,endPartition: Int,context: TaskContext,metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {...override def getWriter[K, V](handle: ShuffleHandle,mapId: Long,context: TaskContext,metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
...

跟shuffle紧密关联的是这三个方法,

  • 其中registerShuffle方法是在ShuffleDependency实例构建出来的时候机会被调用:
 val shuffleId: Int = _rdd.context.newShuffleId()val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, this)

其中shuffleId 是全局单调递增的,(其实这是为了一次shuffle的标示,是下游task获取上游task数据的label)

  • getWriter方法是ShuffleMapTask RunTask中被调用的:
ShuffleMapTask.runTask||\/
dep.shuffleWriterProcessor.write||\/
writer = manager.getWriter[Any, Any](handle: ShuffleHandle,

这里会根据已经注册好的shuffleHandle来获取对应的writer

  • getReader方法下游task读取shuffle数的时候被调用的:
ShuffledRowRDD.compure||\/
SparkEnv.get.shuffleManager.getReader(dependency.shuffleHandle,..)||\/
val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)

这里会根据已经注册好的shuffleHandle来获取对应的reader,这里会调用getMapSizesByExecutorId最终根据shuffleId来获取(Array[MapStatus], Array[MergeStatus])
这里包括了所有的MapStatus和MergeStatus,这样reader就能根据策略来向不同的blockManager发送shuffle fetch请求,以保证远程Executor负载均衡。
具体的写操作和读操作细节,读者可以去自己细看代码,我们这里只说一些总体的数据流思路。

  1. 再来看BlockManager,BlockManager是每个Executor都会有的,在SparkEnv创建的时候就会创建,用来管理数据块的存储的,
    其中shuffle 数据的读取和写入都是和他有关联的。
    分析一下BlockManager的跟shuffle有关的重要方法:
  private[spark] val blockStoreClient = externalBlockStoreClient.getOrElse(blockTransferService)def initialize(appId: String): Unit = {...val id =BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)blockManagerId = if (idFromMaster != null) idFromMaster else idshuffleServerId = if (externalShuffleServiceEnabled) {logInfo(s"external shuffle service port = $externalShuffleServicePort")BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)} else {blockManagerId}if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {registerWithExternalShuffleServer()...}...}
  • blockStoreClient 变量,用来读取其他Executor的Blocks文件的,也就是shuffle数据真正去读数的组件,
    这在创建BlockManager的时候,如果开启ESS的话就会创建的:

    val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {...
    

    如果没开启ESS的话,就用自带的BlockTransferService。

  • shuffleServerId也就是blockManagerId,会在Executor创建的时候初始化,
    如果开启ESS,端口就是spark.shuffle.service.port,默认7337,否则就是spark.blockManager.port,默认是随机端口:

Executor中env.blockManager.initialize(conf.getAppId)||\/
registerWithExternalShuffleServer()

registerWithExternalShuffleServer这个方法是用来注册ESS的(如果开启ESS的情况下):

val shuffleConfig = new ExecutorShuffleInfo(diskBlockManager.localDirsString,diskBlockManager.subDirsPerLocalDir,shuffleManagerMeta)// Synchronous and will throw an exception if we cannot connect.blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)

注册的信息包括block本地磁盘的位置,以及shuffleManagerMeta信息,注意如果这里开启push-based shuffle Server的话,就返回的是merge的路径,
否则返回sortShuffleManager的类名。
至于具体向哪个shuffle server实例注册,就是从shuffleServerId中获取的。 之后向对应的ESS发送RegisterExecutor消息
再来看YarnShuffleService对RegisterExecutor消息的回应(实际上是ExternalBlockHandler来处理的):

 else if (msgObj instanceof RegisterExecutor) {final Timer.Context responseDelayContext =metrics.registerExecutorRequestLatencyMillis.time();try {RegisterExecutor msg = (RegisterExecutor) msgObj;checkAuth(client, msg.appId);blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);mergeManager.registerExecutor(msg.appId, msg.executorInfo);callback.onSuccess(ByteBuffer.wrap(new byte[0]));} finally {responseDelayContext.stop();}

其中blockManager是把逻辑shuffle block转换为实际物理存储的组件,这里的注册就是blockManager对物理文件(LocalDirs等)做映射关系
mergeManager就是push based shuffle Manager进行文件merge的组件,也就就是把merge的路径以及物理文件(LocalDirs等给注册上去,便于后续获取shuffle文件。

注意
blockStoreClient是Executor 存在的的时候才会有的组件,因为是去拉取shuffle数据;
而通过registerWithExternalShuffleServer注册的ESS组件是,可以在Executor不存在的时候提供服务的。这两者是有区别的。

至此SPARK SHUFFLE简单的流程就是这样了。

SPARK SHUFFLE中 ShuffleId BlockManagerId 以及 与ESS(External Shuffle Server)交互相关推荐

  1. 第31课:彻底解密Spark 2.1.X中Shuffle中内存管理源码解密:StaticMemory和UnifiedMemory

    第31课:彻底解密Spark 2.1.X中Shuffle中内存管理源码解密:StaticMemory和UnifiedMemory 大数据的事情只需关注2个平台:spark,tensorFlow(图像, ...

  2. 第36课:kaishi 彻底解密Spark 2.1.X中Sort Shuffle中Reducer端源码内幕

    第36课:kaishi 彻底解密Spark 2.1.X中Sort Shuffle中Reducer端源码内幕 本文根据家林大神系列课程编写 http://weibo.com/ilovepains 本课讲 ...

  3. 第34课:彻底解密Spark 2.1.X中Shuffle 中SortShuffleWriter排序源码内幕解密

    第34课:彻底解密Spark 2.1.X中Shuffle 中SortShuffleWriter排序源码内幕解密 本文根据家林大神系列课程编写 http://weibo.com/ilovepains S ...

  4. 第33课:彻底解密Spark 2.1.X中Shuffle 中Mapper端的源码实现

    第33课:彻底解密Spark 2.1.X中Shuffle 中Mapper端的源码实现 本文根据家林大神系列课程编写 http://weibo.com/ilovepains Spark是MapReduc ...

  5. spark代码中添加logger_Spark RDD中Runtime流程解析

    一.Runtime架构图 (1)从Spark Runtime的角度讲,包括五大核心对象:Master.Worker.Executor.Driver.CoarseGrainedExecutorBacke ...

  6. 解决数据倾斜一:RDD执行reduceByKey或则Spark SQL中使用group by语句导致的数据倾斜

    一:概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的 ...

  7. spark ml中一个比较通用的transformer

    spark ml中有许多好用的transformer,很方便用来做特征的处理,比如Tokenizer, StopWordsRemover等,具体可参看文档:http://spark.apache.or ...

  8. Spark Streaming中的操作函数分析

    参考文章:http://blog.csdn.net/dabokele/article/details/52602412 根据Spark官方文档中的描述,在Spark Streaming应用中,一个DS ...

  9. Spark SQL中出现 CROSS JOIN 问题解决

    Spark SQL中出现 CROSS JOIN 问题解决 参考文章: (1)Spark SQL中出现 CROSS JOIN 问题解决 (2)https://www.cnblogs.com/yjd_hy ...

最新文章

  1. “平头哥”半导体公司
  2. Windows Server 2012 r2 显示计算机图标
  3. 人参中第一次膜你退货
  4. ASP.NET基础教程-DataGrid表格控件-模板列的使用
  5. html5水调歌头代码,张惠言的五首《水调歌头》
  6. java集合体检套餐管理系统_体检套餐管理系统
  7. oracle频,Oracle动作频频 Java或浴火重生
  8. transformer 中的注意力机制和胶囊网络中的动态路由:它们在本质上或许具有相似性
  9. KindEditor在eclipse里的配置方法
  10. 使用SpringMVC搭建第一个项目
  11. 拼多多发单软件使用教程永久免费
  12. c语言吃豆豆游戏,高手帮我改下我的吃豆豆游戏吧
  13. NAACL 2022 | TAMT:通过下游任务无关掩码训练搜索可迁移的BERT子网络
  14. 多媒体计算机软件按功能分为,多媒体软件可分为
  15. 100个精彩的开源游戏
  16. android studio代码格式化设置,Android studio kotlin代码格式化操作
  17. 【审稿意见】科研菜鸟如何攥写审稿意见?万能模板!!!
  18. 概率论与数理统计——多方法解决-双样本方差的F检验-Excel/SPSS
  19. Nature Neuroscience综述:网络神经系统中的动态表征
  20. linux u盘空间越小 写入越慢,解决U盘容量变小问题

热门文章

  1. oracle mysql查询速度慢_oracle 根据时间范围查询缓慢问题排查解决
  2. 为什么祖传代码会被称为「屎山」?
  3. 建站宝超级站群版 v1.4
  4. mysql rollback作用_innodb_rollback_on_timeout的作用
  5. 北京理工大学计算机保研面试,保研之旅2:北京理工大学雷达所面试
  6. 吉林大学计算机保研到,吉林大学 保研到清北
  7. Markdown输入数学公式
  8. 深度式睡眠潜入虚拟世界_潜入swiftui的惊人世界
  9. 进服务器网站报错应用程序,IIS:应用程序池中的服务器错误、超时时间已到
  10. 苹果手机测试网络速度的软件,‎App Store 上的“网速测试大师-测网速首选”