文章目录

  • 背景
  • iceberg简介
  • flink实时写入
    • 准备sql client环境
    • 创建catalog
    • 创建db
    • 创建table
    • 插入数据
    • 查询
    • 代码版本
  • 总结

背景

随着大数据处理结果的实时性要求越来越高,越来越多的大数据处理从离线转到了实时,其中以flink为主的实时计算在大数据处理中占有重要地位。

Flink消费kafka等实时数据流。然后实时写入hive,在大数据处理方面有着广泛的应用。此外由于列式存储格式如parquet或者orc在查询性能方面有着显著的提高,所以大家都会优先选择列式存储作为我们的存储格式。

传统的这种架构看似不错,但是还是有很多没有解决的问题:

  • 实时写入造成大量小文件,需要单独的程序来进行合并
  • 实时的写入,读取,还有合并小文件在同时进行,那么如何保证事务,读取数据的时候不会出现脏读。
  • Hdfs的数据一般是一次写入。多次读写,但是如果因为程序出错导致数据错了,确实要修改某一条数据改怎么办
  • 消费kafka的数据落地到hive,有一天kafka的数据多了几个字段,如何同步到hive?必须删了重建吗?
  • 订单等业务数据一般存储在传统数据库,如mysql等。如何实时同步这些cdc数据到hive仓库呢,包括ddl和dml

如果你有上面的需求,那么你可以考虑一下数据湖了,目前开源的数据湖技术主要有以下几个:delta、hudi、iceberg,但是侧重点有所不同,我上面说的问题也不完全都能实现,但是这些都是数据湖要做的东西,随着社区的不断发展,这些功能都会有的。

但是目前世面上这些数据湖技术都与spark紧密绑定。而我们目前实时计算主要以flink为主,而且我个人觉得未来实时计算也将以flink为主,所以我选择了iceberg为我们的数据湖,虽然他有一些功能不是很完善,但是有着良好的抽象,并且不强制绑定spark,所以对于iceberg没有的功能,我们可以自己给补全,再回馈给社区,一起成长。

iceberg简介

其实对于iceberg,官方的定义是一种表格式。

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

我们可以简单理解为他是基于计算层(flink , spark)和存储层(orc,parqurt)的一个中间层,我们在hive建立一个iceberg格式的表。用flink或者spark写入iceberg,然后再通过其他方式来读取这个表,比如spark,flink,presto等。

当然数据湖的概念远不止这点,我们今天就先简单的这么理解,后续写一篇文章专门介绍一下iceberg。

flink实时写入

准备sql client环境

目前官方的测试版本是基于scala 2.12版本的flink。所以我们也用和官方同步的版本来测试下,下载下面的两个jar放到flink的lib下面,然后启动一下flink集群,standalone模式。

  • 下载flink :flink-1.11.2-bin-scala_2.12.tgz

  • 下载 iceberg-flink-runtime-xxx.jar

  • 下载flink 集成hive的connector,flink-sql-connector-hive-2.3.6_2.12-1.11.2.jar

  • 目前官方的hive测试版本是 2.3.7,其他的版本可能有不兼容

注意要配置flink的checkpoint,因为目前flink提交iceberg的信息是在每次checkpoint的时候提交的。在sql client配置checkpoint的方法如下:

在flink-conf.yaml添加如下配置

execution.checkpointing.interval: 10s   # checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 10  # checkpoint 失败容忍次数

创建catalog

目前系统提供的catalog有hivecatalog和hadoopcatalog以及自定义catlog

CREATE CATALOG iceberg WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://localhost:9083','clients'='5','property-version'='1','warehouse'='hdfs://nn:8020/warehouse/path'
);

执行完之后,显示如下:

Flink SQL> show catalogs;
default_catalog
iceberg

如果不想每次启动sql client都重新执行ddl,可以在sql-client-defaults.yaml 里面皮遏制一下:

catalogs: # empty list
# A typical catalog definition looks like:- name: hivetype: hivehive-conf-dir: /Users/user/work/hive/confdefault-database: default- name: icebergtype: icebergwarehouse: hdfs://localhost/user/hive2/warehouseuri: thrift://localhost:9083catalog-type: hive

创建db

use catalog iceberg;
CREATE DATABASE iceberg_db;
USE iceberg_db;

创建table

CREATE TABLE iceberg.iceberg_db.iceberg_001 (id BIGINT COMMENT 'unique id',data STRING
) WITH ('connector'='iceberg','write.format.default'='ORC');

插入数据

我们依然创建一个datagen的connector。

CREATE TABLE sourceTable (userid int,f_random_str STRING
) WITH ('connector' = 'datagen','rows-per-second'='100','fields.userid.kind'='random','fields.userid.min'='1','fields.userid.max'='100',
'fields.f_random_str.length'='10'
)

这时候我们看到有两个表了


Flink SQL> show tables;
iceberg_001
sourcetable

然后执行insert into插入数据:

insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable

查询

我们这里使用presto来查询

presto的配置iceberg.properties 如下:

connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083

代码版本

public class Flink2Iceberg{public static void main(String[] args) throws Exception{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(10000);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);tenv.executeSql("CREATE CATALOG iceberg WITH (\n" +"  'type'='iceberg',\n" +"  'catalog-type'='hive'," +"  'hive-conf-dir'='/Users/user/work/hive/conf/'" +")");tenv.useCatalog("iceberg");tenv.executeSql("CREATE DATABASE iceberg_db");tenv.useDatabase("iceberg_db");tenv.executeSql("CREATE TABLE sourceTable (\n" +" userid int,\n" +" f_random_str STRING\n" +") WITH (\n" +" 'connector' = 'datagen',\n" +" 'rows-per-second'='100',\n" +" 'fields.userid.kind'='random',\n" +" 'fields.userid.min'='1',\n" +" 'fields.userid.max'='100',\n" +"'fields.f_random_str.length'='10'\n" +")");tenv.executeSql("insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable");}
}

