groupBy

返回集合类为元组,元组的第一个元素为分组元素,第二个元素为Iterable类型,这种类型可以转List什么的

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

例子
一维数组

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
//依据是否能被2整除进行分组,分组字段可以自定义
scala> a.groupBy(x=>{if(x%2==0) "even" else "odd"}).collect
res0: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))
//分组后的数据不能用map处理
scala> a.groupBy(x=>{if(x%2==0) "even" else "odd"}).map(_.toList).collect
<console>:27: error: value toList is not a member of (String, Iterable[Int])a.groupBy(x=>{if(x%2==0) "even" else "odd"}).map(_.toList).collect^
//分组后的数据可以用mapValues进一步处理
scala> a.groupBy(x=>{if(x%2==0) "even" else "odd"}).mapValues(_.toList).collect
res2: Array[(String, List[Int])] = Array((even,List(2, 4, 6, 8)), (odd,List(1, 3, 5, 7, 9)))scala> a.groupBy(x=>{x%2==0}).mapValues(_.toList).collect
res3: Array[(Boolean, List[Int])] = Array((false,List(1, 3, 5, 7, 9)), (true,List(2, 4, 6, 8)))
scala> val rdd1=sc.parallelize(List((1,"a"),(2,"b"),(1,"c"),(2,"d")),2)//依据元组的第一个元素进行分组,注意分组后value是个元组.(分组元素,迭代器) 如果要对该迭代器进一步处理,要用mapValues,如果用map,处理的就是整个元组,而不只是元组的第二个元素了
scala> rdd1.groupBy(_._1).collect
res55: Array[(Int, Iterable[(Int, String)])] = Array((2,CompactBuffer((2,b), (2,d))), (1,CompactBuffer((1,a), (1,c))))//处理groupBy后生成的迭代器.转为List
scala> rdd1.groupBy(_._1).mapValues(_.toList).collect
res56: Array[(Int, List[(Int, String)])] = Array((2,List((2,b), (2,d))), (1,List((1,a), (1,c))))scala> rdd1.groupBy(_._1).mapValues(_.toArray).collect
res57: Array[(Int, Array[(Int, String)])] = Array((2,Array((2,b), (2,d))), (1,Array((1,a), (1,c))))scala> rdd1.groupBy(_._1).mapValues(_.toSet).collect
res58: Array[(Int, scala.collection.immutable.Set[(Int, String)])] = Array((2,Set((2,b), (2,d))), (1,Set((1,a), (1,c))))

xxByKey

  • groupByKey是groupBy的特殊形式,就是key不用指定,因为kv对,k是已知的,所以参数不用传输k
  • reduceByKey 分组后还可以聚合,聚合函数可以自定义
  • aggregateByKey分组后还可以聚合,聚合函数可以自定义,关键还可以有初始值
scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)//依据list构建kv对
scala> val b = a.keyBy(_.length)
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at keyBy at <console>:26scala> b.collect
res4: Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (6,spider), (5,eagle))
//依据kv对中k进行分组,形成新的kv对
scala> b.groupByKey.collect
res5: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))
//转化新的kv对中的v为List
scala> b.groupByKey.mapValues(_.toList).collect
res6: Array[(Int, List[String])] = Array((4,List(lion)), (6,List(spider)), (3,List(dog, cat)), (5,List(tiger, eagle)))
//转化新的kv对中的v为Array
scala> b.groupByKey.mapValues(_.toArray).collect
res7: Array[(Int, Array[String])] = Array((4,Array(lion)), (6,Array(spider)), (3,Array(dog, cat)), (5,Array(tiger, eagle)))//reduceByKey 分组后还可以聚合,聚合函数可以自定义
scala> b.reduceByKey(_+_).collect
res8: Array[(Int, String)] = Array((4,lion), (6,spider), (3,dogcat), (5,tigereagle))
//reduceByKey 分组后还可以聚合,聚合函数可以自定义
scala> b.reduceByKey(_+" "+_).collect
res9: Array[(Int, String)] = Array((4,lion), (6,spider), (3,dog cat), (5,tiger eagle))scala> b.aggregate
aggregate   aggregateByKey
//aggregateByKey分组后还可以聚合,聚合函数可以自定义,关键还可以有初始值
scala> b.aggregateByKey("|")(_+_).collect
<console>:29: error: not enough arguments for method aggregateByKey: (seqOp: (String, String) => String, combOp: (String, String) => String)(implicit evidence$3: scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[(Int, String)].
Unspecified value parameter combOp.b.aggregateByKey("|")(_+_).collect^
// 2个参数列表 不是3个
scala> b.aggregateByKey("|")(_+_)(_+_).collect
<console>:29: error: not enough arguments for method aggregateByKey: (seqOp: (String, String) => String, combOp: (String, String) => String)(implicit evidence$3: scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[(Int, String)].
Unspecified value parameter combOp.b.aggregateByKey("|")(_+_)(_+_).collect^
//2个参数列表,初始值,分区内聚合函数,分区间聚合函数
scala> b.aggregateByKey("|")(_+_,_+_).collect
res12: Array[(Int, String)] = Array((4,|lion), (6,|spider), (3,|dog|cat), (5,|tiger|eagle))

总结

  • groupByKey是groupBy的特殊形式,groupBy底层调用了groupByKey
  • 如果是kv对.选groupByKey, 也可以用groupBy ,可以用方法取元组的值
  • 如果是一维,选择groupBy.

rdd分组聚合算子xxByKey,xxBy相关推荐

  1. spark笔记之RDD常用的算子操作

    Spark Rdd的所有算子操作,请见<sparkRDD函数详解.docx> 启动spark-shell 进行测试: spark-shell --master spark://node1: ...

  2. spark之RDD的转换算子与行为算子的具体使用

    文章目录 1.Transform算子 1.1 map 1.2 flatmap 1.3 groupBy和groupBykey 1.4 filter 1.5 Mappartitions 1.6 mapVa ...

  3. mysql分组查询和子查询语句_6.MySQL分组聚合查询,子查询

    自己的MySQL阅读笔记,持续更新,直到看书结束. 数据库技术可以有效帮助一个组织或者企业科学.有效的管理数据,也是现在很多企业招聘数据分析师的必备要求之一. 大家如果看过MySQL的书,也可以看我的 ...

  4. 【原创】StreamInsight查询系列(六)——基本查询操作之分组聚合

    上篇博文介绍了StreamInsight基础查询操作中的用户自定义聚合部分.这篇文章将主要介绍如何在StreamInsight查询中使用分组聚合. 测试数据准备 为了方便测试查询,我们首先准备一个静态 ...

  5. pandas使用groupby函数进行分组聚合、使用agg函数指定聚合统计计算的数值变量、并自定义统计计算结果的名称(naming columns after aggregation)

    pandas使用groupby函数进行分组聚合.使用agg函数指定聚合统计计算的数值变量.并自定义统计计算结果的名称(naming columns after aggregation in dataf ...

  6. pandas使用groupby函数进行分组聚合并使用agg函数将每个分组特定变量对应的多个内容组合到一起输出(merging content within a specific column of g

    pandas使用groupby函数进行分组聚合并使用agg函数将每个分组特定变量对应的多个内容组合到一起输出(merging content within a specific column of g ...

  7. pandas使用groupby函数对dataframe进行分组统计、使用as_index参数设置分组聚合的结果中分组变量不是dataframe的索引(index)

    pandas使用groupby函数对dataframe进行分组统计.使用as_index参数设置分组聚合的结果中分组变量不是dataframe的索引(index) 目录

  8. pandas使用groupby函数按照多个分组变量进行分组聚合统计、使用agg函数计算分组的多个统计指标(grouping by multiple columns in dataframe)

    pandas使用groupby函数按照多个分组变量进行分组聚合统计.使用agg函数计算分组的多个统计指标(grouping by multiple columns in dataframe) 目录

  9. pandas使用groupby函数、agg函数获取每个分组聚合对应的标准差(std)实战:计算分组聚合单数据列的标准差(std)、计算分组聚合多数据列的标准差(std)

    pandas使用groupby函数.agg函数获取每个分组聚合对应的标准差(std)实战:计算分组聚合单数据列的标准差(std).计算分组聚合多数据列的标准差(std) 目录

最新文章

  1. windos 2008 vista 下的端口范围改变
  2. PokeCats开发者日志(一)
  3. docker运行mywebsql
  4. 谷歌发布TensorFlow 1.4与TensorFlow Lattice:利用先验知识提升模型准确度 搜狐科技 10-12 15:29 选自:Google Research Blog 参与:李泽南、
  5. 保存Activity的状态
  6. vim环境设置(应用于python编程)
  7. php 数组转json乱码,php将数组转为json涌现中文乱码怎么办_后端开发
  8. JVM学习-类文件结构
  9. 360无痕浏览器_功能强大好用的浏览器
  10. 华为盒子m330能生鸿蒙吗,华为盒子M330你不知道却很流弊的六个功能!
  11. 汇丰银行借沟通CTBS远程管理帐务
  12. NETGEAR R7000 更新固件失败 使用TTL-USB修复教程
  13. 期货开户后需要银期转账绑定
  14. 【论文笔记】AutoML: A survey of the state-of-the-art(下篇)
  15. 网络克隆自动修改计算机名ip,网络克隆之自动更改IP地址和计算机名
  16. stanza和DBPedia的安装与使用
  17. 关于最近word模板以及word转pdf的总结
  18. python 学籍管理系统的简单实现
  19. Yii Framework 开发教程(31) Zii组件-DetailView 示例
  20. 【一种利用插值验证的FL隐私保护框架】VFL: A Verifiable Federated Learning

热门文章

  1. java 模板组件_9Tile模板和Tile组件创建复合式网页
  2. android sdk build tools 版本,SDK Build Tools 版本说明
  3. Android移动应用基础教程【Android事件处理】
  4. php基础语法了解,PHP基础语法
  5. jsonready onload 与_漫谈JSONP以及img的onLoad和onEr
  6. retrofit content-length为0_Retrofit 源码剖析
  7. matlab中k-means算法_机器学习 | KMeans聚类分析详解
  8. 最全下载jar包的网站
  9. java序列化和反序列化对象_java中的序列化与反序列化,还包括将多个对象序列化到一个文件中...
  10. excel设置单元格整数后还是有小数点_一招教你统一解决excel单元格的单位问题!...