UDAF是Hive中用户自定义的聚集函数,Hive内置UDAF函数包括有sum()与count(),UDAF实现有简单与通用两种方式,简单UDAF因为使用Java反射导致性能损失,而且有些特性不能使用,已经被弃用了;在这篇博文中我们将关注Hive中自定义聚类函数-GenericUDAF,UDAF开发主要涉及到以下两个抽象类:

点击(此处)折叠或打开

org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver

org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

如果你想浏览代码:fork it on Github:https://github.com/rathboma/hive-extension-examples

示例数据准备

首先先创建一张包含示例数据的表:people,该表只有name一列,该列中包含了一个或多个名字,该表数据保存在people.txt文件中。

点击(此处)折叠或打开

~$ cat ./people.txt

John Smith

John and Ann White

Ted Green

Dorothy

把该文件上载到HDFS目录/user/matthew/people中:

点击(此处)折叠或打开

hadoop fs -mkdir people

hadoop fs -put ./people.txt people

下面要创建Hive外部表,在Hive shell中执行

点击(此处)折叠或打开

CREATE EXTERNAL TABLE people (name string)

ROW FORMAT DELIMITED FIELDS

TERMINATED BY '\t'

ESCAPED BY ''

LINES TERMINATED BY '\n'

STORED AS TEXTFILE

LOCATION '/user/matthew/people';

相关抽象类介绍

创建一个GenericUDAF必须先了解以下两个抽象类:

点击(此处)折叠或打开

org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver

org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

为了更好理解上述抽象类的API,要记住hive只是mapreduce函数,只不过hive已经帮助我们写好并隐藏mapreduce,向上提供简洁的sql函数,所以我们要结合Mapper、Combiner与Reducer来帮助我们理解这个函数。要记住在hadoop集群中有若干台机器,在不同的机器上Mapper与Reducer任务独立运行。

所以大体上来说,这个UDAF函数读取数据(mapper),聚集一堆mapper输出到部分聚集结果(combiner),并且最终创建一个最终的聚集结果(reducer)。因为我们跨域多个combiner进行聚集,所以我们需要保存部分聚集结果。

AbstractGenericUDAFResolver

Resolver很简单,要覆盖实现下面方法,该方法会根据sql传人的参数数据格式指定调用哪个Evaluator进行处理。

点击(此处)折叠或打开

public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;

GenericUDAFEvaluator

UDAF逻辑处理主要发生在Evaluator中,要实现该抽象类的几个方法。

在理解Evaluator之前,必须先理解objectInspector接口与GenericUDAFEvaluator中的内部类Model。

ObjectInspector

作用主要是解耦数据使用与数据格式,使得数据流在输入输出端切换不同的输入输出格式,不同的Operator上使用不同的格式。可以参考这两篇文章:first post on Hive UDFs、Hive中ObjectInspector的作用,里面有关于objectinspector的介绍。

Model

Model代表了UDAF在mapreduce的各个阶段。

点击(此处)折叠或打开

public static enum Mode {

/**

* PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合

* 将会调用iterate()和terminatePartial()

*/

PARTIAL1,

/**

* PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:

* 将会调用merge() 和 terminatePartial()

*/

PARTIAL2,

/**

* FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合

* 将会调用merge()和terminate()

*/

FINAL,

/**

* COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合

* 将会调用 iterate()和terminate()

*/

COMPLETE

};

一般情况下,完整的UDAF逻辑是一个mapreduce过程,如果有mapper和reducer,就会经历PARTIAL1(mapper),FINAL(reducer),如果还有combiner,那就会经历PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。

而有一些情况下的mapreduce,只有mapper,而没有reducer,所以就会只有COMPLETE阶段,这个阶段直接输入原始数据,出结果。

点击(此处)折叠或打开

GenericUDAFEvaluator的方法

// 确定各个阶段输入输出参数的数据格式ObjectInspectors

public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;

// 保存数据聚集结果的类

abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

// 重置聚集结果

public void reset(AggregationBuffer agg) throws HiveException;

// map阶段,迭代处理输入sql传过来的列数据

public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

// map与combiner结束返回结果,得到部分数据聚集结果

public Object terminatePartial(AggregationBuffer agg) throws HiveException;

// combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。

public void merge(AggregationBuffer agg, Object partial) throws HiveException;

// reducer阶段,输出最终结果

public Object terminate(AggregationBuffer agg) throws HiveException;

图解Model与Evaluator关系

实例

下面将讲述一个聚集函数UDAF的实例,我们将计算people这张表中的name列字母的个数。

下面的函数代码是计算指定列中字符的总数(包括空格)

pom文件如下:

点击(此处)折叠或打开

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

TotalNumOfLetters

com.xxxx.udaf

1.0-SNAPSHOT

org.apache.hive

hive-exec

