Spark core

作业提交流程

1. client运行脚本提交命令。
2. SparkSubmit实例化SparkSubmitArguments进行参数解析。
3. SparkSubmit实例化YarnClusterApplication并创建客户端。
4. 在客户端中封装YarnClient信息,包含提交参数和命令。
5. 将信息提交给RM。
6. RM向NM的yarnRMClient发送消息,启动APPmaster。
7. NM分配资源生成APPmaster,并启动Driver线程。
8. 执行代码,初始化SparkContext。
9. APPmaster向RM申请注册,要求分配资源。
10. RM向APPmaster返回可使用资源列表。
11. APPmaster实例化工作线程池launcherPool。
12. 工作线程池launcherPool中实例化ExecutorRunnalbe。
13. ExecutorRunnalbe向NM2的RPC申请启动Executor。
14. Driver的RPC和Executor的RPC自启动。
15. Executor的RPC向Driver的RPC通信,申请注册Executor。
16. Driver的RPC向Executor的RPC通信,返回注册成功。
17. NM2启动CoarseGrainedExecutorBackend进程作为Executor的守护进程,并实例化Executor,实例化线程池。
18. Executor的RPC向Driver的RPC通信,Executor启动成功,可以运行任务。
19. Driver进行任务切分。
20. Driver进行任务分配。

宽窄依赖的区别,哪些算子有shuffle

窄依赖:父RDD的一个分区最多只能被一个子RDD使用。

宽依赖:父RDD的一个分区可以被多个子RDD使用。

引起shuffle的算子。

算子名称 转换 功能
sortBy 使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。
sortByKey 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
reduceByKey 对K-V类型的RDD按照Key对value进行聚合。
join (K,V).join((K,W)) => (K,(K,W)) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
leftOuterJoin 左表不管有没有与右表相同的key,左表都全部显示
rightOuterJoin 右表不管有没有与左表相同的key,左表都全部显示
fullOuterJoin 返回左右数据集的全部数据,左右有一边不存在的数据以None填充
distinct 对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。
cogroup (K,V).cogroup((K,W)) => (K,(Iterable,Iterable)) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
repartition 根据分区数,重新通过网络随机洗牌所有数据,百分百进行shuffle。
groupByKey 对K-V类型的RDD按照Key对value分组。

MR shuffle 和Spark shuffle 原理异同

  • 从整体功能上看

    两者没有太大区别,都是将Map端数据重新分发至Reduce端的过程。

  • 从数据流来看

    两者有较大区别,MR是先分区排序,之后在磁盘上归并排序,reduce拉取后再进行归并排序。Spark之前默认的是hash-based,使用HashMap进行合并,但是不会提前排序,现在Spark使用sort-based进行shuffle,也会进行排序。

  • 从整体流程上看

    两者有较大区别,MR将处理流程明显的划分出map、spill、merge、shuffle、sort、reduce等过程,各个阶段各司其职,通过过程式编程实现每个阶段的功能。Spark并没有明确的阶段,只是通过不同的stage和一系列transformation实现各个阶段的功能。

Spark和MapReduce的异同

  • 资源依赖的不同

    Spark将中间数据放在内存中,避免数据落盘,数据计算效率高;MR将中间数据落在磁盘上,数据计算效率低,但是没有OOM风险。

  • 容错程度的不同

    Spark使用RDD来实现高容错,某一部分RDD出现问题,可以通过血缘回溯这部分数据;MR某个阶段发生问题会导致整个作业失败,需要重跑。

  • 生态复杂程度的不同

    Spark既是计算引擎,也是一个包含流计算、SQL、图计算、算法包的框架,而MR只有map-reduce两个算子。

Spark和Flink的异同

  • 数据模型不同

    Flink采用dataflow编程模式,配合事件组成数据模型,这样的模型在目前在时延和吞吐上是最优的,Spark使用RDD组成数据模型,RDD丰富的API降低了批处理的难度,但是SparkStreaming的DStream API很少,无法支撑复杂的流计算。

  • 运行架构不同

    Spark将DAG划分为不同的stage,DAG节点间有血缘关系,运行中一个stage完成后才会销毁;SparkStreaming则是对持续流划分为不同批次,定时执行不同批次的数据运算。(Spark将流当做连续的微小的批处理)

    Flink使用Dataflow编程模式有统一的runtime,在Flink的流处理中,一个事件在一个节点处理完后输出就可以发到下一个节点立即处理,这样不会引入额外的延迟。

  • 时延和吞吐不同

    Flink是亚秒级别时延,SparkStreaming是秒级别时延。Flink吞吐量是SparkStreaming的1.X倍。

  • 反压不同

    Flink下游阻塞后,会将压力逐级上传直到数据源。Spark则是设置反压的吞吐量,到达阈值后开始限流。

  • 一致性语义支持不同

    Flink提供exactly-once语义,Spark提供at-least-once语义。

  • 对状态的支持不同

    Flink提供完整的状态管理,基于状态提供局部恢复快照。Spark通过RDD的API进行状态管理,因为Spark面向大数据集,快照不宜太频繁。

