背景

本文基于spark 3.2
driver内存 2G

问题描述

在基于复杂的sql运行中,或者说是存在多个join操作的sql中,如果说driver内存不是很大的情况下,我们经常会遇到如下报错:

Caused by: org.apache.spark.SparkException: Could not execute broadcast in 800 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1at org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec$$anon$1.run(QueryStageExec.scala:217)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)

其实从字面上就可以理解:broadcast的数据超时了,这种经常是由于广播的数据量太大引起的(还有部分是由于SPARK-33933所说的问题引起的),spark中默认的广播大小不是只有10M才会进行广播么?
spark.sql.autoBroadcastJoinThreshold默认为10M
为什么还会存在广播的数据量很大呢?

问题分析

直接说重点:
在spark中 SMJ转BHJ 在两个阶段会发生:

  1. 正常的物理计划的生成阶段,也就是SparkPlanner中的JoinSelection规则中
  2. AQE阶段,也就是AdaptiveSparkPlanExec中的reOptimize方法
    其实这两个阶段调用的方法都是一样的都是调用了sparkPlanner的JoinSelection规则

我们说说第一阶段,也就是正常的物理计划的生成阶段,即JoinSelection规则

这里的重要的方法是canBroadcastBySize:

def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold}

具体的逻辑阶段的统计信息,可以参考spark logicalPlan Statistics (逻辑计划阶段的统计信息),这里如果我们是基于文件读取的话(大部分就是基于文件的读取),如果说我们的的sql是

##其中tableA有很多字段,我们只选取a,b两个字段
select a,b from tableA

这里的逻辑阶段的数据统计就是一个大概的计算:

private def visitUnaryNode(p: UnaryNode): Statistics = {// There should be some overhead in Row object, the size should not be zero when there is// no columns, this help to prevent divide-by-zero error.val childRowSize = EstimationUtils.getSizePerRow(p.child.output)val outputRowSize = EstimationUtils.getSizePerRow(p.output)// Assume there will be the same number of rows as child has.var sizeInBytes = (p.child.stats.sizeInBytes * outputRowSize) / childRowSizeif (sizeInBytes == 0) {// sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero// (product of children).sizeInBytes = 1}// Don't propagate rowCount and attributeStats, since they are not estimated here.Statistics(sizeInBytes = sizeInBytes)
}
...
def getSizePerRow(attributes: Seq[Attribute],attrStats: AttributeMap[ColumnStat] = AttributeMap(Nil)): BigInt = {// We assign a generic overhead for a Row object, the actual overhead is different for different// Row format.8 + attributes.map { attr =>if (attrStats.get(attr).map(_.avgLen.isDefined).getOrElse(false)) {attr.dataType match {case StringType =>// UTF8String: base + offset + numBytesattrStats(attr).avgLen.get + 8 + 4case _ =>attrStats(attr).avgLen.get}} else {attr.dataType.defaultSize}}.sum}

我们看到其实就是对于从一个表中读取一个字段的大小是基于改字段类型所占avgLen的大小除以该表中所有字段总的类型的avgLen的一个比值,或者是默认值
如: select a from tableA, 假如tableA是300M 有a,b,c,d,e等30个字段,其中a,b,c,d,e等30字段都是string类型,且defaultSize是20。
则该sql算出来的size就是 20/(30*20)*300M=10M 这样这个sql所对应的临时表就能进行广播。
但是正如spark里说的:

Statistics collected for a column.
1. The JVM data type stored in min/max is the internal data type for the corresponding Catalyst data type. For example, the internal type of DateType is Int, and that the internal type of TimestampType is Long. 2. There is no guarantee that the statistics collected are accurate. Approximation algorithms (sketches) might have been used, and the data collected can also be stale.
Params:
distinctCount – number of distinct values
min – minimum value
max – maximum value
nullCount – number of nulls
avgLen – average length of the values. For fixed-length types, this should be a constant.
maxLen – maximum length of the values. For fixed-length types, this should be a constant.
histogram – histogram of the values
version – version of statistics saved to or retrieved from the catalog

这些统计信息有可能是不准确的(实际上String是变长类型),所以在计算的时候,有可能broadcast的数据就相对比较大,如果存在这种join有很多的情况下,就会导致driver端很卡,甚至OOM

我们再说说第二阶段,也就是AQE阶段

其实这个阶段核心还是getFinalPhysicalPlan方法中的createQueryStages方法和reOptimize方法,
在createQueryStages方法中如果方法已经是BroadcastExchangeExec的话,就会直接包装成ShuffleQueryStageExec,
如果是ShuffleExchangeExec的话,在下一个阶段会经过reOptimize方法根据运行时的统计信息大小,来进行是否可以进行SMJ到BHJ的转换
这里在的阈值判断是通过spark.sql.adaptive.autoBroadcastJoinThreshold来判断的,默认也是10M,

所以在spark UI上有时候能看到broadcast 的datasize有50M甚至100多M,而明明broadcast的阈值是10M,却变成了BroadCastHashJoin。
如下如所示:

结论

所以在大数据量,以及在复杂的sql情况下,禁止broadcasthashjoin是明确的选择,毕竟稳是一切运行的条件,但是也是可以根据单个任务个别开启。

spark在生产中是否要禁止掉BHJ(BroadcastHashJoin)相关推荐

  1. 在生产中使用Istio,我们学到了什么?

    在生产中使用Istio,我们学到了什么? 灵雀云 https://www.jianshu.com/p/cf4d4258b7f6 首先,给大家简单介绍一下Istio,Istio是一个Service Me ...

  2. 读书笔记{11} VLAN及其在生产中的应用

    1 LAN 简介 LAN,Local Area Network,直译为本地区域网,可能是因为为了方便传播,大家基本上都叫它局域网,既然大家都这么叫,如果我们搞特例那就是不明智了. 局域网这个词的侧重其 ...

  3. Hibernate:hbm2ddl.auto =在生产中更新?

    本文翻译自:Hibernate: hbm2ddl.auto=update in production? 是否可以运行使用hbm2ddl.auto=update配置的Hibernate应用程序来更新生产 ...

  4. 在生产中使用Java 11:需要了解的重要事项

    来源:SpringForAll社区 如果您正考虑更新最新版本的Java,阅读本文以了解有关Oracle Java 11的最重要信息. 如果您及时了解Java社区的新闻,您可能听说Oracle改变了他们 ...

  5. [导入]如何禁止掉SharePoint页面个性化?(续)

    摘要:在之前的文章里面,写了一个方法,来禁止掉SharePoint页面的个性化.但当时就有人问了,如果我只想管理员能够个性化页面,而非管理员不能做这个操作,又应该如何做呢?下面就是方法.注意:这个方法 ...

  6. mlflow_在生产中设置MLflow

    mlflow This is the first article in my MLflow tutorial series: 这是我的MLflow教程系列的第一篇文章: Setup MLflow in ...

  7. selenium自动化测试_为什么在生产中进行Selenium自动化测试对于您的下一个版本至关重要?...

    selenium自动化测试 您是否认为仅仅是因为您的Web应用程序在过渡环境中以飞快的速度通过,它对于生产环境也将是相同的? 您可能需要重新考虑! 特别是,如果我们指的是跨浏览器测试 ,则需要确保跨各 ...

  8. aws eks_在生产中配置和使用AWS EKS

    aws eks 到现在,我们已经完成了向Amazon EKS ( 工作地点)的迁移,并且集群已经投入生产. 过去,我已经写了一些要点的简短摘要,您可以在这里找到. 当系统正在处理实际流量时,我有了一些 ...

  9. vue中生产模式和调试模式_为什么在生产中进行调试是如此诱人?

    vue中生产模式和调试模式 生产调试 为什么在生产中进行调试是如此诱人? 在我的第一份工作中,我要做的任务之一是修复一个错误,该错误过去在非常复杂的生产系统中有时会发生. 那很简单! - 我想. 我将 ...

最新文章

  1. 剑指offer:面试题19. 正则表达式匹配
  2. 用这种方法实现无监督端到端图像分类!(附论文)
  3. 某生鲜电商平台的监控模块设计
  4. IT专案管理中的风险控制。
  5. 关于浏览器前进键和后退键样样式表冲突的问题
  6. 做java项目_初学者做java项目的流程
  7. 计算机设置了分组用户怎么切换,怎么弄微信小号-不单止换小号,另外这些微信小技巧也一定让你“相见恨晚”...
  8. xp计算机怎样看ip地址,XP电脑ip地址怎么查看?XP系统怎么样查看电脑的IP地址?...
  9. Android之Material Dialogs详解(非原创)
  10. 不同时区时间换算_Java中如何显示不同时区的时间(原理详解)
  11. Processing绘制随风飘扬的名画
  12. php session fixation,Session Fixation 原理与防御
  13. sql语句中count(*),count(1),count(id)区别详解
  14. egret白鹭 基于eui组件的一些动画类 抽屉效果
  15. MIoU(均交并比)的计算
  16. 【MVC-自定义过滤器】
  17. zzulioj1094c语言版答案,ZZULIOJ 1094: 统计元音(函数专题)
  18. 常用的外贸英语口语汇总
  19. 情感溢出:读《浣溪沙》
  20. 2020年国防科大CS预推免

热门文章

  1. 自动编码器重建图像及Python实现
  2. Visio文件编辑Visio Viewer
  3. python 代码_实战代码 | 30 个Python小代码,收藏。
  4. Docker系列四DockerFile打造Python镜像
  5. 使用DW编辑简单的图文混淆
  6. C语言中exit(0) 、exit(1) 和return的区别
  7. STM32的Vcap的问题及解决---原来经验也害人
  8. base64图片转换工具类以及base转图片工具
  9. Fiddler笔记(十)
  10. 大工19秋计算机网络技术在线作业1,大工18秋《计算机网络技术》在线作业1.pdf...