供稿 | eBay DSS Team

作者 | 田川晓阳

编辑 | 顾欣怡

本文4490字,预计阅读时间14分钟

导读

新一代数据开发分析平台ZetaeBay DSS(Data Services and Solutions) 团队自主研,主要针对在Spark SQL运行过程中可能存在的性能隐患及Spark执行计划图的缺,提出相应解决方案,旨在降低Spark SQL优化门槛,助力eBay用户解放分析效能,也希望对同业人员有所启发和帮助。

1. 背景介绍

eBay的大部分数据仓库之前一直构建于商业数据仓库系统Teradata之上,从2017年开始,eBay决定完全基于Hadoop平台来构建数据仓库,并开始了恢弘的数据搬迁之旅。然而随着以Spark为主要计算引擎的新开源平台的落户,复杂的数据访问模式和调优分析以及未经结构化的数据展现等问题开始变成全量投产的瓶颈。

为了解决这个困境,eBay DSS(Data Services & Solutions) 部门挺身而出,自主研发了Zeta——新一代数据开发分析平台,旨在为数据工程师、数据分析师和数据科学家提供跨平台且涵盖全生命周期的数据服务。平台支持基于元数据的数据探索、大数据开发、数据测试、数据分析以及终端的数据可视化等功能。目前,平台已累计服务超过2000多名用户,现有1000多名数据工程师、分析师及产品经理高度依赖Zeta平台来完成日常的数据开发处理和分析工作。

2. 痛点分析

随着大量分析场景的涌入和使用的激增,目前Hadoop平台每天要运行上万个Job,处理超过500PB的数据。与此同时,低质量Spark SQL的大量提交导致性能问题屡屡发生,这严重影响了平台性能,导致了资源浪费。然而,让用户自己对Spark SQL进行优化存在很高的技术门槛。对于缺少Spark经验的使用者来说,进行自主排查并解决问题无疑是一项痛苦甚至不可能完成的任务。

为了能够尽早地发现性能隐患、降低SQL优化的门槛、助力eBay用户解放分析效能,Zeta团队基于以往的实践对这些问题进行了一些探索并研发出了独有的解决方案,接下来本文将对此进行详细阐述。

3. 行业分析

3.1 关于性能分析的行业解决方案

1)Doctor Elephant:作为一个开源的Hadoop和Spark性能监控调优工具,主要是通过采集Spark进程运行过程中记录下来的log来进行性能分析并给出建议;但是采集的数据粒度较粗,并且分析过程滞后,无法实时地看到Job的运行情况的分析,而且给出的建议是针对Spark Job运行过程中底层计算框架级别信息的,对于Spark SQL的Job来说不具有针对性,还需要结合Spark内核知识来进行解读。

2)Ganglia等监控工具:这类工具主要通过采集Spark Metrics和log等在运行过程中暴露出来的内部数据来进行可视化展示,以供用户进行性能分析,但是缺点是太过于底层还需要用户自己做大量的统计分析工作,不够直接。

3)Spark UI中提供的SQL Physical Plan执行计划图:最贴近用户实际需求,并且可以实时地看到Job运行的状态,但是存在一些问题,下文会详细描述。

3.2 现有Spark执行计划图的缺陷

Spark UI 中SQL的物理执行计划图是我们观察Job运行状态最直接的一个重要途径,但这个物理执行计划图存在很多缺陷,比如缺失了很多和Spark进程内部情况相关的度量数据以及和JVM相关的重要信息,而这些缺失的信息都是帮助我们洞察性能瓶颈时不可或缺的线索。针对这种情况,Zeta团队进行了一系列的优化改造。

其实Spark执行计划图的主要缺陷在于用户很难直接从图上提供的信息中找出性能瓶颈。在执行Spark SQL的时候,Spark UI中存在两个可以用来观察分析自己Job运行情况的Tab,分别是Task level的运行状态图(图1)和SQL 的Physical Plan DAG(图2):

图1 Task Level

(点击可查看大图)

