Dinky实践系列之FlinkCDC整库实时入仓入湖
摘要:本文介绍了 Dinky 功能实践系列的 Flink CDC 整库实时入仓入湖的分析。内容包括:
前言
环境要求
源库准备
整库同步参数介绍
整库入湖 Hudi
整库入仓 StarRocks
整库入库 MySQL
整库同步 Kafka
整库入库 PostgreSQL
整库入仓 ClickHouse
总结
一、前言
Dinky 整库同步发布已经有一段时间,通过阅读本文,您将会熟悉 Dinky 整库同步的用法。为此Dinky 社区准备了整库同步的系列,方便大家快速上手使用。
因业界中 Sink 端的库较多,特意选择了现在比较通用或者流行的库做为演示。并选择了 mysql-cdc做为 Source 端实现整库同步到 各 Sink 端。当然通过阅读本文,如果你的 Source 端 oracle-cdc,那么将 mysql-cdc替换即可。
二、环境要求
软件 | 版本 |
CDH | 6.2.0 |
Hadoop | 3.0.0-cdh6.2.0 |
Hive | 2.1.1-cdh6.2.0 |
Hudi | 0.11.1 |
Flink | 1.13.6 |
Flink CDC | 2.2.1 |
StarRocks | 2.2.0 |
Dinky | 0.6.6-SNAPSHOT |
MySQL | 5.7 |
PostgreSQL | 13 |
ClickHouse | 22.2.2.1(单机版) |
所需依赖
整库同步需要在 Flink下加载周围组件所需要的 Flink connector 即可。依赖如下:
# hive依赖包
antlr-runtime-3.5.2.jar
hive-exec-2.1.1-cdh6.2.0.jar
libfb303-0.9.3.jar
flink-sql-connector-hive-2.2.0_2.12-1.13.6.jar
hive-site.xml
# hadoop依赖
flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar
# Flink Starrrocks依赖
flink-connector-starrocks-1.2.2_flink-1.13_2.12.jar
# Hudi 依赖
hudi-flink1.13-bundle_2.12-0.11.1.jar
# Dinky hadoop依赖
flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar
# Dinky 整库同步依赖包
dlink-client-1.13-0.6.5.jar
dlink-client-base-0.6.5.jar
dlink-common-0.6.5.jar
# flink cdc依赖包
flink-sql-connector-mysql-cdc-2.2.1.jar
# mysql 驱动依赖
mysql-connector-java-8.0.21.jar
# kafka flink依赖
flink-sql-connector-kafka_2.12-1.13.6.jar
# postgresql jdbc依赖
postgresql-42.2.14.jar
# clickhouse 依赖
clickhouse-jdbc-0.2.6.jar
flink-connector-clickhouse-1.13.6.jar
说明
1.Hive 依赖包放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下
2.Hadoop 依赖包放置 $FLINK_HOME/lib 下
3.Flink Starrrocks 依赖包放置 $FLINK_HOME/lib 和$DINKY_HOME/plugins 下
4.Hudi 依赖包放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下
5.Dinky hadoop 依赖包放置 $DINKY_HOME/plugins 下(网盘或者群公告下载)
6.Dinky 整库同步依赖包放置 $FLINK_HOME/lib 下
7.Flink CDC 依赖包放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下
8.MySQL 驱动依赖放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下
9.Kafka Flink 依赖 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下
10.PostgreSQL jdbc 依赖放置 $FLINK_HOME/lib 和$DINKY_HOME/plugins 下
11.ClickHouse 依赖放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下
以上依赖放入后,重启 Flink 集群和 Dinky。如果中间遇到一些jar包冲突,可自行根据报错解决相关冲突的包。
三、源库准备
MySQL 建表
如下sql脚本采用 Flink CDC 官网
# mysql建表语句(同步到Starocks)
CREATE TABLE bigdata.products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE bigdata.products AUTO_INCREMENT = 101;
INSERT INTO bigdata.products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE bigdata.orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO bigdata.orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
四、整库同步参数介绍
对于 Dinky 整库同步的公共参数,在大多数 Sink 目标端都是适用的。除个别 Sink 目标端,因底层实现方式不同,所以不能一概而论。如 Hudi。公共参数依据 Dinky 提供的语法,如下:
key | value | 上下游 |
connector | mysql-cdc | source 端 |
hostname | 主机名 | source 端 |
port | 端口 | source 端 |
username | 用户名 | source 端 |
password | 密码 | source 端 |
checkpoint | checkpoint 时间间隔 | source 端 |
scan.startup.mode | 全量或增量读取 | source 端 |
parallelism | 1 | source 端 |
database-name | 数据库名称 | source 端 |
table-name | 表名称,支持正则 | source 端 |
sink.* | *代表 sink 端所有参数 | sink 端 |
提示: 对于sink.*,在使用的过程中需要注意的是,sink是必须要写的,'*' 星号代表的是所有sink端的参数,比如原生 Flink Sink建表语句的连接器写"connector",在 Dinky 整库同步语法中必须是"sink.connector"。所有的 Sink 端必须参照此语法规范。
五、整库入湖 Hudi
作业脚本
EXECUTE CDCSOURCE demo_hudi2 WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.4',
'port' = '4406',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'database-name'='bigdata',
'table-name'='bigdata\.products,bigdata\.orders',
'sink.connector'='hudi',
'sink.path'='hdfs://nameservice1/data/hudi/${tableName}',
'sink.hoodie.datasource.write.recordkey.field'='${pkList}',
'sink.hoodie.parquet.max.file.size'='268435456',
--'sink.write.precombine.field'='gmt_modified',
'sink.write.tasks'='1',
'sink.write.bucket_assign.tasks'='2',
'sink.write.precombine'='true',
'sink.compaction.async.enabled'='true',
'sink.write.task.max.size'='1024',
'sink.write.rate.limit'='3000',
'sink.write.operation'='upsert',
'sink.table.type'='COPY_ON_WRITE',
'sink.compaction.tasks'='1',
'sink.compaction.delta_seconds'='20',
'sink.compaction.async.enabled'='true',
'sink.read.streaming.skip_compaction'='true',
'sink.compaction.delta_commits'='20',
'sink.compaction.trigger.strategy'='num_or_time',
'sink.compaction.max_memory'='500',
'sink.changelog.enabled'='true',
'sink.read.streaming.enabled'='true',
'sink.read.streaming.check.interval'='3',
'sink.hive_sync.skip_ro_suffix' = 'true',
'sink.hive_sync.enable'='true',
'sink.hive_sync.mode'='hms',
'sink.hive_sync.metastore.uris'='thrift://bigdata1:9083',
'sink.hive_sync.db'='qhc_hudi_ods',
'sink.hive_sync.table'='${tableName}',
'sink.table.prefix.schema'='true'
)
创建并提交作业
查看 HDFS 目录及 Hive 表
创建 StarRocks Hudi 外部表
在创建外部表之前,在Starrocks上首先保证要将hdfs-site.xml文件分别部署到FE和BE节点的conf目录下。重启FE和BE节点。Hudi 外表是只读的,只能用于查询操作。当前支持 Hudi 的表类型为 Copy on write。
创建和管理 Hudi 资源
CREATE EXTERNAL RESOURCE "hudi0"
PROPERTIES (
"type" = "hudi",
"hive.metastore.uris" = "thrift://bigdata1:9083"
);
SHOW RESOURCES;
创建 Hudi 外部表
CREATE EXTERNAL TABLE qhc_sta.orders (
`order_id` int NULL COMMENT "",
`order_date` datetime NULL COMMENT "",
`customer_name` string NULL COMMENT "",
`price` decimal(10, 5) NULL COMMENT "",
`product_id` int NULL COMMENT "",
`order_status` int NULL COMMENT ""
) ENGINE=HUDI
PROPERTIES (
"resource" = "hudi0",
"database" = "qhc_hudi_ods",
"table" = "bigdata_orders"
);
CREATE EXTERNAL TABLE qhc_sta.products (
id INT,
name STRING,
description STRING
) ENGINE=HUDI
PROPERTIES (
"resource" = "hudi0",
"database" = "qhc_hudi_ods",
"table" = "bigdata_products"
);
查看 Hudi 外部表数据
六、整库入仓 StarRocks
作业脚本
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.4',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'starrocks',
'sink.jdbc-url' = 'jdbc:mysql://192.168.0.4:19035',
'sink.load-url' = '192.168.0.4:18035',
'sink.username' = 'devuser',
'sink.password' = '123456',
'sink.sink.db' = 'qhc_ods',
'sink.table.prefix' = 'ods_',
'sink.table.lower' = 'true',
'sink.database-name' = 'qhc_ods',
'sink.table-name' = '${tableName}',
'sink.sink.properties.format' = 'json',
'sink.sink.properties.strip_outer_array' = 'true',
'sink.sink.max-retries' = '10',
'sink.sink.buffer-flush.interval-ms' = '15000',
'sink.sink.parallelism' = '1'
)
创建作业
StarRocks 建表
CREATE TABLE qhc_ods.`ods_orders` (
`order_id` largeint(40) NOT NULL COMMENT "",
`order_date` datetime NOT NULL COMMENT "",
`customer_name` varchar(65533) NULL COMMENT "",
`price` decimal64(10, 5) NOT NULL COMMENT "",
`product_id` bigint(20) NULL COMMENT "",
`order_status` boolean NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`order_id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`order_id`) BUCKETS 10
PROPERTIES (
"replication_num" = "3",
"colocate_with" = "qhc",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
CREATE TABLE qhc_ods.`ods_products` (
`id` largeint(40) NOT NULL COMMENT "",
`name` varchar(65533) NOT NULL COMMENT "",
`description` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_num" = "3",
"colocate_with" = "qhc",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
查看 StarRocks 表
查看Starrocks表中数据是不是为空
提交 Flink 整库同步作业
再次查看 StarRocks
七、整库入库 MySQL
作业脚本
EXECUTE CDCSOURCE cdc_mysql2 WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.4',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:mysql://192.168.0.5:3306/test?characterEncoding=utf-8&useSSL=false',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '${tableName}',
'sink.driver' = 'com.mysql.jdbc.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5'
)
创建作业
创建 MySQL 表
drop table test.test_products;
CREATE TABLE test.test_products (
id INTEGER NOT NULL ,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
drop table test.test_orders;
CREATE TABLE test.test_orders (
order_id INTEGER NOT NULL ,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
);
提交 Flink 整库同步作业
查看 MySQL 数据
八、整库同步 Kafka
作业脚本
# cdc作业
EXECUTE CDCSOURCE cdc_kafka WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.4',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector'='datastream-kafka',
'sink.topic'='cdctest',
'sink.brokers'='bigdata2:9092,bigdata3:9092,bigdata4:9092'
)
创建作业
创建 Kafka Topic
创建 topic 可忽略,Dinky 整库同步会自动创建。
# 创建topic
./bin/kafka-topics.sh \
--create \
--zookeeper bigdata2:2181,bigdata3:2181,bigdata4:2181 \
--replication-factor 3 \
--partitions 1 \
--topic cdctest
# 查看topic
./bin/kafka-topics.sh --list \
--zookeeper bigdata2:2181,bigdata3:2181,bigdata4:2181
提交 Flink 整库同步作业
查看消费者
查看是否2张表数据
./bin/kafka-console-consumer.sh --bootstrap-server bigdata2:9092,bigdata3:9092,bigdata4:9092 --topic cdctest --from-beginning --group test_id
九、整库入库 PostgreSQL
作业脚本
EXECUTE CDCSOURCE cdc_postgresql5 WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.4',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:postgresql://192.168.0.5:5432/test',
'sink.username' = 'test',
'sink.password' = '123456',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '${tableName}',
'sink.driver' = 'org.postgresql.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5'
)
创建作业
创建 PostgreSQL 表
CREATE schema test;
drop table test.test_products;
CREATE TABLE test.test_products (
id INTEGER UNIQUE NOT NULL ,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
drop table test.test_orders;
CREATE TABLE test.test_orders (
order_id INTEGER UNIQUE NOT NULL ,
order_date timestamp NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NULL,
product_id INTEGER NULL,
order_status INTEGER NOT NULL -- Whether order has been placed
);
提交 Flink 整库同步作业
查看 PostgreSQL 数据
十、整库入仓 ClickHouse
作业脚本
EXECUTE CDCSOURCE cdc_clickhouse WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.4',
'port' = '4406',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'clickhouse',
'sink.url' = 'clickhouse://192.168.0.5:8123',
'sink.username' = 'default',
'sink.password' = '123456',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.database-name' = 'test',
'sink.table-name' = '${tableName}',
'sink.sink.batch-size' = '500',
'sink.sink.flush-interval' = '1000',
'sink.sink.max-retries' = '3'
)
创建作业
创建 ClickHouse 表
# 创建语句为本地表
create database test;
drop table test.test_products;
CREATE TABLE test.test_products (
id Int64 NOT NULL ,
name String NOT NULL,
description String
)
ENGINE = MergeTree()
ORDER BY id
PRIMARY KEY id;
drop table test.test_orders;
CREATE TABLE test.test_orders (
order_id Int64 NOT NULL ,
order_date DATETIME NOT NULL,
customer_name String NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id Int64 NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
)
ENGINE = MergeTree()
ORDER BY order_id
PRIMARY KEY order_id;
提交 Flink 整库同步作业
查看 ClickHouse 数据
Dinky实践系列之FlinkCDC整库实时入仓入湖相关推荐
- flink 开发平台Dinky 构建 Flink CDC 整库入仓入湖
原文:http://www.senlt.cn/article/866753893.html 摘要:本文介绍了如何使用 Dinky 实时计算平台构建 Flink CDC 整库入仓入湖.内容包括: 背景 ...
- Flink CDC 系列 - Flink CDC 如何简化实时数据入湖入仓
摘要:本文整理自伍翀 (云邪).徐榜江 (雪尽) 在 Flink Forward Asia 2021 的分享,该分享以 5 个章节详细介绍如何使用 Flink CDC 来简化实时数据的入湖入仓, 文章 ...
- 【大数据新手上路】“零基础”系列课程--MySQL 数据整库迁移到 MaxCompute
随着公司业务的增多,云数据库 RDS 下的 MySQL 数据库的表越来越多,想要把它全部迁移到 MaxCompute 中进行计算分析,但又愁要配置太多次同步任务.如何能将大量的数据表一次性上传到 Ma ...
- 【RxSwift 实践系列 2/3】thinking in Rx- Create和Drive
---> 上节 [RxSwift 实践系列 1/3]为什么使用RxSwift RxSwift 是一种编程思想,不是一门语言,学习他最难的部分就是thinking in Reactive Prog ...
- .net core实践系列之短信服务-Api的SDK的实现与测试
前言 上一篇<.net core实践系列之短信服务-Sikiro.SMS.Api服务的实现>讲解了API的设计与实现,本篇主要讲解编写接口的SDK编写还有API的测试. 或许有些人会认为, ...
- 微服务架构设计实践系列之五:架构准备阶段
微服务架构设计实践系列之五:架构准备阶段 原文:微服务架构设计实践系列之五:架构准备阶段 版权声明: https://blog.csdn.net/beyondself_77/article/detai ...
- 架构技术实践系列文章
王晓宇:电商异步消息系统的实践 秦鹏:从应用到平台,云服务架构的演进过程 郭炜:从0到N建立高性价比的大数据平台 李智慧:宅米网技术变迁--初创互联网公司的技术发展之路 陶文质:分布式系统设计的求生之 ...
- React项目实践系列一
数据分析平台-实践系列一 项目创建于2018年1月底,到现在已经接近半年,在此写下半年来项目的实践过程以及自己对前端的学习与体悟. 技术选型 框架: React 路由: React-Router 4 ...
- 最佳实践系列:前端代码标准和最佳实践
最佳实践系列:前端代码标准 @窝窝商城前端(刘轶/李晨/徐利/穆尚)翻译于2012年 版本0.55 @郑昀校对 isobar的这个前端代码标准和最佳实践文档,涵盖了Web应用开发的方方面面,我们翻译了 ...
最新文章
- MIT研发无人机仓库管理系统,或将帮沃尔玛省下几十亿美元
- cf1039D 分块
- 20162314 《Program Design Data Structures》Learning Summary Of The First Week
- c#利用定时器自动备份数据库(mysql)
- 好用的平板电脑_支架里的变形金刚让手机、电脑、平板更好用!6种角度,1秒切换...
- 设计模式之适配器模式(Adapter Pattern)
- len()与cap()的区别
- Python资源大全 屌炸Python库
- python题型大全_python进阶:练习题 汇总
- 戴尔SC5020发布,专为提高效率/经济性优化设计的中端存储利器
- 使用DevExpress的WebChartControl控件绘制图表(柱状图、折线图、饼图)
- sql查询初学者指南_面向初学者SQL Server查询执行计划–类型和选项
- ArcGIS——vs2015安装arcgis engine不兼容
- 电脑计算机c盘缓存清理,电脑资深玩家教你如何有效进行C盘清理
- 基于PLC四层电梯模型控制系统课程设计
- 【10.24】一个只属于程序员的节日
- 王垠:完全用Linux工作
- 使照片带有妙的电影色彩55款工具套件的lr微妙的电影调色预设
- 使用Python进行并发编程
- 2021年上海市安全员C证考试报名及上海市安全员C证找解析
热门文章
- 第四课,Extjs中面板的应用
- 前端学习点滴留痕1: bgcolor+background
- Windows系统下通过JNI调用dll动态库的实现
- 【HCIA持续更新】NAPT
- html栅格系统格式,Bootstrap的栅格系统是什么?栅格系统详解
- 牛奶包装袋上的秘密:不知道真的还是假的
- 系统大玩家 无忧装机GHOST XP V7.0
- 快递单信息抽取【二】基于ERNIE1.0至ErnieGram + CRF预训练模型
- android 日记卡片,卡片日记 - 来自韩国设计师和中国程序员的跨国合作 - Android 应用 - 日记 - 【最美应用】...
- centos7.8安装mysql5.7步骤记录