spark-调优(代码)

在编写代码时可以进行优化

  1. 避免创建重复的RDD
  2. 尽可能复用同一个RDD
  3. 对多次使用的RDD进行持久化
  4. 尽量避免使用shuffle类算子
  5. 使用map-side预聚合的shuffle操作
  6. 使用高性能的算子
  7. 广播大变量
  8. 使用Kryo优化序列化性能
  9. 优化数据结构
  10. 使用高性能的库fastutil

1.对多次使用的RDD进行持久化

默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大, 可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避 免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作 ,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传 送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种 策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化 级别,会导致JVM的OOM内存溢出异常。

如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用 MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个 partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别 比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算 子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上, 如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

如何选择一种最合适的持久化策略
如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无 法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优 先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写 ,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将 所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性 能开销,除非是要求作业的高可用性,否则不建议使用。

package com.shujia.spark.optimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevelobject Demo1Cache {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").appName("cache").config("spark.sql.shuffle.partitions", 1).getOrCreate()val sc: SparkContext = spark.sparkContextval studentsRDD: RDD[String] = sc.textFile("data/students.txt")/*** 当对同一个rdd进行多次使用的时候可以将rdd缓存起来**///缓存级别是MEMORY_ONLY//studentsRDD.cache()//内存放不下放磁盘,同时会对数据做序列化,将一个分区的数据序列化从一个字节数组studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)/*** rdd: rdd.cache* df : df.cache* sql: cache table student,  uncache table studnet*//*** 统计班级的的人数**/studentsRDD.map(stu => (stu.split(",")(3), 1)).reduceByKey(_ + _).map {case (clazz: String, num: Int) =>s"$clazz\t$num"}.saveAsTextFile("data/cache/clazz_num")/*** 统计性别的人数**/studentsRDD.map(stu => (stu.split(",")(3), 1)).reduceByKey(_ + _).map {case (gender: String, num: Int) =>s"$gender\t$num"}.saveAsTextFile("data/cache/gender_num")/*** 清空缓存*/studentsRDD.unpersist()while (true) {}}
}

2.使用高性能的算子

  1. 使用reduceByKey/aggregateByKey替代groupByKey
  2. 使用mapPartitions替代普通map Transformation算子
  3. 使用foreachPartitions替代foreach Action算子
  4. 使用filter之后进行coalesce操作
  5. 使用repartitionAndSortWithinPartitions替代repartition与sort类操 作 代码
  6. repartition:coalesce(numPartitions,true) 增多分区使用这个
  7. coalesce(numPartitions,false) 减少分区 没有shuffle只是合并 partition

2.1aggregateByKey案例:

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSessionobject Demo2AggregateByKey {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").appName("Demo2AggregateByKey").config("spark.sql.shuffle.partitions", 1).getOrCreate()val sc: SparkContext = spark.sparkContextval studentsRDD: RDD[String] = sc.textFile("data/students.txt")val clazzKvDS: RDD[(String, Int)] = studentsRDD.map(stu => (stu.split(",")(4), 1))/*** aggregateByKey: 需要两个函数,一个是map端预聚合的函数,一个reduce端汇总的函数* reduceByKey map端和reduce端聚合函数是一样,* 如果map端和reduce端要写不一样的聚合函数可以使用aggregateByKey**/val countRDD: RDD[(String, Int)] = clazzKvDS.aggregateByKey(0)((u: Int, i: Int) => u + i,//在map端做聚合函数(u1: Int, u2: Int) => u1 + u2//在reduce端做聚合的函数)countRDD.foreach(println)}
}

2.2 mapPartitions案例

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSessionimport java.text.SimpleDateFormat
import java.util.Dateobject Demo3MapPartition {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").appName("cache").config("spark.sql.shuffle.partitions", 1).getOrCreate()val sc: SparkContext = spark.sparkContextval dataRDD: RDD[String] = sc.textFile("data/ant_user_low_carbon.txt")val kvRDD: RDD[(String, String, String)] = dataRDD.mapPartitions(iter => {iter.map(line => {//如果只是简单的拆分数据,使用map和mappartition没有区别val split: Array[String] = line.split("\t")(split(0), split(1), split(2))})})val resultRDD: RDD[(String, Long, String)] = kvRDD.mapPartitions(iter => {/**** 可以将一些初始化的代码房子mapPartitions中,减少占用的内存空间*///将时间字段转换成时间戳//在这里创建的对象,是一个分区创建一个val format = new SimpleDateFormat("yyyy/MM/dd")iter.map {case (id: String, sdate: String, p: String) =>val dateObj: Date = format.parse(sdate)val ts: Long = dateObj.getTime(id, ts, p)}})resultRDD.foreach(println)}
}

