数据湖技术Iceberg0.12预研文档
Iceberg0.12预研
本次预研场景主要为kafka=>flink sql=>iceberg=>hive=>hdfs=>trino(presto)
本次预研使用组件如下:
名称 | 版本 | 描述 |
---|---|---|
flink | 1.12.1 | 通过parcel包部署于cdh6.3.2中 |
cdh | 6.3.2 | 开源版本 |
hive | 2.3.7 | 包含cdh中(更换jar升级替换) |
hadoop | 3.0.0 | cdh原生版本 |
presto | 2.591 | 开源版本 |
trino | 360 | 开源版本 |
iceberg | 0.12.0 | 于2021年8月15日发布 |
release-notes 地址
https://iceberg.apache.org/releases/#0120-release-notes
依赖jar列表
iceberg-flink-runtime-0.12.0.jar
flink-sql-connector-hive-2.3.6_2.12-1.12.1.jar
flink-sql-connector-kafka_2.12-1.12.1.jar
flink-json-1.12.1.jar
sql-client-default.yaml配置
catalogs:- name: icebergtype: icebergproperty-version: 2warehouse: hdfs://nameservice1/data/iceberguri: thrift://cdh2:9083catalog-type: hive
## sql-client 开启checkpoint配置,如果不开启,kafka-iceberg就不会提交元数据信息
configuration:execution.checkpointing.interval: 60000state.checkpoints.num-retained: 10execution.checkpointing.mode: EXACTLY_ONCEexecution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATIONstate.backend: rocksdbstate.checkpoints.dir: hdfs:///user/flink/checkpointsstate.savepoints.dir: hdfs:///user/flink/checkpoints
进入sql-client
sql-client.sh embedded -l /flink/soft/
测试iceberg on hive-catalog
##如果不设置sql-client-default.yaml,可通过语句创建
CREATE CATALOG iceberg WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://cdh2:9083','clients'='5','property-version'='1','warehouse'='hdfs://nameservice1/data/iceberg'
);
##查看所有的catalogs
show catalogs;
##进入iceberg-catalogs中
use catalog iceberg;
##查看所有的databases
show databases;
##创建databases;
create database iceberg_test;
##进入iceberg_test(database)
use iceberg_test;
##创建iceberg_v1表
CREATE TABLE students_v1 (addre string,lxdh string,xm string,dz string,byyx string,id string COMMENT 'id',age int
)PARTITIONED BY (age)
WITH (
'connector'='iceberg',
'write.format.default'='orc',
'format-version'='1', --指定表版本为v1
'write.metadata.delete-after-commit.enabled'='true' --删除最旧的版本元数据文件默认保存最近一百个
);
##查看所有表
show tables;
##删除表
drop tables table_name
##插入数据
insert into iceberg.iceberg_test.students_v1 values('郑巷141号','17341533143','丁晟睿','福海市','西南大学','1',31);
insert into iceberg.iceberg_test.students_v1 values('郑巷141号','17341533143','丁晟','福海市','西南大学','2',30);
##测试查询
select * from students_v1;
select * from students_v1 where age = 31;
##创建iceberg_v2表
CREATE TABLE students_v2 (addre string,lxdh string,xm string,dz string,byyx string,id string PRIMARY KEY NOT ENFORCED COMMENT 'PRIMARY KEY' ,age int
)
WITH (
'connector'='iceberg',
'format-version'='2', -- 指定表版本为v2 v2不支持orc方式写入
'write.distribution-mode'='hash', -- 避免小文件产生
'write.metadata.delete-after-commit.enabled'='true' --删除最旧的版本元数据文件默认保存最近一百个
);
##插入数据
insert into iceberg.iceberg_test.students_v2 values('郑巷141号','17341533143','丁晟睿','福海市','西南大学','1',31);
insert into iceberg.iceberg_test.students_v2 values('郑巷141号','17341533143','丁晟','福海市','西南大学','2',30);
##测试查询
select * from students_v2;
select * from students_v2 where age = 31;
创建kafka-flink
kafka常用命令集
- 查询Topic
kafka-topics --list --zookeeper cdh2:2181,cdh3:2181
- 创建Topic test
kafka-topics --create --topic iceberg --partitions 3 --replication-factor 1 --zookeeper cdh3:2181
- 从Topic test消费消息
#加了--from-beginning 重头消费所有的消息
kafka-console-consumer --bootstrap-server cdh2:9092 --topic iceberg --from-beginning
- 往Topic test生产消息
kafka-console-producer --broker-list cdh2:9092 --topic iceberg
{"addre":"孔桥90714号","lxdh":"17298981101","xm":"张弘文","dz":"包宁市","byyx":"东北技术大学","id":49,"age":76}
{"addre":"韦巷57841号","lxdh":"18660990864","xm":"戴建辉","dz":"西阳市","byyx":"西北科技大学","id":50,"age":49}
{"addre":"宋路4975号","lxdh":"15974172825","xm":"汪智辉","dz":"济乡县","byyx":"西南大学","id":51,"age":95}
{"addre":"侯巷6339号","lxdh":"17596361794","xm":"夏明轩","dz":"衡京市","byyx":"南技术大学","id":51,"age":95}
{"addre":"秦街6371号","lxdh":"15110361098","xm":"王立辉","dz":"安沙市","byyx":"东科技大学","id":51,"age":35}
{"addre":"贺中心10436号","lxdh":"13305794462","xm":"侯智辉","dz":"海都市","byyx":"西南体育大学","id":54,"age":87}
{"addre":"黄中心0号","lxdh":"13956637216","xm":"宋浩轩","dz":"珠阳市","byyx":"西北农业大学","id":55,"age":57}
- 删除topic
kafka-topics --zookeeper cdh2:2181 --delete --topic iceberg
创建kafka on flink 表
CREATE TABLE kafka_test (addre string,lxdh string,xm string,dz string,byyx string,id int,age int
) WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本'connector.topic' = 'iceberg', -- kafka topic名称'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取'connector.properties.bootstrap.servers' = 'cdh2:9092', -- kafka broker 地址'connector.properties.group.id' = 'testgroup1', 'format.type' = 'json', 'format.ignore-parse-errors' = 'true' -- 解析失败跳过
);
select * from kafka_test;
测试 kafka=>iceberg
##测试v1插入
insert into iceberg.iceberg_test.students_v1 select addre,lxdh,xm,dz,byyx,cast(id as string) as id,age from default_catalog.default_database.kafka_test;##测试v2插入
insert into iceberg.iceberg_test.students_v2 select addre,lxdh,xm,dz,byyx,cast(id as string) as id,age from default_catalog.default_database.kafka_test;
trino(presto)-catalog配置
connector.name=iceberg
hive.metastore.uri=thrift://cdh2:9083
iceberg.file-format=orc
hive.config.resources=/etc/alternatives/hadoop-conf/core-site.xml,/etc/alternatives/hadoop-conf/hdfs-site.xml
trino常用命令
v1表查询
##trino 查询所有数据
select * from iceberg.iceberg_test.students_v1
##按照分区删除
delete from iceberg_test.students_v1 where age = 30
##查询所有快照
SELECT * FROM iceberg_test."students_v1$snapshots" ORDER BY committed_at
##快照回滚
CALL iceberg.system.rollback_to_snapshot('iceberg_test', 'students_v1', 512151592674641227)v2表目前查询失败
##trino 查询所有数据
select * from iceberg_test.students_v2
##按照分区删除
delete from iceberg_test.students_v2 where age = 30
##查询所有快照
SELECT * FROM iceberg_test."students_v2$snapshots" ORDER BY committed_at
##快照回滚
CALL iceberg.system.rollback_to_snapshot('iceberg_test', 'students_v2', 512151592674641227)
maven build(坑了很久)
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.wk.iceberg.iceberg.KafkaOnIceberg</mainClass></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin></plugins></build>
小文件合并与过期文件删除
//归并小文件为设置文件大小Actions.forTable(table).rewriteDataFiles().maxParallelism(5).targetSizeInBytes(128 * 1024 * 1024) .execute();//自动删除以前的过期数据文件Snapshot snapshot = table.currentSnapshot();if (snapshot != null){long time = snapshot.timestampMillis();table.expireSnapshots().expireOlderThan(time).commit();}
总结如下
- v2表单个任务内可实现按照key更新去重和聚合运算,多个任务插入即会出现重复数据
- 小文件合并需手动调用java api实现,且小文件根据checkpoint时间成倍增长
- 如不需要快照文件,可通过java api的方式清理当前日期过期数据及元数据文件
- kafka=>iceberg一定要开启checkpoint否则元数据不会写入
V1 | V2 | |
---|---|---|
使用场景 | insert流 | upsert流 |
trino(presto) | Y | 不支持 |
读取方式 | 批流 | 批 |
存储格式 | orc/parquet | parquet |
小文件合并 | Y | N |
快照回滚 | Y | 暂未测试(jar冲突) |
过期数据删除 | Y | N |
分区 | Y | Y(主键+分区相同数据不会显示) |
数据湖技术Iceberg0.12预研文档相关推荐
- java预研项目_YAML预研文档
YAML预研文档 YAML概要 YAML是"YAML Ain't a Markup Language"(YAML不是一种置标语言)的递归缩写,早先YAML的意思其实是:" ...
- MTK VILTE预研文档
本文是在乐视做Phone开发时,预研视频通话功能的文档 简介 本文作为mtk的viLTE的预研文档,主要从从视频通话流程的角度阐述viLTE相关内容,为后续开发做充分的理论准备.从上层代 ...
- 大数据架构师——数据湖技术(二)
文章目录 数据湖技术 数据湖技术之Iceberg Spark 与 Iceberg 整合 1. Spark3.2.1 与 Iceberg0.13.2整合 添加依赖 Spark 设置 Catalog 配置 ...
- 数据湖技术之Hudi 集成 Spark
数据湖技术之Hudi 集成 Spark 数据湖框架Hudi,从诞生之初支持Spark进行操作,后期支持Flink,接下来先看看与Spark整合使用,并且在0.9.0版本中,提供SparkSQL支持,编 ...
- 【推荐】数据湖技术及实践与案例资料汇总合集47篇
数据湖或hub的概念最初是由大数据厂商提出的,表面上看,数据都是承载在基于可向外扩展的HDFS廉价存储硬件之上的.但数据量越大,越需要各种不同种类的存储.最终,所有的企业数据都可以被认为是大数据,但并 ...
- 数据湖技术 Iceberg 的探索与实践
随着大数据存储和处理需求的多样化,如何构建一个统一的数据湖存储,并在其上进行多种形式的数据分析成了企业构建大数据生态的一个重要方向.Netflix 发起的 Apache Iceberg 项目具备 AC ...
- 数据湖04:数据湖技术架构演进
系列专题:数据湖系列文章 1. 背景 国内的大型互联网公司,每天都会生成几十.几百TB,甚至几PB的原始数据.这些公司通常采用开源的大数据组件来搭建大数据平台.大数据平台经历过"以Hadoo ...
- Java计算机毕业设计体育馆数据管理系统源码+系统+数据库+lw文档
Java计算机毕业设计体育馆数据管理系统源码+系统+数据库+lw文档 Java计算机毕业设计体育馆数据管理系统源码+系统+数据库+lw文档 本源码技术栈: 项目架构:B/S架构 开发语言:Java语言 ...
- java计算机毕业设计H5醉美南湾湖网站设计MyBatis+系统+LW文档+源码+调试部署
java计算机毕业设计H5醉美南湾湖网站设计MyBatis+系统+LW文档+源码+调试部署 java计算机毕业设计H5醉美南湾湖网站设计MyBatis+系统+LW文档+源码+调试部署 本源码技术栈: ...
最新文章
- quartus中pin planner中分配引脚的对话框不见了,怎么找回(附方法)
- WEB服务器、应用程序服务器、HTTP服务器的区别
- html登录后记住用户名,完成登录功能,用session记住用户名
- 10 Equality constrained minimization
- 推荐一条高效的Python爬虫学习路径!
- 北师大网络教育计算机试题一的答案,北师大网络教育《专科英语一》作业3部分答案...
- 如何禁用python警告
- iOS中利用UISearchBar实现搜索
- opencv学习笔记06
- shell 的AWK
- sqlserver2012 学习总结笔记
- 3dmax2020渲染器下载3dmax2020渲染器VRay4.2下载安装教程
- 中兴笔试c语言,中兴c语言笔试题
- 马哥linux脚本,马哥全套linux运维教程
- html黑洞效果,HTML5 Canvas炫酷宇宙黑洞引力特效
- 动态规划(dynamic programming)初步入门
- 月薪9K程序员,写完这段代码就被辞退了
- 关于主机的思维导图_关于开展思维导图培训的通知
- java汉字转换拼音,获取汉字串拼音首字母
- Delphi 编写数字签名验证并获取签名信息
热门文章
- 病毒36otray.exe(中间是字母o,不是360安全卫士的数字0)和ntldr.exe
- Markdown 教程:这一篇博客就让你学会制作高逼格的文档
- 魅族16t无法点击计算机传输,魅族16T手机中隐藏这个功能你一定要开启,非常的实用哦!...
- WebRTC 直播时代
- win7桌面的计算机在哪里,win7更改桌面路径,win7桌面文件在哪里
- macOS 安装 Adobe 全家桶之 Lightroom Classic
- CA体系与证书——TLS安全加密的基础
- FOFA(一): FOFA入门
- 两台设备搭建lvs高可用
- 0-01--python3 logging基本用法