图2 Physical Plan DAG

(点击可查看大图)

Task level的Tab内容更加偏底层一些,我们可以获取到很多重要的数据信息,比如当前已处理的数据数量以及和JVM相关的重要数据。但问题是,在SQL特别复杂的时候很难将状态信息和自己的SQL的逻辑计划对应起来。SQL Tab 展示了物理执行计划图,更贴近用户的逻辑计划图,但是缺少Spark执行过程中偏底层的状态信息,比如task粒度的信息。如果发生了数据倾斜,无法直接在这张图上看出问题所在。

因此,在现有的Spark UI 布局下,对于用户来说,想要debug就必须在不同的图之间反复切换,同时还要结合Spark内核原理来分析状态信息从而诊断自己Job的问题所在,这对大部分数据分析师和产品经理来说具有一定的学习成本。而且即便发现了问题,这部分状态信息也不一定能给用户提供解决问题的具体方向,Spark当前所暴露出来的内部信息满足不了解决问题的需求。

4. Zeta 解决方案

4.1 核心问题

一句 SQL从解析到实际被Spark进程执行的过程中会历经如下几个阶段:

图3(点击可查看大图)

如图3所示,SQL最终会被转换成底层的RDD,整个任务执行的DAG图会被分解成一个或者多个具有依赖关系的stage并最终以task为执行单元发送到Spark Executor的进程中去执行。大部分情况下,到了这个阶段就已经无法再从task执行的上下文中找出这个task到底是在执行SQL上对应的哪块逻辑,因为经过Codegen等一系列优化之后理论上已经无法在实际的物理执行过程和最初的逻辑计划上建立映射关系。

对于数据倾斜这种情况,我们可以通过观察Spark UI来进行判定。如果某个stage执行了很长时间,其中少部分task处理的数据又比其他task多很多,那么就证明出现了数据倾斜。以多张表做join为例,如果在shuffle的过程中产生了数据倾斜,为了尽可能将数据分散到不同的进程中进行处理,从而达到平衡工作负载的目的,比较通用的有以下几个方法:修改逻辑,将shuffle时的key尽可能打散;加大shuffle的分区数量从而使数据分散到更多的分区中去;单独找出产生了极大倾斜的key,在逻辑中单独处理最后再和其他部分union起来。

在准备开始解决这个问题之前,我们必须要回答两个核心问题:

1)如何找出SQL逻辑中发生了倾斜的那个部分?

2)如果发生了倾斜,又该如何知道到底倾斜在了哪一些key上呢?

对于问题1),一般来说数据倾斜都发生在会产生shuffle的操作上,比如join和group by等操作。对Spark内核比较熟悉的用户可以根据Spark UI上DAG的实时计划图大致推断出对应在SQL上的逻辑操作。

而问题2)就需要用户自己花额外的功夫来对表的数据做统计分析,比如算出表中用来做join的字段中每个值的数量,并按照值的大小进行排序,由此可以观察出主要有哪些造成倾斜最严重的值,从而进行针对优化。

如果这两个问题Spark能够自动在执行任务的时候解决,并通过一张图的形式很直观地表达出来,然后在用户执行Spark SQL的过程中实时推送给用户,那就可以即时地帮助用户发现执行过程中存在的性能问题,也就能节省大量分析推断和数据调研的时间,大大提高开发效率。

4.2  定制化DAG图

经过内部用户的调研和分析,我们决定通过修改Spark源码来满足以上需求。在现有Spark UI的基础上构建一张能同时包含具有这两张图关键特性的DAG图,既要反映出实际Spark进程中的关键状态信息,又要尽可能地帮助用户,将出现问题的状态映射到SQL中的某个逻辑块上。

Spark SQL经过解析后的Physical Plan中,每个物理算子节点都实现了对RDD的转换过程。所以当最终SQL的执行过程经过一系列转换变成RDD的转换过程后,一个物理算子就可以映射到RDD DAG图中的某一段路径上,然后根据RDD DAG图划分stage的规则,从而将stage映射到SQL Physical plan的某个或者多个相邻节点上,再将Task level的状态映射到Physical Plan的节点上了。