Spark调优讲解并举例

  • 减少Spark中运行的数据库连接数。

    使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。

  • 优化TopN的获取方式

    (1)取出所有的key进行迭代,每取出一个key利用排序算子进行排序。

    (2)对key值建立自定义分区器,使不同的key进入到不同的分区,对每个分区进行排序。

  • 降低map数

    对map进行coalesce操作,降低map数量。

  • 调高reduce缓存

    spark.reduce.maxSizeInFilght 参数控制reduceTask一次可以拉取多少数据量。

    spark.shuffle.file.buffer 参数控制shuffle文件输出流的内存缓冲区大小。

Spark数据倾斜讲解并举例

数据倾斜表现

  • OOM
  • shuffle报错
  • 单个Executor执行时间过长
  • 任务突然失败

数据倾斜原因

  • 根:key值分布不均匀

  • 内因:建表规范缺失

    两张表做join时,关联字段存在对同一属性的不同散列声明,会导致作业直接卡死。

  • 外因:业务数据激增

    因为活动等因素导致某几个key值对应数据量激增从而导致数据倾斜。

定位数据倾斜代码

某个task执行特别慢的情况

首先明确数据倾斜发生在哪个stage中,可以通过Spark web UI来看当前运行到了第几个stage,分析当前执行的stage有几个task,每个task运行时间和分配数据量,来判断是不是这个stage出现的问题。

之后通过阅读作业代码判断是哪个宽依赖算子引起的问题,根据数据来定位引起数据倾斜的算子。

某个task莫名其妙内存溢出的情况

直接阅读日志,日志中会清晰的标明哪一行引起的内存溢出。

对key值采样分析分布情况

先试用pairs采样10%的样本数据,之后使用countByKey算子统计出每个key出现的次数。

导致数据倾斜的原因是Hive表

Hive预先对数据按照key进行聚合或其他操作,spark直接使用梳理好的数据。

但是没有从根本解决数据倾斜的问题,只能适用于短期解决问题的场景。

过滤少数导致倾斜的key

对于少数几个数据量特别大的key,且对计算结果不是特别重要的话,可以过滤掉。

这种方式适用场景较少。

提升shuffle并行度

给shuffle算子传入一个参数,比如reduceByKey(1000),该参数会增加该算子的并行度。

该方法不能解决数据倾斜,只能缓解数据倾斜带来的影响。

两阶段聚合(局部聚合+全局聚合)

对聚合类算子和分组聚合时,比较适用。

首先进行局部聚合,对key值打上一个固定范围的随机数前缀,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1),之后进行聚合。

之后将随机数前缀去掉,在此进行全局聚合。

通常可以从根本上解决数据倾斜的问题,但是只能适用于聚合类业务。

将reduce join 转换为 map join

使用broadcast变量和map类算子进行join操作,避免shuffle类操作,从根本上避免数据倾斜的发生和出现。将小表的数据通过collect算子拉取到driver端的内存中,然后创建一个Broadcast变量,广播给Executor节点。接着对另外一个RDD执行map类算子,使用广播变量获取较小的RDD全量数据,与当前RDD每一条数据按照key进行比对,如果连接的key相同,那就把这两个RDD的数据连接起来。

只适用于一个小表和一个大表进行join操作时引起的数据倾斜。

采样倾斜key并拆分join

首先采样提取出几个数据量较大的key,给这几个可以打上随机数前缀,这样拆分数量大的key值,两边进行正常join,最后union即可。

适用于两个大表进行join操作,其中一方有数据倾斜,另一方数据分布较为均匀。

使用随机前缀和扩容RDD进行join

将有大量数据倾斜的key的RDD,打上1-100以内的随机数前缀。将另一个key分布较为均匀的RDD膨胀100倍,两个处理后的RDD进行join即可。

以空间解决数据倾斜。

