缩减分区

Test 1:

package test.wyh.wordcountimport org.apache.spark.{SparkConf, SparkContext}object TestCoalEsce {def main(args: Array[String]): Unit = {//建立Spark连接val sparkConf = new SparkConf().setMaster("local").setAppName("TestWordCountApp")val sc = new SparkContext(sparkConf)//原本4个分区val rdd = sc.makeRDD(List(1, 2, 3, 4), 4)//缩减为两个分区val coalesceRDD = rdd.coalesce(2)coalesceRDD.saveAsTextFile("output")//关闭连接sc.stop()}}

运行结果:

原来RDD中的前两个元素被分到了一个分区,后两个元素被分到了一个分区。

Test 2:

package test.wyh.wordcountimport org.apache.spark.{SparkConf, SparkContext}object TestCoalEsce {def main(args: Array[String]): Unit = {//建立Spark连接val sparkConf = new SparkConf().setMaster("local").setAppName("TestWordCountApp")val sc = new SparkContext(sparkConf)//设置3个分区val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)//缩减为两个分区val coalesceRDD = rdd.coalesce(2)coalesceRDD.saveAsTextFile("output")//关闭连接sc.stop()}}

运行结果:

好像并没有像我们想的那样会将原来RDD中的前三个元素放在一个分区,后三个元素放在一个分区。

出现这样的结果是因为,coalesce算子默认不会打乱原来分区内的数据,即不会拆散原来某个分区内的数据重新组合,这样也就会可能导致数据倾斜,所以我们可以加上第二个参数设置为true,来表示在缩减分区时允许数据重新组合(是否shuffle)来实现数据均衡。

Test 3:

package test.wyh.wordcountimport org.apache.spark.{SparkConf, SparkContext}object TestCoalEsce {def main(args: Array[String]): Unit = {//建立Spark连接val sparkConf = new SparkConf().setMaster("local").setAppName("TestWordCountApp")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)//参数二表示是否shuffleval coalesceRDD = rdd.coalesce(2, true)coalesceRDD.saveAsTextFile("output")//关闭连接sc.stop()}}

运行结果:

这样,缩减分区后的数据相对来讲就不会出现不平衡的情况。

增大分区

Test 4:

package test.wyh.wordcountimport org.apache.spark.{SparkConf, SparkContext}object TestCoalEsce {def main(args: Array[String]): Unit = {//建立Spark连接val sparkConf = new SparkConf().setMaster("local").setAppName("TestWordCountApp")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)//将分区个数扩大为3个val coalesceRDD = rdd.coalesce(3)coalesceRDD.saveAsTextFile("output")//关闭连接sc.stop()}}

运行结果:

可以发现,虽然我们使用coalesce算子将分区数设置为3,但是实际上生成的分区个数还是2个,这还是因为前面提到的coalesce算子默认是不会将分区内的数据打乱重新组合,那么此时扩大分区个数的配置相当于是无效的。

将参数二设置为true,使其进行shuffle,对数据进行重新组合,以实现分区个数扩大。

Test 5:

package test.wyh.wordcountimport org.apache.spark.{SparkConf, SparkContext}object TestCoalEsce {def main(args: Array[String]): Unit = {//建立Spark连接val sparkConf = new SparkConf().setMaster("local").setAppName("TestWordCountApp")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)//将分区个数扩大为3个,参数二设置为运行shuffleval coalesceRDD = rdd.coalesce(3, true)coalesceRDD.saveAsTextFile("output")//关闭连接sc.stop()}}

运行结果:

此时就可以实现分区个数扩大的功能。

对于扩大分区个数的功能,我们也可以直接使用repartition的方法,因为repartition方法中已经默认将shuffle设置为true了。

Test 6:

package test.wyh.wordcountimport org.apache.spark.{SparkConf, SparkContext}object TestCoalEsce {def main(args: Array[String]): Unit = {//建立Spark连接val sparkConf = new SparkConf().setMaster("local").setAppName("TestWordCountApp")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)//将分区个数扩大为3个,repartition底层调用的是coalesce,但是将shuffle默认设置为true了。val coalesceRDD = rdd.repartition(3)coalesceRDD.saveAsTextFile("output")//关闭连接sc.stop()}}

运行结果:

Spark coalesce算子相关推荐

  1. Spark源码之coalesce算子

    1.问题背景 总所周知,spark的rdd编程中有两个算子repartition和coalesce.公开的资料上定义为,两者都是对spark分区数进行调整的算子. repartition会经过shuf ...

  2. spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子

    目录 天小天:(一)Spark Streaming 算子梳理 - 简单介绍streaming运行逻辑 天小天:(二)Spark Streaming 算子梳理 - flatMap和mapPartitio ...

  3. Spark学习之Spark RDD算子

    个人主页zicesun.com 这里,从源码的角度总结一下Spark RDD算子的用法. 单值型Transformation算子 map /*** Return a new RDD by applyi ...

  4. Spark action算子案例

    在上篇文章中,我们对Spark中几种常用的transformation算子通过Java和Scala两种代码分别进行了案例演示,Spark transformation算子案例  而在本文中,我们将继续 ...

  5. Spark RDD算子(八)mapPartitions, mapPartitionsWithIndex

    Spark RDD算子(八) mapPartitions scala版本 java版本 mapPartitionsWithIndex scala版本 java版本 mapPartitions mapP ...

  6. Spark部分算子及使用

    Spark部分算子及使用 案例一:flatmap算子 def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppN ...

  7. Spark转换算子大全以及案例实操

    1.RDD 转换算子 RDD转换算子实际上就是换了名称的RDD方法 RDD 根据数据处理方式的不同将算子整体上分为 Value 类型.双 Value 类型和 Key-Value 类型 算子:Opera ...

  8. Spark Core 算子总结

    目录 Transformation算子 map filter flatMap mapPartitions mapPartitionsWithIndex sample glom union inters ...

  9. Spark _30_SparkStreaming算子操作Driver HA

    SparkStreaming算子操作 foreachRDD output operation算子,必须对抽取出来的RDD执行action类算子,代码才能执行. import org.apache.sp ...

最新文章

  1. 十一、springboot WebMvcConfigurer与HandlerInterceptorAdapter使用
  2. Android View系统解析(上)
  3. Linux find指令
  4. 任务调度与上下文切换时间测试
  5. 八段锦八个动作名称_八段锦工间操“动”起来 全民健身精气神“燃”起来
  6. oracle sql 导入mysql数据库备份_使用PL/SQL连接oracle数据库,并将数据进行导出备份和导入恢复...
  7. 网站日志分析工具:WebLog Expert Lite
  8. 为什么 Math.min() 比 Math.max() 大?
  9. mac Parallels Desktop安装ubuntu教程
  10. Java下载excel文件并且添加水印效果
  11. CDR真实图片转水墨画效果制作教程
  12. linux 内核死机 堆栈,高通平台Linux kernel死机解题心得
  13. 第三方直播SDK对比|直播SDK如何选型
  14. 理解BPDU Filtering的意义(BPDU Filtering在全局配置与接口配置上的区别)
  15. 收发电子邮件属于计算机在方面的应用( ),收发电子邮件属于计算机在( )方面的应用...
  16. 社区发现(社团检测)模块度Modularity详细介绍
  17. 手机上怎么在线生成gif?1分钟教你手机图片合成gif
  18. 1 入门:投身新领域
  19. 【1】DICOM图像格式
  20. html css好看的提示框,div对话框,js+div+css实现好看的提示框效果(转)

热门文章

  1. Jmeter接口测试+参数化
  2. 面向对象的原则之替换原则
  3. 获取指定N个工作日后的日期
  4. 免费下载Word模板 自己动手做日历
  5. 烧写linux系统到开发板中,【嵌入式开发】向开发板中烧写Linux系统
  6. Smart3D,CCC软件制作三维模型快速筛选出重点区域或大面积项目分割建模方法
  7. 常见的计算机系统可靠性数学模型,系统可靠性与失效率(串并联)的计算题
  8. 读书笔记——《腾讯传》
  9. 【FPGA基础】基于 Pango Design Suite(PDS) 的FPGA开发流程
  10. shell脚本打印三角形