2.6.0

org.apache.hadoop

hadoop-client

2.6.0

org.apache.maven.plugins

maven-jar-plugin

com.xxxx.udaf.xxxx

com.jolira

onejar-maven-plugin

1.4.4

true

onejar

one-jar

org.apache.maven.plugins

maven-compiler-plugin

7

7

代码

点击(此处)折叠或打开

package com.xxxx.udaf;

import org.apache.hadoop.hive.ql.exec.Description;

import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;

import org.apache.hadoop.hive.ql.metadata.HiveException;

import org.apache.hadoop.hive.ql.parse.SemanticException;

import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

@Description(name = "letters", value = "__FUNC__(expr) - return the total count chars of the column(返回该列中所有字符串的字符总数)")

public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

@Override

public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {

if (parameters.length != 1) { // 判断参数长度

throw new UDFArgumentLengthException("Exactly one argument is expected, but " +

parameters.length + " was passed!");

}

ObjectInspector objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

if (objectInspector.getCategory() != ObjectInspector.Category.PRIMITIVE) { // 是不是标准的java Object的primitive类型

throw new UDFArgumentTypeException(0, "Argument type must be PRIMARY. but " +

objectInspector.getCategory().name() + " was passed!");

}

// 如果是标准的java Object的primitive类型,说明可以进行类型转换

PrimitiveObjectInspector in putOI = (PrimitiveObjectInspector) objectInspector;

// 如果是标准的java Object的primitive类型,判断是不是string类型,因为参数只接受string类型

if (in putOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {

throw new UDFArgumentTypeException(0, "Argument type must be Strig, but " +

in putOI.getPrimitiveCategory().name() + " was passed!");

}

return new TotalNumOfLettersEvaluator();

}

public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

PrimitiveObjectInspector in putIO;

ObjectInspector outputIO;

PrimitiveObjectInspector IntegerIO;

int total = 0;

@Override

public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {

assert (parameters.length == 1);

super.init(m, parameters);

/**

* PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合

* 将会调用iterate()和terminatePartial()

* PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:

* 将会调用merge() 和 terminatePartial()

* FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合

* 将会调用merge()和terminate()

* COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合

* 将会调用 iterate()和terminate()

*/

//map阶段读取sql列,输入为String基础数据格式

if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {

in putIO = (PrimitiveObjectInspector) parameters[0];

} else { //其余阶段,输入为Integer基础数据格式

IntegerIO = (PrimitiveObjectInspector) parameters[0];

}

// 指定各个阶段输出数据格式都为Integer类型

outputIO = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,

ObjectInspectorFactory.ObjectInspectorOptions.JAVA);

return outputIO;

}

/**

* 存储当前字符总数的类

*/

static class LetterSumAgg implements AggregationBuffer {

int sum = 0;

void add(int num) {

sum += num;

}

}

@Override

public AggregationBuffer getNewAggregationBuffer() throws HiveException {

LetterSumAgg result = new LetterSumAgg();

return result;

}

@Override

public void reset(AggregationBuffer aggregationBuffer) throws HiveException {

LetterSumAgg myAgg = new LetterSumAgg();

}

private boolean warned = false;

@Override

public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {

assert (parameters.length == 1);

if (parameters[0] != null) {

LetterSumAgg myAgg = (LetterSumAgg) agg;

Object p = in putIO.getPrimitiveJavaObject(parameters[0]);

myAgg.add(String.valueOf(p).length());

}

}

@Override

public Object terminatePartial(AggregationBuffer agg) throws HiveException {

LetterSumAgg myAgg = (LetterSumAgg) agg;

total += myAgg.sum;

return total;

}

@Override

public void merge(AggregationBuffer agg, Object partial) throws HiveException {

if (partial != null) {

LetterSumAgg myAgg1 = (LetterSumAgg) agg;

Integer partialSum = (Integer) IntegerIO.getPrimitiveJavaObject(partial);

LetterSumAgg myAgg2 = new LetterSumAgg();

myAgg2.add(partialSum);

myAgg1.add(myAgg2.sum);

}

}

@Override

public Object terminate(AggregationBuffer agg) throws HiveException {

LetterSumAgg myAgg = (LetterSumAgg) agg;

total = myAgg.sum;

return myAgg.sum;

}

}

}

使用自定义函数

点击(此处)折叠或打开

ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;

CREATE TEMPORARY FUNCTION letters as 'com.xxxx.udaf.TotalNumOfLettersGenericUDAF';

SELECT letters(name) FROM people;

OK

44

Time taken: 20.688 seconds

