说明

在hive数仓里,实现一个行转列是很常见的操作,那么如何在 FlinkSQL 中也实现类似的功能呢?以下用以一个样本示例数据来说明。

样本数据

以下数据模拟学生的考试成绩数据:

name list
andy [{“course”:“flink”,“score”:“99”},{“course”:“spark”,“score”:“88”},{“course”:“hadoop”,“score”:“77”}]

数据格式化:

{"name": "andy","list": [{"course": "flink","score": 99}, {"course": "spark","score": 88}, {"course": "hadoop","score": 77}]
}

数据ETL要求

要求最终的行转列数据格式为:

name course score
andy flink 99
andy spark 88
andy hadoop 77

这是一个典型的列转行或者一行转多行的场景,需要将 list 列进行拆分成为多行多列,下面介绍两种实现方式.

  • 方式①、使用 Flink 自带的 unnest 函数(反嵌套)解析
  • 方式②、使用自定义 UDTF 函数解析

FlinkSql建表语句

CREATE TABLE ods_kafka_student_scores (`name` string,`list` ARRAY<ROW<course STRING,score INT>>
)
WITH ('connector' = 'kafka',                                           -- 使用 kafka connector'topic' = 'ods.kafka_student_scores',                             -- kafka topic'properties.bootstrap.servers' = 'dn3:9092,dn4:9092,dn5:9092',   -- broker连接信息'properties.group.id' = 'andy_flink_test',                        -- 消费kafka的group_id'scan.startup.mode' = 'earliest-offset',                            -- 读取数据的位置'format' = 'json',                                               -- 数据源格式为 json'json.fail-on-missing-field' = 'false',                          -- 字段丢失任务不失败'json.ignore-parse-errors' = 'true'                                -- 解析失败则跳过
);

注意:

  • 这里在定义 list 字段类型的时候需要定义为 ARRAY 类型,因为 unnest 函数需要一个数组类型的参数

1、使用 UNNEST 解析

select name,course,score
from ods_kafka_student_scores
CROSS JOIN UNNEST(`list`) AS t (course,score);select name,course,score
from ods_kafka_student_scores, UNNEST(`list`) AS t (course,score);select name,course,score
from ods_kafka_student_scores
LEFT JOIN UNNEST(`list`) AS t (course,score) on true;

2、使用自定义 UDTF 解析

UDTF(自定义表值函数),自定义表值函数。

将 0 个、1 个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是 1 个值。返回的行可以由 1 个或多个列组成。调用一次函数输出多行或多列数据。

必须继承 TableFunction 基类,并实现一个或者多个名为 eval 的方法。

在使用 UDTF 时,需要带上 LATERAL TABLE两个关键字.

@FunctionHint(output = @DataTypeHint("ROW<course STRING,score INT>"))
public class ParserJsonArrayTest extends TableFunction<Row> {private static final Logger LOG = Logger.getLogger(ParserJsonArrayTest.class);public void eval(String value) {try {JSONArray arrays = JSONArray.parseArray(value);Iterator<Object> iterator = arrays.iterator();while (iterator.hasNext()) {JSONObject jsonObject = (JSONObject) iterator.next();String course = jsonObject.getString("course");Integer score = jsonObject.getInteger("score");collect(Row.of(course,score));}} catch (Exception e) {LOG.error("Parser json failed :" + e.getMessage());}}
}

自定义 UDTF 解析的时候,就不需要把 list 字段定义成 ARRAY 类型了,直接定义成 STRING 类型就可以了,并且这种方式会更加的灵活,比如还需要过滤数据或者更复杂的一些操作时都可以在 UDTF 里面完成.

在FlinkSql里使用UDTF

select name,course,url
from ods_kafka_student_scores
CROSS JOIN lateral TABLE (ParserJsonArrayTest(`list`)) AS t (course,score);select name,course,url
from ods_kafka_student_scores, lateral TABLE (ParserJsonArrayTest(`list`)) AS t (course,score);select name,course,url
from ods_kafka_student_scores
left join lateral TABLE (ParserJsonArrayTest(`list`)) AS t (course,score) on true;

unnest 和 自定义 UDTF 函数在使用的时候都有 3 种写法,前面两种写法的效果其实是一样的,第三种写法相当于 left join 的用法.
区别在于 CROSS JOIN/INNER JOIN: 对于左侧表的每一行,右侧 UDTF 不输出,则这一行不输出.
LEFT JOIN: 对于左侧表的每一行,右侧 UDTF 不输出,则这一行会输出,右侧 UDTF 字段为 null

程序运行结果:

2> andy,flink,99
2> andy,spark,88
2> andy,hadoop,77

说明总结

在实际使用的时候:

  • 如果 unnest 可以满足需求就直接用 unnest 不需要带来额外的开发;
  • 如果 unnest 函数满足不了需求,那么就自定义 UDTF 去完成.