Task level的状态信息非常丰富,包括输入数据的大小以及和shuffle相关的状态信息等,这些信息都是帮助判定是否在某个算子上发生了倾斜的重要线索。同时,SQL Physical Plan的物理算子中,有的算子恰好保留了和逻辑执行计划及对应操作相关的上下文,如SortMergeJoinExec、ShuffledHashJoinExec算子保留了join操作时用到的key的上下文。因此,这样一来就解决了4.1中的问题1)

对于问题2),因为要计算key的值就必须引入额外的计算,而在实际的计算中key的基数又是很大的,所以为了不对Job的整体性能造成过大的影响,我们只需要计算Top N的那几个key即可。从优化倾斜的角度来讲,我们往往只需要找到倾斜最严重的部分key就可以了,而且这些key应该也只是少数。

接下来需要考虑的,就是该如何插入这段计算逻辑,以及如何让这段逻辑覆盖大部分情况下的倾斜度计算。在Spark的shuffle write阶段,其实writer写的时候就已经遍历了每一条数据,但是这个阶段太抽象,想在该阶段对数据进行统计计算并还原成实际处理数据的key并不是很容易,需要做非常多的workaround,这样就不能直接地解决问题。

因此我们还是决定从物理算子入手,通过修改部分物理算子的算法,来达到在做原有计算逻辑的同时也对数据做统计计算。于是我们在这当中对少部分物理算子的算法做了较大的重构。由于做了重构设计,因此当动态计算key的功能启动之时,这少部分原本支持Codegen的物理算子将无法支持Codegen。

在实际的计算过程中,如果开启动态计算key的功能,将会为每一个TaskSet创建一个定制化的AdvancedTaskSetManager,主要作用一是执行原有物理算子的逻辑,二是当发现某些task存在数据倾斜的时候,会额外启动一个TaskSet’来执行统计计算的逻辑,如图4所示:

图4(点击可查看大图)

这个TaskSet’的执行逻辑和正常TaskSet的执行逻辑一样,都是修改过算法后的执行逻辑。不同的是AdvancedTaskSetManager会为这两个TaskSet分别注入不通的TaskContext,从而控制实际Task 在Runtime中执行不同的逻辑分支,一部分进行正常的计算,另外一部分进行统计计算并将结果返回到Driver端进行聚合,从而达到统计汇总的目的。这个改动对Spark原有代码有一定的侵入性。

好了,这下我们需要的运行时的数据都拿到了,接下来要做的就是构建这幅图了。Spark在Driver的初始化进程中会创建一个Spark UI对象,Spark UI会启动一个Jetty的web服务来供外部访问,Driver内部的状态存储对象AppStatusStore会为不同的Tab提供后端Render页面的数据,运行时Spark UI内部的状态如图5所示:

图5(点击可查看大图)

构建的第一步便是记录构建图形所需要的数据。Spark会在运行过程的某些逻辑中构建对应的事件,以便记录上下文并异步发送到Spark Driver内部的消息总线LiveListenerBus中。而且会有特定的Listener在总线的队列中监听特定的事件,当SQL被解析完毕并且准备开始执行的时候,会发出SparkListenerSQLExecutionStart事件。该事件中包含了SQL的物理计划执行图,像SparkPlanInfo,Driver内部的SQLAppStatusListener会监听这个事件并根据SparkPlanInfo准备将来做SQL Physical Plan后端渲染的数据。

因此,我们在SQL执行前物理计划树的遍历阶段记录下每个算子和对应RDD的上下文信息(图6),并以事件的形式发送到消息总线中,再由我们定制化的Listener监听捕捉并和已有的物理计划图进行整合即可(图7)。

图6(点击可查看大图)

图7(点击可查看大图)