2.3 foreachPartitions案例

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.sql.{Connection, DriverManager, PreparedStatement}object Demo4foreachPartitions {def main(args: Array[String]): Unit = {val startTIme: Long = System.currentTimeMillis()val spark: SparkSession = SparkSession.builder().master("local").appName("cache").config("spark.sql.shuffle.partitions", 1).getOrCreate()val sc: SparkContext = spark.sparkContextval studentsRDD: RDD[String] = sc.textFile("data/students.txt")/*** 将rdd的数据保存到mysql中**//*** foreach: 每一条数据都需要创建一个网络链接* 不能将网络链接放在算子外(网络链接不能在网络中传输)**//* studentsRDD.foreach(stu => {val split: Array[String] = stu.split(",")//1、加载启动Class.forName("com.mysql.jdbc.Driver")val start: Long = System.currentTimeMillis()//2、创建链接val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8", "root", "123456")val end: Long = System.currentTimeMillis()println(s"创建数据库的链接用了:${end - start}")//3、编写插入数据的sqlval stat: PreparedStatement = con.prepareStatement("insert into students(id,name,age,gender,clazz) values(?,?,?,?,?)")//4、设置列值stat.setLong(1, split(0).toLong)stat.setString(2, split(1))stat.setLong(3, split(2).toLong)stat.setString(4, split(3))stat.setString(5, split(4))//5、执行插入stat.execute()//6、关闭链接stat.close()con.close()})*//*** foreachPartition: 一次遍历一个分区的数据* 如果需要将rdd的数据保存到外部数据库中,比如mysql,hbase,redis, 需要使用foreachPartition*/studentsRDD.foreachPartition(iter => {//1、加载启动Class.forName("com.mysql.jdbc.Driver")val start: Long = System.currentTimeMillis()//2、创建链接val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8", "root", "123456")val end: Long = System.currentTimeMillis()println(s"创建数据库的链接用了:${end - start}")//3、编写插入数据的sqlval stat: PreparedStatement = con.prepareStatement("insert into students(id,name,age,gender,clazz) values(?,?,?,?,?)")iter.foreach(stu => {val split: Array[String] = stu.split(",")//4、设置列值stat.setLong(1, split(0).toLong)stat.setString(2, split(1))stat.setLong(3, split(2).toLong)stat.setString(4, split(3))stat.setString(5, split(4))//5、执行插入stat.execute()})//6、关闭链接stat.close()con.close()})val endTIme: Long = System.currentTimeMillis()println(s"共用了:${endTIme - startTIme}")}
}

2.4 repartition(重分区)案例

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSessionobject Demo5RePartition {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").appName("cache").config("spark.sql.shuffle.partitions", 1).getOrCreate()val sc: SparkContext = spark.sparkContextval studentsRDD: RDD[String] = sc.textFile("data/students.txt")println(s"studentsRDD分区数据:${studentsRDD.getNumPartitions}")/*** repartition: 对rdd重分区,返回一个新的rdd,  会产生shuffle* repartition可以用于增加分区和减少分区,* 增加分区可以增加并行度,在资源充足的情况下, 效率更高* 减少分区可以减少产生的小文件的数量**/val rePartRDD: RDD[String] = studentsRDD.repartition(10)println(s"rePartRDD分区数据:${rePartRDD.getNumPartitions}")/*** coalesceL 重分区,,可以设置是否产生shuffle* 如果指定shuffle为true,可以用于增加分区和减少分区* 如果指定shuffle为false,只能用于减少分区**/val coalesceRDD: RDD[String] = rePartRDD.coalesce(100, shuffle = true)println(s"coalesceRDD分区数据:${coalesceRDD.getNumPartitions}")/*** 当处理好的数据需要保存到磁盘的时候,如果产生了很多的小文件,可以使用coalesce合并小文件* 合并的标准:保证合并之后的每一个文件的大小在128M左右** 比如数据保存的数据是10G, 最好的情况是合并为80个** shuffle = false: 不产生shuffle,效率更好**/coalesceRDD.coalesce(1, shuffle = false) //合并小文件.saveAsTextFile("data/coalesce")}
}

3.广播大变量

开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如 100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提 升性能

函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络 传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如 100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节 点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能

如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播 后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的 task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本 的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低 GC的频率

广播大变量发送方式:Executor一开始并没有广播变量,而是task运行需要用到广 播变量,会找executor的blockManager要,bloackManager找Driver里面的 blockManagerMaster要

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}object Demo6MapJoin {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").appName("cache").config("spark.sql.shuffle.partitions", 1).getOrCreate()val studentDF: DataFrame = spark.read.format("csv").option("sep", ",").schema("id STRING,name STRING,age INT,gender STRING,clazz STRING").load("data/students.txt")val scoreDF: DataFrame = spark.read.format("csv").option("sep", ",").schema("id STRING,cid STRING,score DOUBLE").load("data/score.txt")/*** studentDF.hint("broadcast"): 将小表广播出去** 当一个大表关联小表的时候,可以将小表广播出去,使用mapjoin* mapjoin 不会产生shuffle,可以提高关联的效率,小表一般要在1G以内** mapjoin 会产生两个job* 1、第一个job是将小表的数据拉取到Driver端,从Driver端广播到Executor端* 2、关联的job**/val joinDF: DataFrame = scoreDF.join(studentDF.hint("broadcast"), "id")joinDF.show()while (true) {}}
}

4.使用Kryo优化序列化性能(一般重要)

在Spark中,主要有三个地方涉及到了序列化:

  1. 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输
  2. 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,SXT是自定义类型),所有自 定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现 Serializable接口。
  3. 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个 partition都序列化成一个大的字节数组。
package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevelobject Demo7Kyyo {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").appName("cache").config("spark.sql.shuffle.partitions", 1)//序列化方式.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//指定注册序列化的类,自定义.config("spark.kryo.registrator", "com.shujia.spark.opt.Demo8KryoRegister").getOrCreate()val sc: SparkContext = spark.sparkContextval studentsRDD: RDD[String] = sc.textFile("data/students.txt")/*** 将rdd中一行数据转换成Student的对象**/val stuRDD: RDD[Student] = studentsRDD.map(stu => {val split: Array[String] = stu.split(",")Student(split(0), split(1), split(2).toInt, split(3), split(4))})/*** 不做使用序列化,数据是280K* 使用默认的序列化的方式: 数据是55K* 使用kryo进行序列化: 数据大小:43k*** spark sql 默认已经使用了kryo进行序列化, rdd没有使用,需要自己实现**/stuRDD.persist(StorageLevel.MEMORY_ONLY_SER)stuRDD.map(stu => (stu.clazz, 1)).reduceByKey(_ + _).map {case (clazz: String, num: Int) =>s"$clazz\t$num"}.foreach(println)/*** 统计性别的人数**/stuRDD.map(stu => (stu.gender, 1)).reduceByKey(_ + _).map {case (gender: String, num: Int) =>s"$gender\t$num"}.foreach(println)while (true) {}}case class Student(id: String, name: String, age: Int, gender: String, clazz: String)
}
上述的代码需要的工具类: KryoRegistrator
package com.shujia.spark.opt
import com.esotericsoftware.kryo.Kryo
import com.shujia.spark.opt.Demo7Kyyo.Student
import org.apache.spark.serializer.KryoRegistratorclass Demo8KryoRegister extends KryoRegistrator {override def registerClasses(kryo: Kryo): Unit = {/*** 在这个方法中将需要使用kryo进行序列化的类做一个注册**/kryo.register(classOf[Student])kryo.register(classOf[String])kryo.register(classOf[Int])}
}

spark-调优(代码层面)相关推荐

  1. Spark学习之Spark调优与调试(7)

    Spark学习之Spark调优与调试(7) 1. 对Spark进行调优与调试通常需要修改Spark应用运行时配置的选项. 当创建一个SparkContext时就会创建一个SparkConf实例. 2. ...

  2. rdd数据存内存 数据量_大数据开发-Spark调优常用手段

    Spark调优 spark调优常见手段,在生产中常常会遇到各种各样的问题,有事前原因,有事中原因,也有不规范原因,spark调优总结下来可以从下面几个点来调优. 1. 分配更多的资源 分配更多的资源: ...

  3. 跟我一起学【Spark】之——Spark调优与调试

    第8章 Spark调优与调试 1.总结Spark的配置机制 2.理解Spark应用性能表现的基础知识.设置相关配置项.编写高性能应用设计模式 3.探讨Spark的用户界面.执行的组成部分.日志机制 8 ...

  4. 【Spark调优】大表join大表,少数key导致数据倾斜解决方案

    [Spark调优]大表join大表,少数key导致数据倾斜解决方案 参考文章: (1)[Spark调优]大表join大表,少数key导致数据倾斜解决方案 (2)https://www.cnblogs. ...

  5. 【Spark调优】小表join大表数据倾斜解决方案

    [Spark调优]小表join大表数据倾斜解决方案 参考文章: (1)[Spark调优]小表join大表数据倾斜解决方案 (2)https://www.cnblogs.com/wwcom123/p/1 ...

  6. 【Spark篇】---Spark调优之代码调优,数据本地化调优,内存调优,SparkShuffle调优,Executor的堆外内存调优...

    一.前述 Spark中调优大致分为以下几种 ,代码调优,数据本地化,内存调优,SparkShuffle调优,调节Executor的堆外内存. 二.具体    1.代码调优 1.避免创建重复的RDD,尽 ...

  7. spark调优(一)-开发调优,数据倾斜,shuffle调优

    主要分为开发调优.资源调优.数据倾斜调优.shuffle调优几个部分. 开发调优和资源调优是所有Spark作业都需要注意和遵循的一些基本原则,是高性能Spark作业的基础:数据倾斜调优,主要讲解了一套 ...

  8. spark 写本地文件_(纯干货建议收藏)一次GC引发的Spark调优大全

    上一篇Tungsten On Spark-内存模型设计总结了Spark内存设计相关的知识点,本篇会快速为读者复习一下JVM相关的知识点,然后基于线上的GC调优对spark整体的调优做一个汇总,希望能让 ...

  9. 【Spark】Spark调优 资源调优

    文章目录 1 Spark内存模型 2.执行流程 3.资源调优 1 Spark内存模型 Spark在一个Executor的内存分为三块, 1. 一块是execution内存 2. 一块是Storge 内 ...

  10. Spark 调优技巧总结

    Spark 是大数据处理必备技术之一,在开发工作中必然会面对性能调优和各种问题故障的处理,那么面试官也最爱在这些方面进行机关枪式的提问,本 Chat 就针对当前实际开发工作中常遇到的热门和冷门问题进行 ...

最新文章

  1. c语言 文件截断,c – 是否有一种正统的方法来避免编译器警告C4309 – “二进制文件输出的”常数值的截断“?...
  2. Python3 使用[]提取字符
  3. Web服务器漏洞和安全
  4. Mac android studio升级时提示 :Connection failed. Please check your network connection .
  5. Flowable通过api查询流程返回流程图节点
  6. 实现二叉树的三种非递归遍历算法
  7. Git学习系列(七)Bug和Feature分支管理详解
  8. += 对于可变对象和不可变对象的区别
  9. 教务管理系统C++实现
  10. 两年前,梦开始的地方.
  11. vscode 配置python_VSCode配置Python版本
  12. Linux命令+shell脚本大全:文件系统的检查与修复
  13. 米氏散射多次散射计算程序
  14. 视频爆炸时代,谁在支撑视频生态网高速运行?
  15. ImageNet预训练参数和随机初始化参数训练效果对比
  16. Win10更改用户名
  17. 微型计算机中的i3和i5指的是,i3和i5的处理器有什么区别
  18. Gitbook 插件安装 - 导航目录折叠 chapter-fold
  19. 915Resolution补丁——支持“GM965”,G33, GM45 (GMA 4500MHD), GMA3150
  20. 计算机操作工 试题,计算机系统操作工试题

热门文章

  1. 【08月07日】A股滚动市盈率PE最低排名
  2. Unity5 0 天空盒 CubeMap
  3. 网络安全—2.1—设备原理与操作
  4. 一文玩转NGINX(对于NGINX,你真的了解吗?)
  5. ShardingSphere分库分表核心原理精讲第十二节 数据安全和脱敏详解
  6. 用html制作编写静态日志,[译] 编写一个小型静态网站生成器
  7. [渝粤教育] 西南科技大学 国际经济法 在线考试复习资料
  8. 分布式技术一周技术动态 2016-10-09
  9. 电脑上传网速怎么测试软件,Win7如何测试电脑上传速度?
  10. 学习bash第二版-前言