Hash-based Shuffle内幕彻底解密
视频学习来源:DT-大数据梦工厂 IMF传奇行动视频
本期内容:
1 Hash Shuffle彻底解密
2 Shuffle Pluggable解密
3 Sorted Shuffle解密
4 Shuffle性能优化
Shuffle:基本形态是将具有共同特征的一类数据洗牌后放在同一个节点上进行计算
一、Shuffle到底是什么?
        翻译成中文意思为洗牌在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。Spark作为MapReduce框架的一种实现,自然也实现了shuffle的逻辑,本文就深入研究Spark的shuffle是如何实现的,有什么优缺点,与Hadoop MapReduce的shuffle有什么不同。
二、shuffle可能面临的问题(运行task的时候才会产生shuffle,shuffle已经融化在算子中了)
    1、数据量特别大;
    2、具体讲数据怎么分类,即如何Partition、Hash、Sort、钨丝计划;
    3、负载均衡(数据倾斜);
    4、网络传输效率,需要在压缩和解压之间进行权衡;
5、序列化和反序列化;
说明:具体进行计算的时候尽最大可能使数据具备Process  Locality的特性,退而求其次是增加数据分片,减少每个task处理的数据量,一般 cache的时候有风险;有oom的风险;别的进程占用的风险;读磁盘IO风险也较大,某种情况下不如直接在内存中计算,如果采用shuffle,必须要等待父Stage计算完成后,下一个stage才能进行计算,如果其中有一个stage故障,整个job就不能正常执行了。
三、Hash shuffle(无需排序,数据规模较小的情况)
1、key不能是Array;
2、使用hash 就无需进行排序,此时从理论上讲就节省了Hadoop MapReduce中进行 shuffle需要排序时候的时间浪费,因为实际生产环境有大量的不需要排序的shuffle类型;
思考:不需要排序的Hash Shuffle是否一定比需要排序的Sorted Shuffle速度更快?
解答:不一定!如果数据规模比较小的情况下,Hash Shuffle会比Sorted Shuffle速度快(很多)!如果数据量大,此时Sorted Shuffle一般会比Hash Shuffle快(很多)!
 /*** Choose a partitioner to use for a cogroup-like operation between a number of RDDs.*选择一个分区使用一个群就像一批RDDS之间操作。* If any of the RDDs already has a partitioner, choose that one.** Otherwise, we use a default HashPartitioner. For the number of partitions, if* spark.default.parallelism is set, then we'll use the value from SparkContext* defaultParallelism, otherwise we'll use the max number of upstream partitions.** Unless spark.default.parallelism is set, the number of partitions will be the* same as the number of partitions in the largest upstream RDD, as this should* be least likely to cause out-of-memory errors.** We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.*/def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reversefor (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {return r.partitioner.get}if (rdd.context.conf.contains("spark.default.parallelism")) {new HashPartitioner(rdd.context.defaultParallelism)} else {new HashPartitioner(bySize.head.partitions.size)}}
}
3、每个ShuffleMapTask会根据key的哈希值计算出当前的key需要写入的Partition,然后把决定后的结果写入当前单独的文件,此时会导致每个Task产生R(指下一个Stage的并行度)个文件,如果当前的Stage中有M个ShuffleMapTask,则会M*R个文件!!!
注意:Shuffle操作绝大多数情况下都要通过网络,如果Mapper和Reducer在 同一台机器上,此时值需要本地磁盘即可。
Hash Shuffle的量大死穴:
    1、Shuffle前会产生海量的小文件与磁盘之上,此时会产生大量耗抵消的IO操作;
    2、内存不共用!由于内存中需要保存海量的文件操作句柄和临时缓存信息,如果数据处理规模比较庞大的话,内存不可承受,出现OOM等问题。
 上图中将数据分成六种类型,产生六种小文件,因为并行度为2,所以一共产生12个小文件,当并行度和task特别多的时候,磁盘会有很大的压力。
        为了改善上述问题(同时打开过多的文件导致Writer Handle内存使用过大以及产生过度文件导致大量的随机读写带来的效率极为低下的磁盘IO),Spark后来推出了Consalidate机制,来把小文件合并,此时Shuffle文件产生的数量为cores*R,对于ShuffleMapTask的数量明显多于同时可用的并行Cores的数量的情况下,Shuffle产生的文件会大幅度减少,减少OOM。
    为此Spark推出了Shuffle Pluggable开放框架,方便系统升级的时候定制Shuffle功能模块,也方便改造人员根据实际的业务场景来开放具体最佳的Shuffle模块;核心接口ShuffleManager,具体默认实现由HashShuffleManager、SortShuffleManager等,Spark1.6.0中具体的配置如下:
    val shortShuffleMgrNames = Map("hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager","sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager","tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)val memoryManager: MemoryManager =if (useLegacyMemoryManager) {new StaticMemoryManager(conf, numUsableCores)} else {UnifiedMemoryManager(conf, numUsableCores)}
 上述代码我们可以看到基于Hash的shuffle其实是org.apache.spark.shuffle.hash.HashShuffleManager类,而sort的shuffle是org.apache.spark.shuffle.sort.SortShuffleManager。如果我们没有配置spark.shuffle.manager,则默认选用hash,这个大小写没有关系。如果用户配置的spark.shuffle.manager在shortShuffleMgrNames中没有查到,则选用用户自定义的Shuffle。用户自定义的Shuffle必须继承ShuffleManager类,重写里面的一些方法。
    采用consalidate的方式,每个Core会产生一个文件,那么总共文件的数量就是Cores的数量*Reducer的数量,就Core本身而言,第二个ShuffleMapTask会将结果追加第一个ShuffleMapTask上;
    Sort在具体实现的时候会进行归并排序,并加上自己的索引,此时产生的文件的数量会大大降低,但也会消耗性能,此时它具有两个文件,一个是索引文件,一个是ShuffleMapTask具体的文件,Reducer访问的时候,首先访问索引文件,然后通过index定位到具体文件的内容。
王家林老师是大数据技术集大成者,中国Spark第一人:

DT大数据梦工厂

新浪微博:www.weibo.com/ilovepains/

微信公众号:DT_Spark

博客:http://.blog.sina.com.cn/ilovepains

TEL:18610086859

Email:18610086859@vip.126.com

Hash-based Shuffle内幕彻底解密相关推荐

  1. spark Hash Shuffle内幕彻底解密

    本博文的主要内容: 1.Hash Shuffle彻底解密 2.Shuffle Pluggable解密 3.Sorted Shuffle解密 4.Shuffle性能优化 一:到底什么是Shuffle? ...

  2. spark shuffle 内幕彻底解密

    一:到底什么是Shuffle? Shuffle中文翻译为"洗牌",需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算. 二:Shuffle可 ...

  3. 阿里云Spark Shuffle的优化

    转自:大数据技术与架构 本次分享者:辰石,来自阿里巴巴计算平台事业部EMR团队技术专家,目前从事大数据存储以及Spark相关方面的工作. Spark Shuffle介绍 Smart Shuffle设计 ...

  4. spark union 会引起shuffle吗_Spark高性能Job

    知乎导入md文件会失真,无语,将就着看吧!原博客地址为:Spark高性能Job 1.1 Job 遇到一个action算子就会提交一个job,常见的transformation算子以及Action算子: ...

  5. Spark 系列——Spark的Shuffle原理

    目录 一.基本介绍 1.1 Lineage 1.2 窄依赖 1.3 宽依赖 二.Spark Shuffle的原理 2.1 ShuffleManager 2.2 ShuffleWriter 2.2.1 ...

  6. Spark shuffle机制演进史及原理说明(sort-based/hash-based/bypassShuffleManager)

    spark shuffle 演进的历史 Spark 0.8及以前 Hash Based Shuffle Spark 0.8.1 为Hash Based Shuffle引入File Consolidat ...

  7. 2018年新春报喜!热烈祝贺王家林大咖大数据经典传奇著作《SPARK大数据商业实战三部曲》 畅销书籍 清华大学出版社发行上市!

    2018年新春报喜!热烈祝贺王家林大咖大数据经典传奇著作<SPARK大数据商业实战三部曲>畅销书籍 清华大学出版社发行上市! 本书基于Spark 2.2.0新版本,以Spark商业案例实战 ...

  8. 群内2018_4月讨论整理2

    归档至github 说明 以下内容来自群中出现的问题,大家讨论的结果 Q群:432600958 微信群:加微信w3aboutyun,附上about云铁粉 部分内容整理时,已经注明出处,但很多内容,较为 ...

  9. sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)

    本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...

