Key-Value类型

partitionBy案例

  1. 作用:对pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区,否则会生成ShuffleRDD,即会产生shuffle过程。
  2. 需求:创建一个4个分区的RDD,对其重新分区

(1)创建一个RDD

scala> val rdd = sc.parallelize(Array((1,”aaa”),(2,”bbb”),(3,”ccc”),(4,”ddd”)),4)

rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24

(2)查看RDD的分区数

scala> rdd.partitions.size

res24: Int = 4

(3)对RDD重新分区

scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))

rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26

(4)查看新RDD的分区数

scala> rdd2.partitions.size

res25: Int = 2

reduceByKey(func, [numTasks]) 案例

  1. 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
  2. 需求:创建一个pairRDD,计算相同key对应值的相加结果

(1)创建一个pairRDD

scala> val rdd = sc.parallelize(List((“female”,1),(“male”,5),(“female”,5),(“male”,2)))

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24

(2)计算相同key对应值的相加结果

scala> val reduce = rdd.reduceByKey((x,y) => x+y)

reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26

(3)打印结果

scala> reduce.collect()

res29: Array[(String, Int)] = Array((female,6), (male,7))

groupByKey案例

  1. 作用:groupByKey也是对每个key进行操作,但只生成一个seq。
  2. 需求:创建一个pairRDD,将相同key对应值聚合到一个seq中,并计算相同key对应值的相加结果。

(1)创建一个pairRDD

scala> val words = Array(“one”, “two”, “two”, “three”, “three”, “three”)

words: Array[String] = Array(one, two, two, three, three, three)

scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26

(2)将相同key对应值聚合到一个Seq中

scala> val group = wordPairsRDD.groupByKey()

group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28

(3)打印结果

scala> group.collect()

res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))

(4)计算相同key对应值的相加结果

scala> group.map(t => (t._1, t._2.sum))

res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31

(5)打印结果

scala> res2.collect()

res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))

reduceByKey和groupByKey的区别

  1. reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。
  2. groupByKey:按照key进行分组,直接进行shuffle。
  3. 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。

aggregateByKey案例

参数:(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)

  1. 作用:在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
  2. 参数描述:

(1)zeroValue:给每一个分区中的每一种key一个初始值;

(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;

(3)combOp:函数用于合并每个分区中的结果。

  1. 需求:创建一个pairRDD,取出每个分区相同key对应值的最大值,然后相加
  2. 需求分析

图1-aggregate案例分析

(1)创建一个pairRDD

scala> val rdd = sc.parallelize(List((“a”,3),(“a”,2),(“c”,4),(“b”,3),(“c”,6),(“c”,8)),2)

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

(2)取出每个分区相同key对应值的最大值,然后相加

scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)

agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26

(3)打印结果

scala> agg.collect()

res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))

foldByKey案例

参数:(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

  1. 作用:aggregateByKey的简化操作,seqop和combop相同
  2. 需求:创建一个pairRDD,计算相同key对应值的相加结果

(1)创建一个pairRDD

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[91] at parallelize at <console>:24

(2)计算相同key对应值的相加结果

scala> val agg = rdd.foldByKey(0)(_+_)

agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[92] at foldByKey at <console>:26

(3)打印结果

scala> agg.collect()

res61: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))

combineByKey[C] 案例

参数:(createCombiner: V => C,  mergeValue: (C, V) => C,  mergeCombiners: (C, C) => C)

  1. 作用:针对相同K,将V合并成一个集合。
  2. 参数描述:

(1)createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值

(2)mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并

(3)mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。

  1. 需求:创建一个pairRDD,根据key计算每种key的均值。(先计算每个key出现的次数以及可以对应值的总和,再相除得到结果)
  2. 需求分析:

图2- combineByKey案例分析

(1)创建一个pairRDD

scala> val input = sc.parallelize(Array((“a”, 88), (“b”, 95), (“a”, 91), (“b”, 93), (“a”, 95), (“b”, 98)),2)

input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:26

(2)将相同key对应的值相加,同时记录该key出现的次数,放入一个二元组

scala> val combine = input.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2))

combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[5] at combineByKey at <console>:28

(3)打印合并后的结果

scala> combine.collect

res5: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))

(4)计算平均值

scala> val result = combine.map{case (key,value) => (key,value._1/value._2.toDouble)}

result: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[54] at map at <console>:30

(5)打印结果

scala> result.collect()

res33: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))

sortByKey([ascending], [numTasks]) 案例

  1. 作用:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
  2. 需求:创建一个pairRDD,按照key的正序和倒序进行排序

(1)创建一个pairRDD

scala> val rdd = sc.parallelize(Array((3,”aa”),(6,”cc”),(2,”bb”),(1,”dd”)))

rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24

(2)按照key的正序

scala> rdd.sortByKey(true).collect()

res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))

(3)按照key的倒序

scala> rdd.sortByKey(false).collect()

res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))

mapValues案例

  1. 针对于(K,V)形式的类型只对V进行操作
  2. 需求:创建一个pairRDD,并将value添加字符串”|||”

(1)创建一个pairRDD

scala> val rdd3 = sc.parallelize(Array((1,”a”),(1,”d”),(2,”b”),(3,”c”)))

rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at <console>:24

(2)对value添加字符串”|||”

