UDF:用户自定义函数。

可以自定义类实现UDFX接口。

javaAPI:

package com.udf;import javafx.scene.chart.PieChart;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;/*** @author George* @description**/
public class Udf {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setAppName("udf");conf.setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);sc.setLogLevel("error");SQLContext sqlContext = new SQLContext(sc);JavaRDD<String> rdd = sc.parallelize(Arrays.asList("George","GeorgeDage","kangkang"));JavaRDD<Row> map = rdd.map(new Function<String, Row>() {public Row call(String v1) throws Exception {return RowFactory.create(v1);}});List<StructField> fields = new ArrayList<StructField>();fields.add(DataTypes.createStructField("name",DataTypes.StringType,true));StructType schema = DataTypes.createStructType(fields);Dataset<Row> df = sqlContext.createDataFrame(map, schema);df.show();/*** +----------+* |      name|* +----------+* |    George|* |GeorgeDage|* |  kangkang|* +----------+*/df.registerTempTable("user");/*** 根据UDF函数参数的个数来决定是实现哪一个UDF  UDF1,UDF2。。。。UDF1xxx*/sqlContext.udf().register("StrLen", new UDF1<String, Integer>() {public Integer call(String s) throws Exception {return s.length();}},DataTypes.IntegerType);sqlContext.sql("select name,StrLen(name) as length from user").show();/*** +----------+------+* |      name|length|* +----------+------+* |    George|     6|* |GeorgeDage|    10|* |  kangkang|     8|* +----------+------+*/sqlContext.udf().register("StrLen", new UDF2<String, Integer, Integer>() {public Integer call(String s, Integer integer) throws Exception {return s.length()+integer;}}, DataTypes.IntegerType);sqlContext.sql("select name,StrLen(name,10) as length from user").show();/*** +----------+------+* |      name|length|* +----------+------+* |    George|    16|* |GeorgeDage|    20|* |  kangkang|    18|* +----------+------+*/sc.stop();}
}

scalaAPI:

package com.udfimport org.apache.spark.sql.SparkSession/*** UDF用户自定义函数*/
object UdfScalaDemo {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("local").appName("udf").getOrCreate()val list = List[String]("George","lucy","kk","lmdhk")import sparkSession.implicits._val frame = list.toDF("name")frame.createOrReplaceTempView("students")frame.show()/*** +------+* |  name|* +------+* |George|* |  lucy|* |    kk|* | lmdhk|* +------+*/sparkSession.udf.register("STRLEN",(n:String)=>{n.length})sparkSession.sql("select name,STRLEN(name) as length from students sort by length desc").show(100)/*** +------+------+* |  name|length|* +------+------+* |George|     6|* | lmdhk|     5|* |  lucy|     4|* |    kk|     2|* +------+------+*/sparkSession.stop()}
}


UDAF:用户自定义聚合函数。

  • 实现UDAF函数如果要自定义类要继承UserDefinedAggregateFunction类

javaAPI:

package com.udf;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;import java.util.ArrayList;
import java.util.Arrays;/*** @author George* @description*用户自定义聚合函数。*实现UDAF函数如果要自定义类要继承UserDefinedAggregateFunction类**/
public class Udaf {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local");conf.setAppName("udaf");JavaSparkContext sc = new JavaSparkContext(conf);sc.setLogLevel("error");SQLContext sqlContext = new SQLContext(sc);JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("George", "kangkang", "GeorgeDage", "limu","George","GeorgeDage"));JavaRDD<Row> map = parallelize.map(new Function<String, Row>() {public Row call(String v1) throws Exception {return RowFactory.create(v1);}});ArrayList<StructField> fields = new ArrayList<StructField>();fields.add(DataTypes.createStructField("name",DataTypes.StringType,true));StructType schema = DataTypes.createStructType(fields);Dataset<Row> frame = sqlContext.createDataFrame(map, schema);frame.show();/*** +----------+* |      name|* +----------+* |    George|* |  kangkang|* |GeorgeDage|* |      limu|* +----------+*/frame.registerTempTable("user");/*** 注册一个UDAF函数,实现统计相同值得个数* 注意:这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的*/sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {/*** 指定输入字段的字段及类型*/@Overridepublic StructType inputSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name",DataTypes.StringType, true)));}@Overridepublic DataType dataType() {return DataTypes.IntegerType;}@Overridepublic boolean deterministic() {return true;}/*** 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑* buffer.getInt(0)获取的是上一次聚合后的值* 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合* 大聚和发生在reduce端.* 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算*/@Overridepublic void update(MutableAggregationBuffer buffer, Row input) {buffer.update(0,buffer.getInt(0)+1);}@Overridepublic StructType bufferSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true)));}/*** 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来* buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值* buffer2.getInt(0) : 这次计算传入进来的update的结果* 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作*/@Overridepublic void merge(MutableAggregationBuffer buffer1, Row buffer2) {buffer1.update(0,buffer1.getInt(0) + buffer2.getInt(0));}/*** 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果*/@Overridepublic void initialize(MutableAggregationBuffer buffer) {buffer.update(0, 0);}/*** 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果*/@Overridepublic Object evaluate(Row buffer) {return buffer.getInt(0);}});sqlContext.sql("select name ,StringCount(name) from user group by name").show();/*** +----------+------+* |      name|(name)|* +----------+------+* |      limu|     1|* |    George|     2|* |GeorgeDage|     2|* |  kangkang|     1|* +----------+------+*/sc.stop();}
}

scalaAPI:

package com.udfimport org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._class MyUDAF extends UserDefinedAggregateFunction{// 聚合操作时,所处理的数据的类型def bufferSchema: StructType = {DataTypes.createStructType(Array(DataTypes.createStructField("aaa",IntegerType, true)))}// 最终函数返回值的类型def dataType: DataType = {DataTypes.IntegerType}def deterministic: Boolean = {true}// 最后返回一个最终的聚合值     要和dataType的类型一一对应def evaluate(buffer: Row): Any = {buffer.getAs[Int](0)}// 为每个分组的数据执行初始化值def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0}//输入数据的类型def inputSchema: StructType = {DataTypes.createStructType(Array(DataTypes.createStructField("input", StringType, true)))}// 最后merger的时候,在各个节点上的聚合值,要进行merge,也就是合并def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getAs[Int](0)+buffer2.getAs[Int](0)}// 每个组,有新的值进来的时候,进行分组对应的聚合值的计算def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer(0) = buffer.getAs[Int](0)+1}
}
package com.udfimport org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedAggregateFunctionobject UdafScalaDemo {def main(args: Array[String]): Unit = {val session = SparkSession.builder().master("local").appName("udaf").getOrCreate()val list = List[String]("George","lucy","kk","lmdhk","kk")import session.implicits._val frame = list.toDF("name")frame.createOrReplaceTempView("students")/*** 注册UDAF函数*/session.udf.register("NAMECOUNT",new MyUDAF())session.sql("select name,NAMECOUNT(name) as count from students group by name").show(100)/*** +------+-----+* |  name|count|* +------+-----+* |  lucy|    1|* |    kk|    2|* |George|    1|* | lmdhk|    1|* +------+-----+*/session.stop()}
}

图解UDAF:

Spark _27_自定义函数UDF和UDAF相关推荐

  1. Hive自定义函数UDF、UDAF、UDTF

    0.依赖 <dependencies><!--添加hive依赖--><dependency><groupId>org.apache.hive</g ...

  2. spark SQL自定义函数:

    spark SQL 自定义函数: 自定义函数: 第一种:  U D F  (用户自定义函数)函数 特点:  一对一的关系,输入一个值以后输出一个值  (一进一出) 大部分的内置函数都是U D F函数 ...

  3. 【Flink】Flink Table SQL 用户自定义函数: UDF、UDAF、UDTF

    本文总结Flink Table & SQL中的用户自定义函数: UDF.UDAF.UDTF. UDF: 自定义标量函数(User Defined Scalar Function).一行输入一行 ...

  4. T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst、语言版本影响...

    CSDN 的 Blog 太滥了!无时不刻地在坏! 开始抢救性搬家 ... ... 到这里重建家园 /* T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@D ...

  5. hive 元数据 自定义_如何在Hive中创建自定义函数UDF及如何直接通过Impala的同步元数据重用UDF的jar文件-阿里云开发者社区...

    如何在Hive中创建自定义函数UDF及使用 如何在Impala中使用Hive的自定义函数 UDF函数开发 使用Intellij工具开发Hive的UDF函数,进行编译: 1.使用Intellij工具通过 ...

  6. T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst、语言版本影响!...

    原文:T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst.语言版本影响! CSDN 的 Blog 太滥了!无时不刻地在坏! 开始抢救性搬家 ...

  7. 案例解析丨Spark Hive自定义函数应用

    摘要:Spark目前支持UDF,UDTF,UDAF三种类型的自定义函数. 1. 简介 Spark目前支持UDF,UDTF,UDAF三种类型的自定义函数.UDF使用场景:输入一行,返回一个结果,一对一, ...

  8. Spark SQL自定义函数_第五章

    1.自定义函数分类 类似于hive当中的自定义函数, spark同样可以使用自定义函数来实现新的功能. spark中的自定义函数有如下3类 1.UDF(User-Defined-Function) 输 ...

  9. Spark SQL自定义函数

    文章目录 自定义函数分类 自定义UDF 自定义UDAF[了解] 自定义函数分类 类似于hive当中的自定义函数, spark同样可以使用自定义函数来实现新的功能. spark中的自定义函数有如下3类 ...

最新文章

  1. java 抛异常给上级_java异常处理机制(示例代码)
  2. 【CentOS 7LAMP架构7】,Apache用户认证#171220
  3. [leetcode]110.平衡二叉树
  4. CMOS图像传感器——2021产品选谈
  5. 豆瓣评分9.4分!这部大片你不应该错过,每一秒都是不敢看的残忍!
  6. php 固话验证,收货地址参数校验:收货人、邮编、地址、手机、固话等
  7. VS中发布并调试IIS程序(非附加进程!!!)
  8. 信息学奥赛一本通 1030:计算球的体积 | OpenJudge NOI 1.3 12
  9. Nginx学习之十一-Nginx启动框架处理流程
  10. 火狐浏览器使用copper插件无反应问题
  11. 多行溢出文字省略号显示(HTML、CSS)
  12. nginx 查看当前的连接数
  13. 初步实现免费下载百度文库word文档(只限纯文本文档)----------------(浏览器控制台执行js代码)
  14. Mimikatz的攻击及防御
  15. 超级鸡马虚拟服务器,超级鸡马按键操作图文教程_超级鸡马怎么玩_牛游戏网
  16. 小米路由开启SSH访问权限
  17. 涂鸦 opengl简单应用1
  18. 泰坦尼克号任务-模型建立和评估
  19. rono在oracle的作用_细节见真章,OPPO Reno多项品质测试,这才是最真实表现
  20. PHPCMS留言板制作

热门文章

  1. Enterprise Library Step By Step系列(一):配置应用程序块——入门篇
  2. CodeForces - 1453E Dog Snacks(树形dp+贪心)
  3. 牛客 - Final Exam(贪心)
  4. LightOJ - 1222 Gift Packing(最大费用最大流/KM)
  5. 中石油训练赛 - 小A盗墓(线段树+异或结论)
  6. python基础语法-三大内建数据结构之列表(list)
  7. 迷你linux设备,ComputeLab发布MintBox迷你PC:专为Linux系统玩家打造
  8. POJ2236(并查集)
  9. 视音频编解码学习工程:TS封装格式分析器
  10. live555 源码分析:ServerMediaSession