具体见:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/iceberg/src/main/java/com/Flink2Iceberg.java

总结

总结一下,iceberg的资料比较少,很多设计或者讨论等需要关注issues,然后再去撸源码,可能对于刚入门的小伙伴来说有点困难。后续我也会多分享一些关于iceberg的文章,欢迎大家关注我公众号【大数据技术与应用实战】。

Flink集成数据湖之实时数据写入iceberg相关推荐

  1. phython在file同时写入两个_Flink集成数据湖之实时数据写入iceberg

    背景 iceberg简介 flink实时写入 准备sql client环境 创建catalog 创建db 创建table 插入数据 查询 代码版本 总结 背景 随着大数据处理结果的实时性要求越来越高, ...

  2. Flink 如何实时分析 Iceberg 数据湖的 CDC 数据

    简介:数据湖的架构中,CDC 数据实时读写的方案和原理 本文由李劲松.胡争分享,社区志愿者杨伟海.李培殿整理.主要介绍在数据湖的架构中,CDC 数据实时读写的方案和原理.文章主要分为 4 个部分内容: ...

  3. 数据湖在大数据场景下应用和实施方案调研笔记(增强版)

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 在读本文前你应该看过这些: <我看好数据湖的未来,但不看好数据湖的现在> <数据 ...

  4. 数据湖在大数据典型场景下应用调研个人笔记

    点击上方蓝色字体,选择"设为星标" 回复"资源"获取更多资源 数据湖是一种不断演进中.可扩展的大数据存储.处理.分析的基础设施:以数据为导向,实现任意来源.任意 ...

  5. 数据湖概念以及数据湖产生的背景和价值

    一.数据湖的概念 数据湖是一个集中式存储库,允许以任意规模存储所有结构化和非结构化数据.您可以按原样存储数据(无需先对数据进行结构化处理),并运行不同类型的分析 – 从控制面板和可视化到大数据处理.实 ...

  6. 数据湖04:数据湖技术架构演进

    系列专题:数据湖系列文章 1. 背景 国内的大型互联网公司,每天都会生成几十.几百TB,甚至几PB的原始数据.这些公司通常采用开源的大数据组件来搭建大数据平台.大数据平台经历过"以Hadoo ...

  7. 腾讯云首次披露云原生智能数据湖全景图,数据湖之争再起波澜

    (图片下载自视觉中国) 数据湖并不是一个很有历史感的概念,从2010年才首次被Pentaho创始人兼首席技术官詹姆斯·狄克逊(James Dixon)提出,至今发展仅有十多年,但已经成为一个谈到大数据 ...

  8. 深度|从数据仓库到数据湖——浅谈数据架构演进

    转载自https://mp.weixin.qq.com/s/321mkZsuxqXOme5hw_83mQ 网管产品需要从数据仓库的角度来看,才能获得完整的视图.数据集成真正从大数据的角度来看,才能明白 ...

  9. 云原生数据湖解决方案打破数据孤岛,大数据驱动互娱行业发展

    简介: 数据湖是以集中.统一方式存储各种类型数据,数据湖可以与多种计算引擎直接对接,我们使用OSS作为数据湖底座,数据统一存储在OSS中,有效消除了数据孤岛现象,多种计算与处理分析引擎能够直接对存储在 ...

最新文章

  1. vi/vim 删除:一行, 一个字符, 单词, 每行第一个字符 命令
  2. XHTML5 与 HTML 4.01的差异
  3. SQLServer存储过程/函数加/解密(轉)
  4. 原始 H.264 码流播放
  5. 前后端分离项目部署上线详细教程
  6. Spring: (一) -- 春雨润物之 核心IOC
  7. DataReader对象的基本使用 c#
  8. 晨风机器人对接php_php封装实现钉钉机器人报警接口的示例代码
  9. macos 此服务器的证书无效_网易出现重大失误,忘记续费HTTPS证书导致大量用户受影响...
  10. gtihub第二次上传项目_国道岱山项目双合大桥墩柱桩基打桩施工突破100根
  11. kafka学习总结之集群部署和zookeeper
  12. oracle全局高速缓存,Oracle技术之设置系统全局区SGA命令
  13. Kerberos安装及拖管Ambari 2.7
  14. 深度解析服务器需要虚拟化的两大条件
  15. js 调用摄像头拍照
  16. 去年我国出生率跌破1%,有什么影响?
  17. 使用 Apache FOP 2.3 + docbook-xsl-ns-1.79.1 转换 Docbook 5.1 格式的 XML 文档成 PDF/RTF 文件
  18. Norms in Matrix and Vector
  19. 循环结构验证哥德巴赫猜想
  20. java中 enum什么意思_enum在java中是什么意思

热门文章

  1. Android Studio 4.1不报红
  2. SQL(结构化查询语言)介绍
  3. LAMP动静分离(分布式)部署
  4. android ellipsize 多行,android TextView多行文本(超过3行)使用ellipsize属性无效问题的解决方法...
  5. 音视频封装格式转换器(支持avi格式转换),基于FFmpeg4.1实现(音视频学习笔记二)
  6. UNIX/LINUX 平台可执行文件格式分析
  7. “老板,对不起!我胃不好,您给的饼我消化不来”,我去腾讯吃“软饭”了!
  8. linux项目实施总结,[转载]SAP项目实施总结
  9. 打工人必备面试神器,值得一看!
  10. 【react】antd