scala> rdd3.mapValues(_+”|||”).collect()

res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))

join(otherDataset, [numTasks]) 案例

  1. 作用:在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
  2. 需求:创建两个pairRDD,并将key相同的数据聚合到一个元组。

(1)创建第一个pairRDD

scala> val rdd = sc.parallelize(Array((1,”a”),(2,”b”),(3,”c”)))

rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:24

(2)创建第二个pairRDD

scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(4,6)))

rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24

(3)join操作并打印结果

scala> rdd.join(rdd1).collect()

res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))

cogroup(otherDataset, [numTasks]) 案例

  1. 作用:在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
  2. 需求:创建两个pairRDD,并将key相同的数据聚合到一个迭代器。

(1)创建第一个pairRDD

scala> val rdd = sc.parallelize(Array((1,”a”),(2,”b”),(3,”c”)))

rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at <console>:24

(2)创建第二个pairRDD

scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))

rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24

(3)cogroup两个RDD并打印结果

scala> rdd.cogroup(rdd1).collect()

res14: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。

大数据培训之Key-Value类型相关推荐

  1. 大数据培训ClickHouse表引擎

    表引擎 表引擎(即表的类型)决定了: 1)数据的存储方式和位置,写到哪里以及从哪里读取数据 2)支持哪些查询以及如何支持. 3)并发数据访问. 4)索引的使用(如果存在). 5)是否可以执行多线程请求 ...

  2. 大数据培训Spark 高频面试考点分享

    1.Spark 如何保证宕机迅速恢复? 适当增加 spark standby master 编写 shell 脚本,定期检测 master 状态,出现宕机后对 master 进行重启操作 2. Spa ...

  3. 大数据培训:Spark 性能调优详解

    调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的 ...

  4. 深圳大数据培训:Transformation算子演示

    深圳大数据培训:Transformation算子演示 val conf = new SparkConf().setAppName("Test").setMaster("l ...

  5. 大数据培训课程之countByKey()案例

    大数据培训课程saveAsObjectFile(path) 作用:用于将RDD中的元素序列化成对象,存储到文件中. 大数据培训课程 countByKey()案例 作用:针对(K,V)类型的RDD,返回 ...

  6. 大数据培训:HiveSQL技术优化与面试

    Hive SQL 编译成MapReduce过程 编译 SQL 的任务是在上节中介绍的 COMPILER(编译器组件)中完成的.Hive将SQL转化为MapReduce任务,整个编译过程分为六个阶段: ...

  7. 北京大数据培训 | 电商用户行为分析之实时流量统计

    模块创建和数据准备 在 UserBehaviorAnalysis 下 新 建 一 个 maven module 作 为 子 项 目 , 命 名 为NetworkFlowAnalysis.在这个子模块中 ...

  8. 泰迪云课堂大数据培训平台业务介绍

    泰迪云课堂大数据培训业务分为几个类型,包括就业培训班.在线实习.大数据推荐课程.图书配套视频课程 .技能提升等方面.        就业培训班包括:学徒班.线下就业班.项目班      学徒班包括:大 ...

  9. 大数据培训:Hadoop HDFS 实现原理

    一.HDFS体系结构 1.1 HDFS 简介 Hadoop分布式文件系统 (HDFS) 是运行在通用硬件(commodity hardware)上的分布式文件系统(Distributed File S ...

最新文章

  1. 新特效火爆抖音!各路神仙齐唱《蚂蚁呀嘿》,网友:短短几秒需一生来治愈...
  2. ARM NEON 编程简单入门1
  3. 【转载】程序员有哪些电脑技能让外行感到神奇?
  4. 1.25 包(package)详解
  5. 十、Go协程的调度,互斥锁,计数器和线程池
  6. 【存储过程】MySQL存储过程/存储过程与自定义函数的区别
  7. 靠一强过Excel的工具,3年变成女领导,还把报表做成养老工作
  8. python-面向对象-05_面向对象封装案例 II
  9. 大页内存的使用:HugePages(大内存页)的原理与使用
  10. 区块链第三方支付已登陆菲律宾
  11. 利用客户端cookie保存用户信息
  12. Tiny服务的开发配套的工具来了
  13. java拦截器放行_java拦截器放行某些请求
  14. 10个炫酷特效的网页写法(附源码),拿去就能用,nice
  15. mui 框架跨域_MUI框架学习(5)–AJAX跨域问题
  16. 关于使用Cobalt Strike制作宏病毒
  17. Lua 程序设计——Lua 教程01
  18. Windows下用命令行注销用户(包括注销其他用户)
  19. VS Code 呈现缩进参考线以及语法高亮改变
  20. 使用wireshark抓取聊天信息与爬虫入门

热门文章

  1. 计算机网络复习--名词解释
  2. 上海联想计算机维修,联想笔记本在上海的官方修理点在哪里?
  3. 计算机图形Opengl的实验报告,opengl计算机图形学实验报告之3D漫游世界.doc
  4. 蓝桥杯嵌入式--LCD屏幕使用提升
  5. 自然语言处理与信息与计算科学专业的联系
  6. 软件工程 第七章:活动图
  7. What‘s Wrong with Copying?
  8. 数字化战略,如何解读企业财务报表
  9. 统计 flv视频总时长
  10. HTML和CSS之移动端