hive udaf_Hive UDAF 函数的编写相关推荐

  1. hive udaf_Hive自定义函数

    为什么需要自定义函数 hive的内置函数满足不了所有的业务需求. hive提供很多的模块可以自定义功能,比如:自定义函数.serde.输入输出格式等. 常见自定义函数UDF分三种: UDF(User ...

  2. Hive 之 用户自定义函数 UDF UDAF UDTF

    一 什么是UDF UDF是UserDefined Function 用户自定义函数的缩写.Hive中除了原生提供的一些函数之外,如果还不能满足我们当前需求,我们可以自定义函数. 除了UDF 之外,我们 ...

  3. 【大数据开发】SparkSQL——Spark对接Hive、Row类、SparkSQL函数、UDF函数(用户自定义函数)、UDAF函数、性能调优、SparkSQL解决数据倾斜

    文章目录 一.Spark对接Hive准备工作 1.1 集群文件下载 1.2 导入依赖 1.3 打开集群metastore服务 二.Spark对接Hive 2.1 查询Hive 2.2 读取MySQL中 ...

  4. Hive 内置函数及自定义函数

    1.内置函数 使用如下命令查看当前hive版本支持的所有内置函数 show functions; 部分截图: 可以使用如下命令查看某个函数的使用方法及作用,比如查看 upper函数 desc func ...

  5. 大数据入门教程系列之Hive内置函数及自定义函数

    本篇文章主要介绍Hive内置函数以及自定义UDF函数和UDFT函数,自定义UDF函数通过一个国际转换中文的例子说明. 操作步骤: ①.准备数据和环境 ②.演示Hive内置函数 ③.自定义UDF函数编写 ...

  6. hive 的udf 函数使用

    1)依据课程讲解UDF编程案例,完成练习,总结开发UDF步骤,代码贴图,给予注释,重点 2)更改emp 表中名字的大写给为小写. 一:hive 的udf 函数: 1.1 hive UDF 函数概述: ...

  7. hive内置函数_flink教程flink modules详解之使用hive函数

    modules概念 通过hive module使用hive函数 内置函数 自定义函数 sql 客户端的使用 原理分析和源码解析 实现 modules概念 flink 提供了一个module的概念,使用 ...

  8. UDF函数和UDTF函数的图解举例,追加UDAF函数

    简述UDF/UDAF/UDTF是什么,各自解决问题及应用场景 - 玩转大数据 - 博客园 自定义UDF和UDTF函数的两个作用点: 1.埋点log打印日志,方便任务出现问题后进行调试 2.有一些SQL ...

  9. hive的UDF函数的使用。常见UDF函数

    UDF的话一般是hive提供的函数功能满足不了业务需要,我们就会自己来写UDF函数来辅助完成,对于我们常用的函数而言还是哪些常见的聚合函数,如:count.sum.avg.max.min等,其他的话就 ...

最新文章

  1. 迷失只是暂时 2011-03-13
  2. RabbitMQ-c在Linux上编译
  3. Vscode----热门插件超实用插件汇总(史上最全)
  4. oracle临时表空间占用率过高,ORACLE 临时表空间使用率过高的原因及临时解决方案...
  5. 【推荐】SQL Server 2008 R2 中英文 开发版/企业版/标准版 下载
  6. 一次Web请求返回406原因与解决方案
  7. php 检查货币类型_php 判断函数是否为费用类型(金额/货币:6.02)有小数点
  8. FreeMarker FTL标签
  9. VUE项目中使用阿里图标iconfont
  10. 初学者之路—————Cycle GAN
  11. doom3的UI系统
  12. 经典语录总结:识人篇
  13. 普通最小二乘法平面直线回归问题的三种实现(Python)
  14. 盗版升级win10仍是盗版
  15. 麒麟操作系统之光盘刻录
  16. Note(读书笔记)
  17. html name选择器,iframe标签的name属性
  18. Tortoisegit 远端版本回退
  19. App地推统计:最全面的业绩统计方案
  20. idea配置 Tomcat Deployment添加时没有Artifact的完美解决方式!较全面

热门文章

  1. yii mysql 查询 类型转换_Yii2.0 API改造(返回数据库对应字段数据类型)
  2. 没有找到dllregisterserver输入点_Excel教程:框内打的几种输入方法,值得收藏
  3. 限定位宽比较大小_自己之前买电脑整理的一些电脑知识,比较粗糙,仅供参考。...
  4. linux网卡握手速率模式,一种基于Linux平台下的网卡速率和双工模式测试的方法与流程...
  5. 同时买票是怎么实现的_腰包怎么背才更新潮?教你3种时尚背法,告别土味老气风...
  6. orabbix监控oracle11g,orabbix 监控oracle
  7. matlab thetal,基於matlab的車道和車道線檢測樣例
  8. mysql三高讲解(二):2.8 mysql视图相关概念
  9. 服务器端添加网站,服务器添加网站
  10. 谷歌驱动和谷歌版本必须一一对应吗_谷歌:华为别用我的安卓系统!自己研发一个去...