Spark原理与调优相关推荐

  1. spark原理参数调优

    一.spark原理 参考: Hive on Spark调优_窗外的屋檐-CSDN博客_spark.executor.instancesSpark资源参数调优参数_TURING.DT-CSDN博客_sp ...

  2. spark 资源参数调优

    资源参数调优 了解完了Spark作业运行的基本原理之后,对资源相关的参数就容易理解了.所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使 ...

  3. Spark的性能调优

    下面这些关于Spark的性能调优项,有的是来自官方的,有的是来自别的的工程师,有的则是我自己总结的. 基本概念和原则 首先,要搞清楚Spark的几个基本概念和原则,否则系统的性能调优无从谈起: 每一台 ...

  4. Spark系列之Spark的资源调优

    title: Spark系列 第十一章 Spark的资源调优 11.1 概述 ​ 在开发完Spark作业之后,就该为作业配置合适的资源了.Spark的资源参数,基本都可以在sparksubmit命令中 ...

  5. Spark开发性能调优

    Spark开发性能调优 标签(空格分隔): Spark –Write By Vin 1. 分配资源调优 Spark性能调优的王道就是分配资源,即增加和分配更多的资源对性能速度的提升是显而易见的,基本上 ...

  6. 全面的GC原理及调优

    本文介绍 GC 基础原理和理论,GC 调优方法思路和方法,基于 Hotspot jdk1.8,学习之后你将了解如何对生产系统出现的 GC 问题进行排查解决. 图片来自 Pexels 内容主要如下: G ...

  7. 第二十一期:老大难的GC原理及调优,这全说清楚了

    本文介绍 GC 基础原理和理论,GC 调优方法思路和方法,基于 Hotspot jdk1.8,学习之后你将了解如何对生产系统出现的 GC 问题进行排查解决. 本文介绍 GC 基础原理和理论,GC 调优 ...

  8. Apache Spark Jobs 性能调优(二)

    Apache Spark Jobs 性能调优(二) 调试资源分配 调试并发 压缩你的数据结构 数据格式 在这篇文章中,首先完成在 Part I 中提到的一些东西.作者将尽量覆盖到影响 Spark 程序 ...

  9. Apache Spark Jobs 性能调优(一)

    Apache Spark Jobs 性能调优(一) Spark 是如何执行程序的 选择正确的 Operator 什么时候不发生 Shuffle 什么情况下 Shuffle 越多越好 二次排序 结论 当 ...

最新文章

  1. ​【安全牛学习笔记】操作系统识别
  2. linux java mysql 乱码_Linux下MySQL的字符集乱码问题总结
  3. 53多项式08——多元多项式、齐次多项式和对称多项式
  4. 30岁的职场危机,人生下半场的困局
  5. linux时间相关结构体和函数整理
  6. Exception:No identifier specified for entity
  7. ENVI5.3下载与安装
  8. Axure8.0的注册码
  9. 微信公众号关注自动回复内容php ci,如何实现微信公众号“一键关注”功能?运营神器...
  10. RabbitMQ系列8 TTL 死信队列 延迟队列
  11. 史上最详细金卡介绍以及金卡制作教程(附风暴数码CID转换码链接)
  12. python回车和换行的区别_换行\ n与回车\ r的区别
  13. IT4IT 标准助力 IT 经理控制乱局
  14. html隐藏汉堡按钮,12种汉堡包图标按钮变形动画特效
  15. 综合布线系统带宽与计算机网络带宽计算题,计算机网络思考与练习题.doc
  16. 地中海文明卢浮宫特展全攻略(国家博物馆),通俗版
  17. QPM-PHP多进程开发-Supervisor配置参考
  18. android 官方bootloader,安卓系统bootloader模式是什么?如何进入bootloader模式
  19. 空间面板回归模型(stata操作)
  20. ssm+vue+elementUI 基于微信小程序的游戏美术外包管理信息系统-#毕业设计

热门文章

  1. Dobble的学习视频地址
  2. 【闲言碎语】学习 文本编辑器vim及其插件、ranger、C语言、WSL配置、X11等等
  3. mysql数据迁移工具_MySQL数据迁移工具的设计与实现
  4. ElementUI解决循环出多个Popover点击其中一个则其他关闭
  5. AIE红色/蓝色/绿色荧光聚苯乙烯微球/比色-荧光双信号ALE荧光微球的相关研究
  6. Kubernetes是解药还是毒药?
  7. 如何搭建Java开发环境?
  8. 关于Sublime text 2中Emmet的安装
  9. 业务逻辑漏洞--注册-登录-改密码页面总结
  10. 普及json格式相关问题