Spark coalesce算子
缩减分区
Test 1:
package test.wyh.wordcountimport org.apache.spark.{SparkConf, SparkContext}object TestCoalEsce {def main(args: Array[String]): Unit = {//建立Spark连接val sparkConf = new SparkConf().setMaster("local").setAppName("TestWordCountApp")val sc = new SparkContext(sparkConf)//原本4个分区val rdd = sc.makeRDD(List(1, 2, 3, 4), 4)//缩减为两个分区val coalesceRDD = rdd.coalesce(2)coalesceRDD.saveAsTextFile("output")//关闭连接sc.stop()}}
运行结果:
原来RDD中的前两个元素被分到了一个分区,后两个元素被分到了一个分区。
Test 2:
package test.wyh.wordcountimport org.apache.spark.{SparkConf, SparkContext}object TestCoalEsce {def main(args: Array[String]): Unit = {//建立Spark连接val sparkConf = new SparkConf().setMaster("local").setAppName("TestWordCountApp")val sc = new SparkContext(sparkConf)//设置3个分区val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)//缩减为两个分区val coalesceRDD = rdd.coalesce(2)coalesceRDD.saveAsTextFile("output")//关闭连接sc.stop()}}
运行结果:
好像并没有像我们想的那样会将原来RDD中的前三个元素放在一个分区,后三个元素放在一个分区。
出现这样的结果是因为,coalesce算子默认不会打乱原来分区内的数据,即不会拆散原来某个分区内的数据重新组合,这样也就会可能导致数据倾斜,所以我们可以加上第二个参数设置为true,来表示在缩减分区时允许数据重新组合(是否shuffle)来实现数据均衡。
Test 3:
package test.wyh.wordcountimport org.apache.spark.{SparkConf, SparkContext}object TestCoalEsce {def main(args: Array[String]): Unit = {//建立Spark连接val sparkConf = new SparkConf().setMaster("local").setAppName("TestWordCountApp")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)//参数二表示是否shuffleval coalesceRDD = rdd.coalesce(2, true)coalesceRDD.saveAsTextFile("output")//关闭连接sc.stop()}}
运行结果:
这样,缩减分区后的数据相对来讲就不会出现不平衡的情况。
增大分区
Test 4:
package test.wyh.wordcountimport org.apache.spark.{SparkConf, SparkContext}object TestCoalEsce {def main(args: Array[String]): Unit = {//建立Spark连接val sparkConf = new SparkConf().setMaster("local").setAppName("TestWordCountApp")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)//将分区个数扩大为3个val coalesceRDD = rdd.coalesce(3)coalesceRDD.saveAsTextFile("output")//关闭连接sc.stop()}}
运行结果:
可以发现,虽然我们使用coalesce算子将分区数设置为3,但是实际上生成的分区个数还是2个,这还是因为前面提到的coalesce算子默认是不会将分区内的数据打乱重新组合,那么此时扩大分区个数的配置相当于是无效的。
将参数二设置为true,使其进行shuffle,对数据进行重新组合,以实现分区个数扩大。
Test 5:
package test.wyh.wordcountimport org.apache.spark.{SparkConf, SparkContext}object TestCoalEsce {def main(args: Array[String]): Unit = {//建立Spark连接val sparkConf = new SparkConf().setMaster("local").setAppName("TestWordCountApp")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)//将分区个数扩大为3个,参数二设置为运行shuffleval coalesceRDD = rdd.coalesce(3, true)coalesceRDD.saveAsTextFile("output")//关闭连接sc.stop()}}
运行结果:
此时就可以实现分区个数扩大的功能。
对于扩大分区个数的功能,我们也可以直接使用repartition的方法,因为repartition方法中已经默认将shuffle设置为true了。
Test 6:
package test.wyh.wordcountimport org.apache.spark.{SparkConf, SparkContext}object TestCoalEsce {def main(args: Array[String]): Unit = {//建立Spark连接val sparkConf = new SparkConf().setMaster("local").setAppName("TestWordCountApp")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)//将分区个数扩大为3个,repartition底层调用的是coalesce,但是将shuffle默认设置为true了。val coalesceRDD = rdd.repartition(3)coalesceRDD.saveAsTextFile("output")//关闭连接sc.stop()}}
运行结果:
Spark coalesce算子相关推荐
- Spark源码之coalesce算子
1.问题背景 总所周知,spark的rdd编程中有两个算子repartition和coalesce.公开的资料上定义为,两者都是对spark分区数进行调整的算子. repartition会经过shuf ...
- spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子
目录 天小天:(一)Spark Streaming 算子梳理 - 简单介绍streaming运行逻辑 天小天:(二)Spark Streaming 算子梳理 - flatMap和mapPartitio ...
- Spark学习之Spark RDD算子
个人主页zicesun.com 这里,从源码的角度总结一下Spark RDD算子的用法. 单值型Transformation算子 map /*** Return a new RDD by applyi ...
- Spark action算子案例
在上篇文章中,我们对Spark中几种常用的transformation算子通过Java和Scala两种代码分别进行了案例演示,Spark transformation算子案例 而在本文中,我们将继续 ...
- Spark RDD算子(八)mapPartitions, mapPartitionsWithIndex
Spark RDD算子(八) mapPartitions scala版本 java版本 mapPartitionsWithIndex scala版本 java版本 mapPartitions mapP ...
- Spark部分算子及使用
Spark部分算子及使用 案例一:flatmap算子 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppN ...
- Spark转换算子大全以及案例实操
1.RDD 转换算子 RDD转换算子实际上就是换了名称的RDD方法 RDD 根据数据处理方式的不同将算子整体上分为 Value 类型.双 Value 类型和 Key-Value 类型 算子:Opera ...
- Spark Core 算子总结
目录 Transformation算子 map filter flatMap mapPartitions mapPartitionsWithIndex sample glom union inters ...
- Spark _30_SparkStreaming算子操作Driver HA
SparkStreaming算子操作 foreachRDD output operation算子,必须对抽取出来的RDD执行action类算子,代码才能执行. import org.apache.sp ...
最新文章
- 十一、springboot WebMvcConfigurer与HandlerInterceptorAdapter使用
- Android View系统解析(上)
- Linux find指令
- 任务调度与上下文切换时间测试
- 八段锦八个动作名称_八段锦工间操“动”起来 全民健身精气神“燃”起来
- oracle sql 导入mysql数据库备份_使用PL/SQL连接oracle数据库,并将数据进行导出备份和导入恢复...
- 网站日志分析工具:WebLog Expert Lite
- 为什么 Math.min() 比 Math.max() 大?
- mac Parallels Desktop安装ubuntu教程
- Java下载excel文件并且添加水印效果
- CDR真实图片转水墨画效果制作教程
- linux 内核死机 堆栈,高通平台Linux kernel死机解题心得
- 第三方直播SDK对比|直播SDK如何选型
- 理解BPDU Filtering的意义(BPDU Filtering在全局配置与接口配置上的区别)
- 收发电子邮件属于计算机在方面的应用( ),收发电子邮件属于计算机在( )方面的应用...
- 社区发现(社团检测)模块度Modularity详细介绍
- 手机上怎么在线生成gif?1分钟教你手机图片合成gif
- 1 入门:投身新领域
- 【1】DICOM图像格式
- html css好看的提示框,div对话框,js+div+css实现好看的提示框效果(转)