1.自定义函数分类
类似于hive当中的自定义函数, spark同样可以使用自定义函数来实现新的功能。
spark中的自定义函数有如下3类
1.UDF(User-Defined-Function)
输入一行,输出一行
2.UDAF(User-Defined Aggregation Funcation)
输入多行,输出一行
3.UDTF(User-Defined Table-Generating Functions)
输入一行,输出多行
2、 自定义UDF
需求
有udf.txt数据格式如下:

Hello
abc
study
small

通过自定义UDF函数将每一行数据转换成大写
select value,smallToBig(value) from t_word
代码演示:

def main(args: Array[String]): Unit = {
//1、创建sparksession
val spark: SparkSession = SparkSession.builder().master(“local[*]”).appName(“demo01”).getOrCreate()
//2、创建sparkcontext
val sc: SparkContext = spark.sparkContext
//3、读取数据。并操作
val ttRDD: RDD[String] = sc.textFile(“file:///F:\传智播客\传智专修学院\第二学期\34\05-Spark\资料\udf.txt”)
import spark.implicits._
val UDFDS: Dataset[String] = ttRDD.toDS()
//自定义函数
spark.udf.register(“toUpperAdd123”,(str:String)=>{
//根据业务需求对数据进行加工
str.toUpperCase +" 123"
})
UDFDS.createOrReplaceTempView(“UDF”)
//调用函数
spark.sql(“SELECT value,toUpperAdd123(value) as length_10 FROM UDF”).show()
sc.stop()
spark.stop()
}

3、自定义UDAF[了解]
需求:
有udaf.json数据内容如下

{“name”:“Michael”,“salary”:3000}
{“name”:“Andy”,“salary”:4500}
{“name”:“Justin”,“salary”:3500}
{“name”:“Berta”,“salary”:4000}

求取平均工资
●继承UserDefinedAggregateFunction方法重写说明
inputSchema:输入数据的类型
bufferSchema:产生中间结果的数据类型
dataType:最终返回的结果类型
deterministic:确保一致性,一般用true
initialize:指定初始值
update:每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的运算)
merge:全局聚合(将每个分区的结果进行聚合)
evaluate:计算最终的结果
●代码演示:

object Udaf {
//编写计算平均工资的方法SalaryAvg
class SalaryAvg extends UserDefinedAggregateFunction{
//输入的数据的类型
override def inputSchema: StructType = {
StructType(StructField(“input”,LongType)::Nil)
}
//中间结果缓存的类型
override def bufferSchema: StructType = {
//sum缓存总金额
//total缓存总次数
StructType(List(StructField(“sum”,LongType),(StructField(“total”,LongType))))
}
//数据返回的类型
override def dataType: DataType = {
DoubleType
}
//是否有相同的输出 true
override def deterministic: Boolean = {
true
}
/*
List(1,2,3,4,5).reduce((a,b)=>a+b)
1 a=1 b=2
2 a=3 b=3
3 a=6 b=4
4 a=10 b=5
5 a=51
/
//数据的初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
//用于存储总金额
buffer(0)=0L //=> a
//用于存储次数
buffer(1)=0L //=>b
}
//rdd是分区的 此方法是计算一个分区内的数据和 和数据数量
/

{“name”:“Michael”,“salary”:3000}
{“name”:“Andy”,“salary”:4500}
{“name”:“Justin”,“salary”:3500}
{“name”:“Berta”,“salary”:4000}
/
override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
//计算次分区的总金额
buffer(0)=buffer.getLong(0)+input.getLong(0)
//计算次分区的总数量
buffer(1)=buffer.getLong(1)+1
}
//汇总所有分区内的总金额 和 总次数
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
//汇聚所有分区的总金额
buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)
//汇聚所有分区的总次数
buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
}
//求最后的平均值
//计算平均薪资
//总金额/总数量
override def evaluate(buffer: Row): Any = {
buffer.getLong(0).toDouble/buffer.getLong(1).toDouble
}
}
def main(args: Array[String]): Unit = {
//1、创建sparksession
val spark: SparkSession = SparkSession.builder().master("local[
]").appName(“demo01”).getOrCreate()
val JsonDatas: DataFrame = spark.read.json(“file:///F:\传智播客\传智专修学院\第二学期\34\05-Spark\资料\udaf.json”)
JsonDatas.createOrReplaceTempView(“UDAFTable”)
//注册程UDAF函数
spark.udf.register(“SalaryAvg”,new SalaryAvg)
//计算平均工资的算法名称为SalaryAvg
spark.sql(“select SalaryAvg(salary) from UDAFTable”).show()
spark.sql(“select avg(salary) from UDAFTable”).show()
spark.stop()
}
}

Spark SQL自定义函数_第五章相关推荐

  1. spark SQL自定义函数:

    spark SQL 自定义函数: 自定义函数: 第一种:  U D F  (用户自定义函数)函数 特点:  一对一的关系,输入一个值以后输出一个值  (一进一出) 大部分的内置函数都是U D F函数 ...

  2. Spark SQL自定义函数

    文章目录 自定义函数分类 自定义UDF 自定义UDAF[了解] 自定义函数分类 类似于hive当中的自定义函数, spark同样可以使用自定义函数来实现新的功能. spark中的自定义函数有如下3类 ...

  3. spark sql自定义UDF函数-java语言

    背景说明 基于spark sql开发过程中,需要一些类似与官网提供的 int().from_json()等自定函数处理数据.下属将简单讲解通过java如何实现spark sql自定义函数 官方UDF接 ...

  4. sql自定义函数学习思路_学习SQL:用户定义的函数

    sql自定义函数学习思路 You can create several user-defined objects in a database. One of these is definitely u ...

  5. sql 自定义函数 示例_SQL滞后函数概述和示例

    sql 自定义函数 示例 In the article SQL Server Lead function overview and examples, we explored Lead functio ...

  6. ArcGIS for Desktop入门教程_第五章_ArcCatalog使用 - ArcGIS知乎-新一代ArcGIS问答社区

    原文:ArcGIS for Desktop入门教程_第五章_ArcCatalog使用 - ArcGIS知乎-新一代ArcGIS问答社区 1 ArcCatalog使用 1.1 GIS数据 地理信息系统, ...

  7. sql 自定义函数 示例_SQL Server Choose()函数介绍和示例

    sql 自定义函数 示例 In the article, a CASE statement in SQL, we explored one of the important logical expre ...

  8. MS SQL自定义函数IsPositiveInteger MS SQL自定义函数IsNumeric 水晶报表使用IEnumerableT数据源...

    MS SQL自定义函数IsPositiveInteger 判断字符串是否为正整数,0开始的的数字不算. SET ANSI_NULLS ON GO SET QUOTED_IDENTIFIER ON GO ...

  9. sql 自定义函数 示例_SQL Server SESSION_CONTEXT()函数与示例

    sql 自定义函数 示例 This article explores the SQL Server session context function, SESSION_CONTEXT() and pe ...

最新文章

  1. python rpc_python与RPC服务
  2. BOOL 值在 debug 和 release 模式下初始化不一样!!!
  3. Web框架之Django_03 路由层了解(路有层 无名分组、有名分组、反向解析、路由分发 视图层 JsonResponse,FBV、CBV、文件上传)
  4. 这一套磁悬浮PID训练装置,你不能错过。
  5. 代码审查“思维导图”
  6. LeetCode 890. 查找和替换模式(哈希表)
  7. 【折腾】斐讯N1 安装 Docker + GUI
  8. 端到端加密优缺点_基于Filecoin的去中心化文件保存和加密分享平台
  9. tp3.2 隐藏index.php,thinkphp3.2.3隐藏index.php入口文件
  10. 3DEC/PFC离散元入门篇
  11. Chromium OS Autotest 服务端测试
  12. 微信小程序开发:微信小程序里面集成百度地图的步骤
  13. 阿里datav地图json地址
  14. linux /etc/profile文件,linux系统中/etc/profile和.profile的介绍
  15. 手把手和你用原生JS写一个循环播放图片轮播
  16. 时序分析 43 -- 时序数据转为空间数据 (二) 马尔可夫转换场
  17. 18.移动点餐端搭建----点餐系统移动端
  18. 网上教务评教管理系统
  19. cfdpost导出图片_科学网—去除 cfd post 输出eps文件中的莫名其妙的点 - 姚程的博文...
  20. 消除SDK更新时的“https://dl-ssl.google.com refused”错误

热门文章

  1. 获取中国 省、市、县区
  2. 评判好的呼叫中心来电管理系统
  3. markdown-it 重定义渲染规则
  4. 苹果cms播放页html,苹果CMS播放页被劫持跳转至qp网站的解决方案
  5. zigbee zcl规范及其协议栈实现1
  6. 中科院计算机博士学位答辩 顾智宇,中科院论文答辩情况和学位授予决议书.doc...
  7. 计算机实验报告要求,计算机上机实验内容及实验报告要求
  8. 【Microsoft Whiteboard】微软白板 下载
  9. C++ string常用函数(翻转字符串、获得字符串子串)(更新中)
  10. MRP:Workbench启用上Pegging功能,一点击页面就死掉解决方法