最新文章

  1. 正确的 Git 提交记录和分支模型
  2. Paxos、ZAB、RAFT协议
  3. 标题与文字的组合[摘]
  4. linux tcp在传输数据的时候断网了_选择最合适的协议 让传输数据更灵敏
  5. 【送书】2021年哪些好书值得读(小姐姐配音)
  6. OSChina 周四乱弹 —— 要成立复仇者联盟了,来报名
  7. 年仅26岁!这位双一流大学的特任教授,攻克世界数学难题
  8. 安卓手机浏览器_chrome浏览器插件安卓下载-chrome apk手机版下载v4.8.2安卓版
  9. Gridview模板中提供的删除功能
  10. linux tcp文件分包_畅谈linux下TCP(下)
  11. 几百万消息在消息队列里积压了几个小时!完了。。。
  12. Assembly.CreateInstance()与Activator.CreateInstanc
  13. 3分钟全面了解元数据和数据元
  14. egret引擎生命周期相关
  15. Python 集和篇
  16. 邮件服务器正常工作亮几个灯,光纤猫正常亮几个灯 光纤猫的灯都代表意思是什么【详解】...
  17. mysql错误1548 Cannot load from mysql.proc的最终解决方法
  18. centos发现网络连不上了,重启网络服务报错“systemctl status network.service” and “journalctl -xe” for details. [失败]
  19. div 设置a4大小_网页打印时设置A4大小
  20. 腾讯CSIG面试题目总结

热门文章

  1. 数据中台已成下一风口,它会颠覆数据工程师的工作吗?
  2. 和ZLTT一起学pwn 2.ret2text
  3. PDF转图片乱码问题解决
  4. discuz密码找回:忘记UCENTER创始人密码
  5. CSDN个人博客管理
  6. sap成本流怎么看_[原创]SAP方丈-写给新手的SAP成本核算流程
  7. 人工智能助力未来教育
  8. 一篇文章学会二进制的运算以及原码、反码、补码
  9. 2020YKB西医综合全程班资料
  10. 统计单词频率 java_java 统计一个(英文)文本中频率最高的10个单词