而在每个stage开始执行和执行完毕的时候,也会发出相应的事件,这些事件中就包含了上文提到的各种统计信息和额外被注入的诊断信息。因此这些事件也会被监听,并用来update当前我们定制化的DAG图的状态。以一个实际生产中的案例作为参考,原始DAG图(图8)和经过定制化后的DAG图(图9)分别如下所示:

图8 社区版DAG图

(点击可查看大图)

图9 经过定制后的DAG图

(点击可查看大图)

此案例是在实际生产中两张表进行join的时候产生了数据倾斜,可见社区版原始的DAG图中只展示了少量的和内存以及数据量等相关的信息,而这些数据并不足以帮助我们观察出内在的性能问题。反观经过定制化的DAG图,可以看到在这个阶段运行过程中检测到了数据倾斜,并且显示倾斜发生在join操作上,还提示了join的字段为user_id,并同时计算出了倾斜最严重的值为1。而这个倾斜最严重的值在相关业务场景中其实是脏数据,于是用户根据这个信息更改了SQL,将user_id为1的数据filter掉了,从而大大减少了shuffle时候的数据量,大幅缩短了整体的运行时间,解决了内存溢出的问题。

5. 线上效果

5.1 减少事故发生率

在数据平台上上线这个定制化DAG图的Spark版本后,来自数据分析师和产品经理的support request变少了,同时许多数据开发人员在Job正式上线到生产环境之前,会通过数据平台运行生产中的Job并使用这个DAG图来分析执行过程中的性能问题,从而提前采取措施来对Job进行优化,减少了生产中的事故。

5.2 性能影响

需要注意的是,开启了自动计算倾斜 key值后,Job性能会有一定的下降,最坏的情况下整体Job的执行时间相较于不开启的时候多了30%左右,同时整体的CPU时间也提升了一些,性能影响最小的一个Job整体时间大概增加了5%。总体趋势是当Spark开启了DynamicAllocation之后,随着数据量的增大,对性能的影响会逐渐加强,这主要是由于额外的统计分析计算消耗了CPU时间,而且由于需要关闭特定算子的Codegen,所以相对于之前会产生更多的虚函数调用。此外,为了从Yarn申请更多的container来启动Executor运行额外的TaskSet,也增加了更多的调度时间。

6. 未来优化方向及社区跟进

当前DAG图的内部实现原理还有很多可以优化的地方,比如在计算极大倾斜的key的阶段,我们可以使用采样的方法来替代全体扫描。因为对于大部分情况来说,倾斜的key往往是少部分,采样虽然存在误差,但是基本能够找出发生了问题的key,这样可以节省不少CPU时间。同时也可以根据当前采集到的信息对SQL的物理执行过程进行动态优化,比如可以结合社区最新版本的AdaptiveExecution功能,做更加深入的动态调优从而提升整体的执行性能。在开发定制化DAG的时候Spark 3.0还没有正式发布,在3.0版本中社区对DAG图也做了更进一步的优化和增强,未来会考虑与其进行整合。

猜你喜欢

1、Hive-SQL面试?看这篇足够!附答案

2、Kafka 内核知识梳理,附思维导图

3、Elasticsearch如何做到亿级数据查询毫秒级返回?

4、大数据平台架构设计没思路?来看这篇就知道了!

过往记忆大数据微信群,请添加微信:fangzhen0219,备注【进群】

