SPARK SHUFFLE中 ShuffleId BlockManagerId 以及 与ESS(External Shuffle Server)交互
背景
本文基于SPARK 3.2.1
用来更好的理解spark shuffle中的点点滴滴
分析
- 我们直接从SortShuffleManager着手,因为这是个shuffle的纽带:
override def registerShuffle[K, V, C](shuffleId: Int,dependency: ShuffleDependency[K, V, C]): ShuffleHandle {
...override def getReader[K, C](handle: ShuffleHandle,startMapIndex: Int,endMapIndex: Int,startPartition: Int,endPartition: Int,context: TaskContext,metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {...override def getWriter[K, V](handle: ShuffleHandle,mapId: Long,context: TaskContext,metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
...
跟shuffle紧密关联的是这三个方法,
- 其中
registerShuffle
方法是在ShuffleDependency实例构建出来的时候机会被调用:
val shuffleId: Int = _rdd.context.newShuffleId()val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, this)
其中shuffleId 是全局单调递增的,(其实这是为了一次shuffle的标示,是下游task获取上游task数据的label)
getWriter
方法是ShuffleMapTask RunTask中被调用的:
ShuffleMapTask.runTask||\/
dep.shuffleWriterProcessor.write||\/
writer = manager.getWriter[Any, Any](handle: ShuffleHandle,
这里会根据已经注册好的shuffleHandle来获取对应的writer
getReader
方法下游task读取shuffle数的时候被调用的:
ShuffledRowRDD.compure||\/
SparkEnv.get.shuffleManager.getReader(dependency.shuffleHandle,..)||\/
val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
这里会根据已经注册好的shuffleHandle来获取对应的reader,这里会调用getMapSizesByExecutorId最终根据shuffleId来获取(Array[MapStatus], Array[MergeStatus])
这里包括了所有的MapStatus和MergeStatus,这样reader就能根据策略来向不同的blockManager发送shuffle fetch请求,以保证远程Executor负载均衡。
具体的写操作和读操作细节,读者可以去自己细看代码,我们这里只说一些总体的数据流思路。
- 再来看BlockManager,BlockManager是每个Executor都会有的,在SparkEnv创建的时候就会创建,用来管理数据块的存储的,
其中shuffle 数据的读取和写入都是和他有关联的。
分析一下BlockManager的跟shuffle有关的重要方法:
private[spark] val blockStoreClient = externalBlockStoreClient.getOrElse(blockTransferService)def initialize(appId: String): Unit = {...val id =BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)blockManagerId = if (idFromMaster != null) idFromMaster else idshuffleServerId = if (externalShuffleServiceEnabled) {logInfo(s"external shuffle service port = $externalShuffleServicePort")BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)} else {blockManagerId}if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {registerWithExternalShuffleServer()...}...}
blockStoreClient
变量,用来读取其他Executor的Blocks文件的,也就是shuffle数据真正去读数的组件,
这在创建BlockManager的时候,如果开启ESS的话就会创建的:val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {...
如果没开启ESS的话,就用自带的BlockTransferService。
shuffleServerId
也就是blockManagerId,会在Executor创建的时候初始化,
如果开启ESS,端口就是spark.shuffle.service.port
,默认7337,否则就是spark.blockManager.port
,默认是随机端口:
Executor中env.blockManager.initialize(conf.getAppId)||\/
registerWithExternalShuffleServer()
registerWithExternalShuffleServer
这个方法是用来注册ESS的(如果开启ESS的情况下):
val shuffleConfig = new ExecutorShuffleInfo(diskBlockManager.localDirsString,diskBlockManager.subDirsPerLocalDir,shuffleManagerMeta)// Synchronous and will throw an exception if we cannot connect.blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
注册的信息包括block本地磁盘的位置,以及shuffleManagerMeta信息,注意如果这里开启push-based shuffle Server的话,就返回的是merge的路径,
否则返回sortShuffleManager的类名。
至于具体向哪个shuffle server实例注册,就是从shuffleServerId中获取的。 之后向对应的ESS发送RegisterExecutor消息
再来看YarnShuffleService
对RegisterExecutor消息的回应(实际上是ExternalBlockHandler
来处理的):
else if (msgObj instanceof RegisterExecutor) {final Timer.Context responseDelayContext =metrics.registerExecutorRequestLatencyMillis.time();try {RegisterExecutor msg = (RegisterExecutor) msgObj;checkAuth(client, msg.appId);blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);mergeManager.registerExecutor(msg.appId, msg.executorInfo);callback.onSuccess(ByteBuffer.wrap(new byte[0]));} finally {responseDelayContext.stop();}
其中blockManager
是把逻辑shuffle block转换为实际物理存储的组件,这里的注册就是blockManager对物理文件(LocalDirs等)做映射关系
mergeManager
就是push based shuffle Manager进行文件merge的组件,也就就是把merge的路径以及物理文件(LocalDirs等给注册上去,便于后续获取shuffle文件。
注意:
blockStoreClient
是Executor 存在的的时候才会有的组件,因为是去拉取shuffle数据;
而通过registerWithExternalShuffleServer注册的ESS组件是,可以在Executor不存在的时候提供服务的。这两者是有区别的。
至此SPARK SHUFFLE简单的流程就是这样了。
SPARK SHUFFLE中 ShuffleId BlockManagerId 以及 与ESS(External Shuffle Server)交互相关推荐
- 第31课:彻底解密Spark 2.1.X中Shuffle中内存管理源码解密:StaticMemory和UnifiedMemory
第31课:彻底解密Spark 2.1.X中Shuffle中内存管理源码解密:StaticMemory和UnifiedMemory 大数据的事情只需关注2个平台:spark,tensorFlow(图像, ...
- 第36课:kaishi 彻底解密Spark 2.1.X中Sort Shuffle中Reducer端源码内幕
第36课:kaishi 彻底解密Spark 2.1.X中Sort Shuffle中Reducer端源码内幕 本文根据家林大神系列课程编写 http://weibo.com/ilovepains 本课讲 ...
- 第34课:彻底解密Spark 2.1.X中Shuffle 中SortShuffleWriter排序源码内幕解密
第34课:彻底解密Spark 2.1.X中Shuffle 中SortShuffleWriter排序源码内幕解密 本文根据家林大神系列课程编写 http://weibo.com/ilovepains S ...
- 第33课:彻底解密Spark 2.1.X中Shuffle 中Mapper端的源码实现
第33课:彻底解密Spark 2.1.X中Shuffle 中Mapper端的源码实现 本文根据家林大神系列课程编写 http://weibo.com/ilovepains Spark是MapReduc ...
- spark代码中添加logger_Spark RDD中Runtime流程解析
一.Runtime架构图 (1)从Spark Runtime的角度讲,包括五大核心对象:Master.Worker.Executor.Driver.CoarseGrainedExecutorBacke ...
- 解决数据倾斜一:RDD执行reduceByKey或则Spark SQL中使用group by语句导致的数据倾斜
一:概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的 ...
- spark ml中一个比较通用的transformer
spark ml中有许多好用的transformer,很方便用来做特征的处理,比如Tokenizer, StopWordsRemover等,具体可参看文档:http://spark.apache.or ...
- Spark Streaming中的操作函数分析
参考文章:http://blog.csdn.net/dabokele/article/details/52602412 根据Spark官方文档中的描述,在Spark Streaming应用中,一个DS ...
- Spark SQL中出现 CROSS JOIN 问题解决
Spark SQL中出现 CROSS JOIN 问题解决 参考文章: (1)Spark SQL中出现 CROSS JOIN 问题解决 (2)https://www.cnblogs.com/yjd_hy ...
最新文章
- “平头哥”半导体公司
- Windows Server 2012 r2 显示计算机图标
- 人参中第一次膜你退货
- ASP.NET基础教程-DataGrid表格控件-模板列的使用
- html5水调歌头代码,张惠言的五首《水调歌头》
- java集合体检套餐管理系统_体检套餐管理系统
- oracle频,Oracle动作频频 Java或浴火重生
- transformer 中的注意力机制和胶囊网络中的动态路由:它们在本质上或许具有相似性
- KindEditor在eclipse里的配置方法
- 使用SpringMVC搭建第一个项目
- 拼多多发单软件使用教程永久免费
- c语言吃豆豆游戏,高手帮我改下我的吃豆豆游戏吧
- NAACL 2022 | TAMT:通过下游任务无关掩码训练搜索可迁移的BERT子网络
- 多媒体计算机软件按功能分为,多媒体软件可分为
- 100个精彩的开源游戏
- android studio代码格式化设置,Android studio kotlin代码格式化操作
- 【审稿意见】科研菜鸟如何攥写审稿意见?万能模板!!!
- 概率论与数理统计——多方法解决-双样本方差的F检验-Excel/SPSS
- Nature Neuroscience综述:网络神经系统中的动态表征
- linux u盘空间越小 写入越慢,解决U盘容量变小问题
热门文章
- oracle mysql查询速度慢_oracle 根据时间范围查询缓慢问题排查解决
- 为什么祖传代码会被称为「屎山」?
- 建站宝超级站群版 v1.4
- mysql rollback作用_innodb_rollback_on_timeout的作用
- 北京理工大学计算机保研面试,保研之旅2:北京理工大学雷达所面试
- 吉林大学计算机保研到,吉林大学 保研到清北
- Markdown输入数学公式
- 深度式睡眠潜入虚拟世界_潜入swiftui的惊人世界
- 进服务器网站报错应用程序,IIS:应用程序池中的服务器错误、超时时间已到
- 苹果手机测试网络速度的软件,App Store 上的“网速测试大师-测网速首选”