聚合

DataFrames可以提供共同聚合,例如count(),countDistinct(),avg(),max(),min()等。虽然这些功能是专为DataFrames,星火SQL还拥有类型安全的版本,在其中的一些 斯卡拉和 Java的使用强类型数据集的工作。此外,用户不限于预定义的聚合函数,并且可以创建自己的聚合函数。

无用户定义的聚合函数

扩展UserDefinedAggregateFunction 抽象类以实现自定义无类型聚合函数。
例如,用户定义的平均值:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._object MyAverage extends UserDefinedAggregateFunction {// 集合函数的输入类型def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)// 集合缓冲区的数据类型def bufferSchema: StructType = {StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)}//返回值的数据类型def  dataType : DataType  =  DoubleType
//确定是否返回相同的输出def deterministic: Boolean = true
//初始化给定的聚合缓冲区。缓冲区本身是一个`Row`,除了标准方法之外,比如在索引处检索值(例如,get(),getBoolean()),提供了更新其值的机会。请注意,缓冲区内的数组和映射仍然是不可变的。def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0Lbuffer(1) = 0L}// 更新数据到指定的聚合缓冲区`bufferdef update(buffer: MutableAggregationBuffer, input: Row): Unit = {if (!input.isNullAt(0)) {buffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1}}// 合并两个聚合缓冲剂和存储更新的缓冲器值回`buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}// 计算最终结果def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}// 注册访问的函数
spark.udf.register("myAverage", MyAverage)val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

用户定义聚合函数

强类型数据集的用户定义聚合围绕Aggregator抽象类。
例如,类型安全的用户定义平均值所示:
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregatorcase class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)object MyAverage extends Aggregator[Employee, Average, Double] {// 此聚合的零值。应该满足任何b + zero = b def zero: Average = Average(0L, 0L)// 组合两个值以产生新值。为了提高性能,该函数可以修改`buffer` //并返回它而不是构造一个新的对象def reduce(buffer: Average, employee: Employee): Average = {buffer.sum += employee.salarybuffer.count += 1buffer}// 合并两个中间值def merge(b1: Average, b2: Average): Average = {b1.sum += b2.sumb1.count += b2.countb1}// 转换减少def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count// 指定中间件类型def bufferEncoder: Encoder[Average] = Encoders.product// 为最终输出值类型指定编码器def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+// 将函数转换为`TypedColumn`并给它命名
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Spark Sql 聚合相关推荐

  1. spark sql 查看分区_Spark 3.0 中七个必须知道的 SQL 性能优化

    本文来自 IBM 东京研究院的高级技术人员 Kazuaki Ishizaki 博士在 Spark Summit North America 2020 的 <SQL Performance Imp ...

  2. Spark基础:(六)Spark SQL

    1.相关介绍 Datasets:一个 Dataset 是一个分布式的数据集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的优点(强类型化, 能够使用强大的 lam ...

  3. Spark SQL玩起来

    标签(空格分隔): Spark [toc] 前言 Spark SQL的介绍只包含官方文档的Getting Started.DataSource.Performance Tuning和Distribut ...

  4. Spark SQL Catalyst源代码分析Optimizer

    /** Spark SQL源代码分析系列*/ 前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer 以及核心类库TreeNode,本文将具体解说S ...

  5. Spark SQL 函数全集

    org.apache.spark.sql.functions是一个Object,提供了约两百多个函数. 大部分函数与Hive的差不多. 除UDF函数,均可在spark-sql中直接使用. 经过impo ...

  6. Spark性能优化 -- Spark SQL、DataFrame、Dataset

    本文将详细分析和总结Spark SQL及其DataFrame.Dataset的相关原理和优化过程. Spark SQL简介 Spark SQL是Spark中 具有 大规模关系查询的结构化数据处理 模块 ...

  7. 如何查询spark版本_掌握Spark SQL中的查询执行

    了解您的查询计划 自从Spark 2.x以来,由于SQL和声明性DataFrame API,在Spark中查询数据已成为一种奢侈. 仅使用几行高级代码就可以表达非常复杂的逻辑并执行复杂的转换. API ...

  8. Spark SQL之queryExecution运行流程解析Logical Plan(三)

    1.整体运行流程 使用下列代码对SparkSQL流程进行分析,让大家明白LogicalPlan的几种状态,理解SparkSQL整体执行流程 // sc is an existing SparkCont ...

  9. spark sql中的窗口函数

    2019独角兽企业重金招聘Python工程师标准>>> databricks博客给出的窗口函数概述 Spark SQL supports three kinds of window ...

最新文章

  1. Win10 太火,骗子也来打劫了
  2. 数据结构 结构的声明 一个结构作为另一个结构的成员 单向链表的实现 双向链表的实现
  3. 【夸QT十一】外来物品:通用脚本帮助Web运行基础Linux命令
  4. Buuctf(pwn) ciscn_2019_n_5
  5. 【机器学习基础】数学推导+纯Python实现机器学习算法26:随机森林
  6. 许式伟《Go语言编程》章节摘录:Go语言简史
  7. android gradle权威指南pdf_干货 | 携程 Android 10适配踩坑指南
  8. 躲开职业生涯的“甜蜜陷阱”
  9. RedisTemplate中opsForValue()中的方法
  10. 泰坦尼克号数据_数据分析-泰坦尼克号乘客生存率预测
  11. springboot 微服务_使用 Docker 部署 Spring Boot微服务
  12. 轻松实现 CTreeCtrl 的全选与反选
  13. 2021-06-01 深入分析锁升级流程的基础
  14. OEL修改字符集失败 -bash: /root: is a directory
  15. textarea 滚动条属性设置
  16. hasp hl加密狗驱动
  17. WORD排版视频教程
  18. 408计算机考试科目英语数学,关于计算机考研408的那些事儿
  19. vs2015 社区版 + Qt 5.13.2 安装
  20. Linux系统中文件颜色分别代表什么?

热门文章

  1. 181124 逆向-2018“柏鹭杯”厦大邀请赛初赛(Re1、2)
  2. 工薪族巧理财之定期存款中整存整取、零存整取、存本取息之间的微妙区别
  3. av_dump_format函数使用说明
  4. 阿里大数据之路:数据管理篇大总结
  5. PhpStorm 远程连接服务器
  6. 【Recsys2021】推荐系统论文整理和导读
  7. 发送邮箱验证码进行注册验证
  8. evga x58服务器芯片组,何谓板皇?四大顶级X58主板巅峰对决
  9. SpringBoot中@Value读取不到值的解决方案
  10. 3.WebSocket编程—发送与接收JAVA对象