FlinkSQL实现行转列相关推荐

  1. FlinkSQL使用自定义UDTF函数行转列-IK分词器

    一.背景说明 优惠券网 www.cps3.cn 本文基于IK分词器,自定义一个UDTF(Table Functions),实现类似Hive的explode行转列的效果,以此来简明开发过程. 如下图Fl ...

  2. 2021年大数据Hive(五):Hive的内置函数(数学、字符串、日期、条件、转换、行转列)

    全网最详细的Hive文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 系列历史文章 前言 Hive的内置函数 一.数学函数 1. 取整函数: round ...

  3. MySQL 学习笔记(16)— 子查询(单行单列、一行多列、多行多列、 ALL、ANY、SOME 运算符、EXISTS 操作符)

    1. 子查询概念 子查询是指嵌套在其他语句(SELECT . INSERT . UPDATE . DELETE 等)中的 SELECT 语句:子查询也称为内查询( inner query )或者嵌套查 ...

  4. 【合并单元格】纵向合并单元格之前对数组处理【针对饿了么element的table的span-method合并行或列的计算方法】

    <template><el-table :span-method="spanMethod"><el-table-column label=" ...

  5. Algs4-1.1.13编写一段代码,打印出一个M行N列的二维数组的转置(交换行和列)

    1.1.13编写一段代码,打印出一个M行N列的二维数组的转置(交换行和列). public  class Test {     public static void main(String[] arg ...

  6. sqlserver 行转列

    还写了一篇Linq 实现 DataTable 行转列有时间大家可以看一下 sqlserver把行转成列在我们编码中是经常遇到的我做一个小例子大家看一下 1 --创建一个表 2 create table ...

  7. hive中array嵌套map以及行转列的使用

    1. 数据源信息 {"student": {"name":"king","age":11,"sex" ...

  8. SQL SERVER特殊行转列案列一则

    今天有个同事找我,他说他有个需求,需要进行行转列,但是又跟一般的行转列有些区别,具体需求如下所说,需要将表1的数据转换为表2的显示格式. 我想了一下,给出了一个解决方法,具体如下所示(先给出测试数据) ...

  9. hive 列转行_掌握这个SQL技巧超越80%的人——行转列/列转行

    在做特征工程的时候,会经常会碰到一个场景,比如手上有一张用户表user,记录了用户某款产品每一天各个功能的使用次数,存储方式类似key-value键值结构.具体如下: 用户使用行为统计表user 此时 ...

最新文章

  1. 【工业4.0】深度报告:独家解密工业4.0真正图谋?跟踪软件帝国的十年
  2. python详细安装教程环境配置-python环境安装详细步骤
  3. opencv 正脸和侧脸检测
  4. Codeforces Round #281 (Div. 2) C. Vasya and Basketball 二分
  5. 前端真的能做到彻底权限控制吗?
  6. 企业实战(Jenkins+GitLab+SonarQube)_09_jenkins发布项目到测试环境
  7. abstract class和interface有什么区别?_程序员必须掌握了解的21个Java核心技术,还在等什么?...
  8. autolayout教程Android,AndroidAutoLayout的简单阅读
  9. Chaotica for Mac(分形艺术作品创作工具)
  10. 配置路由器交换机常见的坑
  11. LVGL8制作简易时钟
  12. 我如何建立热线电话喀拉拉邦并为抗洪救灾做出了贡献
  13. ggplot绘制小提琴图
  14. 当前台式计算机新款,苹果发布全线新款台式Mac机
  15. rtthread 线程
  16. 销售宝:没有销售技巧,能做软件销售么?大神一针见血解答
  17. 复旦大学计算机网络期末考试试题,复旦大学学习计算机科学技术学院期末试题练习题.doc...
  18. Boss直聘网爬虫 基于requests 请求的源码
  19. 【数字设计验证】System Verilog(sv)稍微进阶的笔记(一)
  20. MYSQL 查询指定范围内的经纬度

热门文章

  1. 包工协议书样本_工程分包协议书 样本
  2. url的post请求 Content-Type:application/json类型 Java后端接收(^_^)
  3. 飞翔的小鸟--easyx版
  4. java实现智能拼图_JAVA实现拼图游戏
  5. I5 4590 台式机安装黑苹果再次入坑记(2019.12.23)
  6. u-boot for tiny210 ver2.2.1(by liukun321咕唧咕唧)
  7. 小心!疯狂科技“秒变”疯狂骗局
  8. 网聊是不是就要劈腿上床?
  9. 美国 Sinclair 电视台网络全面瘫痪,罪魁祸首系勒索软件
  10. 计算机毕设(附源码)JAVA-SSM建筑工地环保监控系统研究