Apache Spark 在eBay 的优化相关推荐

  1. Apache Spark 2.2中基于成本的优化器(CBO)(转载)

    Apache Spark 2.2最近引入了高级的基于成本的优化器框架用于收集并均衡不同的列数据的统计工作 (例如., 基(cardinality).唯一值的数量.空值.最大最小值.平均/最大长度,等等 ...

  2. Apache Spark在海致大数据平台中的优化实践

    本文来自由海致网络技术公司翟士丹分享.专注于大数据技术领域,Apache Spark Contributor,有丰富的Spark SQL引擎调优经验.海致全称海致网络技术公司,成立于2013年7月.作 ...

  3. 1年将30PB数据迁移到Spark,eBay的经验有何可借鉴之处?

    Teradata在过去的二十年为eBay提供了非常优秀的数仓服务,支撑起了eBay庞大的业务规模.二十多年积累下来的数据已经将数据仓库变得非常庞大,所谓"牵一发而动全身",哪怕只是 ...

  4. Apache Spark中实现的MapReduce设计模式

    该博客是该系列文章的第一篇,讨论了MapReduce设计模式一书中的一些设计模式,并展示了如何在Apache Spark(R)中实现这些模式. 在编写MapReduce或Spark程序时,考虑执行作业 ...

  5. 2019年Apache Spark技术交流社区原创文章回顾

    整理了这一年(本号开通半年)分享过的来自诸多专家的实践经验,希望2020年我们仍然能够互相支持,壮大Spark社区. 感谢持续分享输出优质内容的阿里云EMR团队的王道远,余根茂,彭搏,郑锴,夏立,林武 ...

  6. 大规模数据处理Apache Spark开发

    大规模数据处理Apache Spark开发 Spark是用于大规模数据处理的统一分析引擎.它提供了Scala.Java.Python和R的高级api,以及一个支持用于数据分析的通用计算图的优化引擎.它 ...

  7. 大火的Apache Spark也有诸多不完美

    现在如果你想要选择一个解决方案来处理企业中的大数据并不是难事,毕竟有很多数据处理框架可以任君选择,如Apache Samza,Apache Storm .Apache Spark等等.Apache S ...

  8. 实用 | 从Apache Kafka到Apache Spark安全读取数据

    引言 随着在CDH平台上物联网(IoT)使用案例的不断增加,针对这些工作负载的安全性显得至关重要.本篇博文对如何以安全的方式在Spark中使用来自Kafka的数据,以及针对物联网(IoT)使用案例的两 ...

  9. 新版本来袭:Apache Spark 1.5新特性介绍

    Apache Spark社区2015年9月9日发布了1.5版本,该版本由230+开发人员和80+机构参与,修复了1400多个补丁,该版本可以通过 http://spark.apache.org/dow ...

最新文章

  1. Cstring的使用
  2. linux动态链接库---一篇讲尽
  3. 微软2011 Build大会:Windows 8盛大出场(转)
  4. STM32开发 -- CRC校验码
  5. 54_pytorch GAN(生成对抗网络)、Gan代码示例、WGAN代码示例
  6. Servlet 4.0 入门
  7. 【设计过程】.NET ORM FreeSql WhereDynamicFilter 动态表格查询功能
  8. Spring核心是什么
  9. Android x86 镜像 安装到 PC 机上
  10. C语言输出转置矩阵解题步骤,C语言实现矩阵转置
  11. Win7网络修复,winsock/tcpip
  12. 概率图模型(1)--隐马尔科夫模型(1)
  13. 米氏散射多次散射计算程序
  14. 1036:镂空三角形
  15. matlab 理想低通滤波器函数,基于MATLAB的理想低通滤波器的设计
  16. DP专题考试总结(2)
  17. 【项目介绍】ElasticSearch7+Spark 构建高相关性搜索服务千人千面推荐系统
  18. 软件项目管理 6.7.参数估算法
  19. MATLAB神经网络工具箱中感知器权值和阈值的学习函数learnp
  20. IO-Buffered包装类

热门文章

  1. source insght最佳配色方案
  2. php做宿舍门禁管理系统项目首选公司,一种校园宿舍门禁管理系统的制作方法
  3. 探讨数字人最真实与最有价值的应用场景|BOOK DAO 第二期共建
  4. Linux小知识--原始套接字(raw socket)之模拟ping
  5. 解决Ubuntu18.04使用WPS文字时输入法候选列表无法跟随问题
  6. 在传奇中放火墙之后可以转动吗
  7. CISSP考试大纲将在2021年5月1日更新
  8. 好佳居软装十大品牌 软装的布局有什么讲究
  9. opencv支持多种流行视频格式文件的读取
  10. idea安装lombok - 雨中散步撒哈拉