Flink+Iceberg环境搭建及生产问题处理
全网最全大数据面试提升手册!
概述
作为实时计算的新贵,Flink受到越来越多公司的青睐,它强大的流批一体的处理能力可以很好地解决流处理和批处理需要构建实时和离线两套处理平台的问题,可以通过一套Flink处理完成,降低成本,Flink结合数据湖的处理方式可以满足我们实时数仓和离线数仓的需求,构建一套数据湖,存储多样化的数据,实现离线查询和实时查询的需求。目前数据湖方面有Hudi和Iceberg,Hudi属于相对成熟的数据湖方案,主要用于增量的数据处理,它跟spark结合比较紧密,Flink结合Hudi的方案目前应用不多。Iceberg属于数据湖的后起之秀,可以实现高性能的分析与可靠的数据管理,目前跟Flink集合方面相对较好。
安装
本次主要基于flink+iceberg进行环境搭建。
1.安装flink
安装并启动hadoop、hive等相关环境。
下载flink安装包,解压后安装:
下载地址: https://archive.apache.org/dist/flink/flink-1.11.3/
wget https://downloads.apache.org/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.12.tgz
tar xzvf flink-1.11.1-bin-scala_2.12.tgz
导入hadoop的环境包,flink-sql会使用到hdfs和hive等相关依赖包进行通讯。
export HADOOP_CLASSPATH=$HADOOP_HOME/bin/hadoop classpath
启动flink集群
./bin/start-cluster.sh
注:这里会遇到第一个坑,iceberg-0.11.1支持的是flink1.11的版本,如果使用过高的版本,会报一堆找不到类和方法的异常(因为flink1.12版本删掉了许多API)。请使用Flink1.11.x版本进行安装。
2.下载Iceberg环境包
主要是/iceberg-flink-runtime-xxx.jar和flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar两个jar包。
下载地址:
https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/0.11.1/
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive
3.启动Flink-sql
执行命令启动flink-sql。
./bin/sql-client.sh embedded
-j /iceberg-flink-runtime-xxx.jar
-j /flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar
shell
4.创建Catalog
Flink支持hadoop、hive、自定义三种Catalog。这里以Hive为例。
注:这里会遇到第二个坑,iceberg和flink当前版本支持的是hive2.3.x的版本,推荐安装hive2.3.8版本。不然也会遇到一堆找不到方法和类的异常。
执行命令,创建hive类型的Catalog。
CREATE CATALOG hive_catalog WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://server1:9083','clients'='5','property-version'='1','warehouse'='hdfs://server1/user/hive/warehouse'
);
创建成功后的提示
5.创建表
创建DataBase:
create iceberg_db;
use iceberg_db;
创建表:
CREATE TABLE test (id BIGINT COMMENT 'unique id',busi_date STRING
)
6.插入数据和Flink任务执行情况
执行sql插入数据。
可以在Flink任务中看到相应的Job。
7.Iceberg组件介绍
IcebergStreamWriter
主要用来写入记录到对应的 avro、parquet、orc 文件,生成一个对应的 Iceberg DataFile,并发送给下游算子。
另外一个叫做 IcebergFilesCommitter,主要用来在 checkpoint 到来时把所有的 DataFile 文件收集起来,并提交 Transaction 到 Apache Iceberg,完成本次 checkpoint 的数据写入,生成 DataFile。
IcebergFilesCommitter
为每个 checkpointId 维护了一个 DataFile 文件列表,即 map,这样即使中间有某个 checkpoint 的 transaction 提交失败了,它的 DataFile 文件仍然维护在 State 中,依然可以通过后续的 checkpoint 来提交数据到 Iceberg 表中。
在Flink的任务日志中,可以看到对应IcebergStreamWriter和IcebergFilesCommitter的信息,以及snap的ID(3509023638495847835)。
8.Iceberg文件结构介绍
在HDFS系统中观察Iceberg的整个目录结构,可以看到分为data和metadata两个目录,对应开篇介绍的Iceberg文件结构。
下图中可看到Iceberg文件包含了数据文件、元数据和快照、manifest清单和manifest。
观察Iceberg的表元数据文件
hadoop dfs text /user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00004-afffb920-e788-437e-80f0-4187a42ae74b.metadata.json
可以看到对应的快照信息,表的版本、更新时间戳、manifest清单文件地址等信息。具体的字段描述可以参考官网介绍:https://iceberg.apache.org/spec/#iceberg-table-spec
这里可以看到刚刚Flink任务插入的快照信息(3509023638495847835)
观察manifest清单和manifest文件
9.分区表
采集分区表并插入数据。
CREATE TABLE t_partition (id BIGINT COMMENT 'unique id',busi_date STRING
) PARTITIONED BY (busi_date);
可以看到表文件通过分区目录进行了划分,提高查询效率。
10.Iceberg执行计划
11.通过Flink代码的方式操作Iceberg
package com.hyr.flink.icebergimport org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject IcebergDemo {def main(args: Array[String]): Unit = {val conf: Configuration = new Configuration()// 自定义web端口conf.setInteger(RestOptions.PORT, 9000)val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)streamEnv.setParallelism(1)val tenv = StreamTableEnvironment.create(streamEnv)// add hadoop config filetenv.executeSql("CREATE CATALOG hive_catalog WITH (\n 'type'='iceberg',\n 'catalog-type'='hive',\n 'uri'='thrift://server1:9083',\n 'clients'='5',\n 'property-version'='1',\n 'warehouse'='hdfs://server1:8020/user/hive/warehouse'\n)");tenv.useCatalog("hive_catalog");tenv.executeSql("show databases").print()tenv.useDatabase("iceberg_db")tenv.executeSql("show tables").print()tenv.executeSql("select id from test").print() }
}
完整的一个表元数据信息文件:
{"format-version" : 1,"table-uuid" : "cfa12929-0f4c-475c-aca0-7c9cc411a1ac","location" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test","last-updated-ms" : 1622771727393,"last-column-id" : 2,"schema" : {"type" : "struct","fields" : [ {"id" : 1,"name" : "id","required" : false,"type" : "long"}, {"id" : 2,"name" : "data","required" : false,"type" : "string"} ]},"partition-spec" : [ ],"default-spec-id" : 0,"partition-specs" : [ {"spec-id" : 0,"fields" : [ ]} ],"default-sort-order-id" : 0,"sort-orders" : [ {"order-id" : 0,"fields" : [ ]} ],"properties" : { },"current-snapshot-id" : 555628243696744305,"snapshots" : [ {"snapshot-id" : 8531001366494199026,"timestamp-ms" : 1622770732247,"summary" : {"operation" : "append","flink.job-id" : "371316a9b274ff09f85957afe730e25d","flink.max-committed-checkpoint-id" : "9223372036854775807","added-data-files" : "1","added-records" : "1","added-files-size" : "637","changed-partition-count" : "1","total-records" : "1","total-data-files" : "1","total-delete-files" : "0","total-position-deletes" : "0","total-equality-deletes" : "0"},"manifest-list" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/snap-8531001366494199026-1-63278140-aca4-4cd7-bffc-1e7d0e4b4b1b.avro"}, {"snapshot-id" : 626484522728673979,"parent-snapshot-id" : 8531001366494199026,"timestamp-ms" : 1622770733546,"summary" : {"operation" : "append","flink.job-id" : "bbdcfb52195a0b0b556c6a167fc3de9f","flink.max-committed-checkpoint-id" : "9223372036854775807","added-data-files" : "1","added-records" : "1","added-files-size" : "636","changed-partition-count" : "1","total-records" : "2","total-data-files" : "2","total-delete-files" : "0","total-position-deletes" : "0","total-equality-deletes" : "0"},"manifest-list" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/snap-626484522728673979-1-a79e0789-dbc0-4a45-b57d-73575cdccb1d.avro"}, {"snapshot-id" : 4382866461439510817,"parent-snapshot-id" : 626484522728673979,"timestamp-ms" : 1622770735121,"summary" : {"operation" : "append","flink.job-id" : "947829c23ca09fba470204f5b146c191","flink.max-committed-checkpoint-id" : "9223372036854775807","added-data-files" : "1","added-records" : "1","added-files-size" : "637","changed-partition-count" : "1","total-records" : "3","total-data-files" : "3","total-delete-files" : "0","total-position-deletes" : "0","total-equality-deletes" : "0"},"manifest-list" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/snap-4382866461439510817-1-1d78fbb1-7c97-4c3a-b47d-deef81273d0e.avro"}, {"snapshot-id" : 555628243696744305,"parent-snapshot-id" : 4382866461439510817,"timestamp-ms" : 1622771727393,"summary" : {"operation" : "append","flink.job-id" : "f30e7cd040204f737ba8aaf0350340f7","flink.max-committed-checkpoint-id" : "9223372036854775807","added-data-files" : "1","added-records" : "1","added-files-size" : "637","changed-partition-count" : "1","total-records" : "4","total-data-files" : "4","total-delete-files" : "0","total-position-deletes" : "0","total-equality-deletes" : "0"},"manifest-list" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/snap-555628243696744305-1-194acdcb-abdf-4acd-8bc1-5de6d4bb76b0.avro"} ],"snapshot-log" : [ {"timestamp-ms" : 1622770732247,"snapshot-id" : 8531001366494199026}, {"timestamp-ms" : 1622770733546,"snapshot-id" : 626484522728673979}, {"timestamp-ms" : 1622770735121,"snapshot-id" : 4382866461439510817}, {"timestamp-ms" : 1622771727393,"snapshot-id" : 555628243696744305} ],"metadata-log" : [ {"timestamp-ms" : 1622770665028,"metadata-file" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00000-8bec1008-0d2d-4dac-82a6-387d9354b2bc.metadata.json"}, {"timestamp-ms" : 1622770732247,"metadata-file" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00001-66552574-1458-40a6-8ebc-2d5f2c58a65e.metadata.json"}, {"timestamp-ms" : 1622770733546,"metadata-file" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00002-1b1fb277-d165-46cd-a3ee-f2b6c358213f.metadata.json"}, {"timestamp-ms" : 1622770735121,"metadata-file" : "hdfs://master/user/iceberg_hive/warehouse/iceberg_db.db/test/metadata/00003-56ffa5b6-bac5-4a1b-9e8d-2f36fe379610.metadata.json"} ]
}
如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!
2022年全网首发|大数据专家级技能模型与学习指南(胜天半子篇)
互联网最坏的时代可能真的来了
我在B站读大学,大数据专业
我们在学习Flink的时候,到底在学习什么?
193篇文章暴揍Flink,这个合集你需要关注一下
Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS
Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点
我们在学习Spark的时候,到底在学习什么?
在所有Spark模块中,我愿称SparkSQL为最强!
硬刚Hive | 4万字基础调优面试小总结
数据治理方法论和实践小百科全书
标签体系下的用户画像建设小指南
4万字长文 | ClickHouse基础&实践&调优全视角解析
【面试&个人成长】2021年过半,社招和校招的经验之谈
大数据方向另一个十年开启 |《硬刚系列》第一版完结
我写过的关于成长/面试/职场进阶的文章
当我们在学习Hive的时候在学习什么?「硬刚Hive续集」
Flink+Iceberg环境搭建及生产问题处理相关推荐
- Flink开发环境搭建(maven)
1.下载scala sdk http://www.scala-lang.org/download/ 直接到这里下载sdk,(https://downloads.lightbend.com/scala/ ...
- Flink+Iceberg搭建实时数据湖实战
点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 全网最全大数据面试提升手册! 第一部分:Iceberg 核心功能原理剖析 : Apache Ice ...
- Flink教程(03)- Flink环境搭建
文章目录 01 引言 02 Local本地单机模式 2.1 工作原理 2.2 安装部署 2.3 测试验证 03 Standalone独立集群模式 3.1 工作原理 3.2 安装部署 3.3 测试验证 ...
- 小知识点:ARM 架构 Linux 大数据集群基础环境搭建(Hadoop、MySQL、Hive、Spark、Flink、ZK、Kafka、Nginx、Node)
换了 M2 芯片的 Mac,以前 x86 版本的 Linux 大数据集群基础环境搭建在 ARM 架构的虚拟机集群上有些用不了了,现在重新写一份基于 ARM 架构的,少数不兼容之外其他都差不多,相当 ...
- Ubuntu下使用valet搭建laravel生产环境
Ubuntu下使用valet搭建laravel生产环境 1.安装系统所需软件 更新软件列表 sudo apt update 2.更新软件 echo y | sudo apt upgrade 如果觉得时 ...
- 生产环境下的LAMP环境搭建
生产环境下的LAMP环境搭建 V20 学习猿地 ww.lmonkey.com 一.LAMP环境介绍 Web服务器的主要功能是提供网上信息浏览服务.所有网页的集合被称为网站,网站也只有发布到网上才能被 ...
- 搭建 K8S 环境:Centos7安装生产环境可用的K8S集群图文教程指南
搭建 K8S 环境:Centos7安装生产环境可用的K8S集群图文教程指南 一. K8S 简介 二. K8S 学习的几大拦路虎 2.1 K8S 安装对硬件要求比较高 2.2. K8S 对使用者来说要求 ...
- Flink环境搭建(standalone模式)
本文开头附:Flink 学习路线系列 ^ _ ^ 1.Flink环境搭建 1.1 架构说明(standalone模式) standalone 是 Flink 自带的一个分布式集群,它不依赖其他的资源调 ...
- [官方Flink入门笔记 ] 三、开发环境搭建和应用的配置、部署及运行
一.Flink 开发环境部署和配置 Flink 是一个以 Java 及 Scala 作为开发语言的开源大数据项目,代码开源在 GitHub 上,并使用 Maven 来编译和构建项目.对于大部分使用 F ...
最新文章
- spss分析qpcr数据_SPSS 数据分析,掌握这 6 大模块就够了!
- 我在学python-我在大学毕业后学习Linux、python的一些经验
- Android最佳性能实践(二)——分析内存的使用情况
- 使用tensorflow训练数据时遇到的问题总结
- Azure 上使用 Windows Server Core 运行 ASP.NET Core 网站
- 七、Web服务器——Junit单元测试 反射 注解学习笔记
- PostgreSQL在何处处理 sql查询之八
- Alex 的 Hadoop 菜鸟教程: 第7课 Hbase 使用教程
- 逆向工具IDA安装教程
- 使用AMOS图形建立和检测模型(3)
- 有一分数序列:2/1,3/2,5/3...求出这个数列的前20项之和(C语言原理详解)。
- echarts多坐标轴图表
- 华为路ws5200设置虚拟服务器,华为WS5200无线路由器怎么设置?
- Vue-纯前端导出word文档 Can‘t find end of central directory:is this a zip file?
- android屏幕适配之点9图片
- 如何利用GPT来发论文!!
- CSS(红色标记:待练习效果)
- 世界各国领土面积排行(第二个版本)
- 计算机管理打不开什么情况,Win7 计算机管理 打不开的解决方法
- 【Python实现】解析Drugbank文件中的XML
热门文章
- 关闭使用 xshell 按 tab 键自动补全时响个不停的警告铃声
- 获取硬盘序列号、CPU序列号
- 微信公众号最佳实践 ( 8.2)星座运势
- shp格式全国基础数据 公路 铁路 水系 国界 省界等
- html中正方形圆角框,CSS高级技巧:圆角矩形
- 计算机研究热点发展趋势,人工智能研究热点有哪些?原来这才是人工智能现在的发展方向...
- 如何构建数据化管理体系
- 关于 Google Cloud 谷歌云那些不得不说的事
- 关于QQ音乐的音乐下载
- 推荐两个高仿抖音 GitHub 开源项目( iOS 和 Android)