摘要:本文介绍了 Dinky 功能实践系列的 Flink CDC 整库实时入仓入湖的分析。内容包括:

  1. 前言

  2. 环境要求

  3. 源库准备

  4. 整库同步参数介绍

  5. 整库入湖 Hudi

  6. 整库入仓 StarRocks

  7. 整库入库 MySQL

  8. 整库同步 Kafka

  9. 整库入库 PostgreSQL

  10. 整库入仓 ClickHouse

  11. 总结

一、前言

Dinky 整库同步发布已经有一段时间,通过阅读本文,您将会熟悉 Dinky 整库同步的用法。为此Dinky 社区准备了整库同步的系列,方便大家快速上手使用。

因业界中 Sink 端的库较多,特意选择了现在比较通用或者流行的库做为演示。并选择了 mysql-cdc做为 Source 端实现整库同步到 各 Sink 端。当然通过阅读本文,如果你的 Source 端 oracle-cdc,那么将 mysql-cdc替换即可。

二、环境要求

软件 版本
CDH 6.2.0
Hadoop 3.0.0-cdh6.2.0
Hive 2.1.1-cdh6.2.0
Hudi 0.11.1
Flink 1.13.6
Flink CDC 2.2.1
StarRocks 2.2.0
Dinky 0.6.6-SNAPSHOT
MySQL 5.7
PostgreSQL 13
ClickHouse 22.2.2.1(单机版)

所需依赖

‍    整库同步需要在 Flink下加载周围组件所需要的 Flink connector 即可。依赖如下:

# hive依赖包antlr-runtime-3.5.2.jarhive-exec-2.1.1-cdh6.2.0.jarlibfb303-0.9.3.jarflink-sql-connector-hive-2.2.0_2.12-1.13.6.jarhive-site.xml# hadoop依赖flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar# Flink Starrrocks依赖flink-connector-starrocks-1.2.2_flink-1.13_2.12.jar# Hudi 依赖hudi-flink1.13-bundle_2.12-0.11.1.jar# Dinky hadoop依赖flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar# Dinky 整库同步依赖包dlink-client-1.13-0.6.5.jardlink-client-base-0.6.5.jardlink-common-0.6.5.jar# flink cdc依赖包flink-sql-connector-mysql-cdc-2.2.1.jar# mysql 驱动依赖mysql-connector-java-8.0.21.jar# kafka flink依赖flink-sql-connector-kafka_2.12-1.13.6.jar# postgresql jdbc依赖postgresql-42.2.14.jar# clickhouse 依赖clickhouse-jdbc-0.2.6.jarflink-connector-clickhouse-1.13.6.jar

说明

1.Hive 依赖包放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下

2.Hadoop 依赖包放置 $FLINK_HOME/lib 

3.Flink Starrrocks 依赖包放置 $FLINK_HOME/lib $DINKY_HOME/plugins 

4.Hudi 依赖包放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下

5.Dinky hadoop 依赖包放置 $DINKY_HOME/plugins 下(网盘或者群公告下载)

6.Dinky 整库同步依赖包放置 $FLINK_HOME/lib 

7.Flink CDC 依赖包放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 

8.MySQL 驱动依赖放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 

9.Kafka Flink 依赖 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 

10.PostgreSQL jdbc 依赖放置 $FLINK_HOME/lib $DINKY_HOME/plugins 

11.ClickHouse 依赖放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 

以上依赖放入后,重启 Flink 集群和 Dinky。如果中间遇到一些jar包冲突,可自行根据报错解决相关冲突的包。

三、源库准备

MySQL 建表

如下sql脚本采用 Flink CDC 官网

# mysql建表语句(同步到Starocks)CREATE TABLE bigdata.products (  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,  name VARCHAR(255) NOT NULL,  description VARCHAR(512));
ALTER TABLE bigdata.products AUTO_INCREMENT = 101;
INSERT INTO bigdata.productsVALUES (default,"scooter","Small 2-wheel scooter"),       (default,"car battery","12V car battery"),       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),       (default,"hammer","12oz carpenter's hammer"),       (default,"hammer","14oz carpenter's hammer"),       (default,"hammer","16oz carpenter's hammer"),       (default,"rocks","box of assorted rocks"),       (default,"jacket","water resistent black wind breaker"),       (default,"spare tire","24 inch spare tire");       CREATE TABLE bigdata.orders (  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,  order_date DATETIME NOT NULL,  customer_name VARCHAR(255) NOT NULL,  price DECIMAL(10, 5) NOT NULL,  product_id INTEGER NOT NULL,  order_status BOOLEAN NOT NULL -- Whether order has been placed) AUTO_INCREMENT = 10001;
INSERT INTO bigdata.ordersVALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

四、整库同步参数介绍

对于 Dinky 整库同步的公共参数,在大多数 Sink 目标端都是适用的。除个别 Sink 目标端,因底层实现方式不同,所以不能一概而论。如 Hudi。公共参数依据 Dinky 提供的语法,如下:

key value 上下游
connector mysql-cdc source 端
hostname 主机名 source 端
port 端口 source 端
username 用户名 source 端
password 密码 source 端
checkpoint checkpoint 时间间隔 source 端
scan.startup.mode 全量或增量读取 source 端
parallelism 1 source 端
database-name 数据库名称 source 端
table-name 表名称,支持正则 source 端
sink.* *代表 sink 端所有参数 sink 端

提示: 对于sink.*,在使用的过程中需要注意的是,sink是必须要写的,'*' 星号代表的是所有sink端的参数,比如原生 Flink Sink建表语句的连接器写"connector",在 Dinky 整库同步语法中必须是"sink.connector"。所有的 Sink 端必须参照此语法规范。

五、整库入湖 Hudi

作业脚本

EXECUTE CDCSOURCE demo_hudi2 WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.0.4', 'port' = '4406', 'username' = 'root', 'password' = '123456', 'checkpoint' = '3000', 'scan.startup.mode' = 'initial', 'parallelism' = '1','database-name'='bigdata','table-name'='bigdata\.products,bigdata\.orders','sink.connector'='hudi','sink.path'='hdfs://nameservice1/data/hudi/${tableName}','sink.hoodie.datasource.write.recordkey.field'='${pkList}','sink.hoodie.parquet.max.file.size'='268435456',--'sink.write.precombine.field'='gmt_modified','sink.write.tasks'='1','sink.write.bucket_assign.tasks'='2','sink.write.precombine'='true','sink.compaction.async.enabled'='true','sink.write.task.max.size'='1024','sink.write.rate.limit'='3000','sink.write.operation'='upsert','sink.table.type'='COPY_ON_WRITE','sink.compaction.tasks'='1','sink.compaction.delta_seconds'='20','sink.compaction.async.enabled'='true','sink.read.streaming.skip_compaction'='true','sink.compaction.delta_commits'='20','sink.compaction.trigger.strategy'='num_or_time','sink.compaction.max_memory'='500','sink.changelog.enabled'='true','sink.read.streaming.enabled'='true','sink.read.streaming.check.interval'='3','sink.hive_sync.skip_ro_suffix' = 'true', 'sink.hive_sync.enable'='true','sink.hive_sync.mode'='hms','sink.hive_sync.metastore.uris'='thrift://bigdata1:9083','sink.hive_sync.db'='qhc_hudi_ods','sink.hive_sync.table'='${tableName}','sink.table.prefix.schema'='true')

创建并提交作业

查看 HDFS 目录及 Hive 表

创建 StarRocks Hudi 外部表

在创建外部表之前,在Starrocks上首先保证要将hdfs-site.xml文件分别部署到FE和BE节点的conf目录下。重启FE和BE节点。Hudi 外表是只读的,只能用于查询操作。当前支持 Hudi 的表类型为 Copy on write。

创建和管理 Hudi 资源

CREATE EXTERNAL RESOURCE "hudi0" PROPERTIES (     "type" = "hudi",     "hive.metastore.uris" = "thrift://bigdata1:9083");SHOW RESOURCES;

创建 Hudi 外部表

CREATE EXTERNAL TABLE qhc_sta.orders (   `order_id` int  NULL COMMENT "",  `order_date` datetime  NULL COMMENT "",  `customer_name` string NULL COMMENT "",  `price` decimal(10, 5)  NULL COMMENT "",  `product_id` int NULL COMMENT "",  `order_status` int NULL COMMENT  "") ENGINE=HUDI PROPERTIES (     "resource" = "hudi0",     "database" = "qhc_hudi_ods",     "table" = "bigdata_orders" ); 
CREATE EXTERNAL TABLE qhc_sta.products (      id INT,  name STRING,  description STRING) ENGINE=HUDI PROPERTIES (     "resource" = "hudi0",     "database" = "qhc_hudi_ods",     "table" = "bigdata_products" ); 

查看 Hudi 外部表数据

六、整库入仓 StarRocks

作业脚本

EXECUTE CDCSOURCE jobname WITH (   'connector' = 'mysql-cdc',  'hostname' = '192.168.0.4',  'port' = '3306',  'username' = 'root',  'password' = '123456',  'checkpoint' = '3000',  'scan.startup.mode' = 'initial',  'parallelism' = '1',  'table-name' = 'bigdata\.products,bigdata\.orders',  'sink.connector' = 'starrocks',  'sink.jdbc-url' = 'jdbc:mysql://192.168.0.4:19035',  'sink.load-url' = '192.168.0.4:18035',  'sink.username' = 'devuser',  'sink.password' = '123456',  'sink.sink.db' = 'qhc_ods',  'sink.table.prefix' = 'ods_',  'sink.table.lower' = 'true',  'sink.database-name' = 'qhc_ods',  'sink.table-name' = '${tableName}',   'sink.sink.properties.format' = 'json',   'sink.sink.properties.strip_outer_array' = 'true',   'sink.sink.max-retries' = '10',   'sink.sink.buffer-flush.interval-ms' = '15000',   'sink.sink.parallelism' = '1')

创建作业

StarRocks 建表

​​​​​​​

CREATE TABLE qhc_ods.`ods_orders` (  `order_id` largeint(40) NOT NULL COMMENT "",  `order_date` datetime NOT NULL COMMENT "",  `customer_name` varchar(65533) NULL COMMENT "",  `price` decimal64(10, 5) NOT NULL COMMENT "",  `product_id` bigint(20) NULL COMMENT "",  `order_status` boolean NULL COMMENT "") ENGINE=OLAP PRIMARY KEY(`order_id`)COMMENT "OLAP"DISTRIBUTED BY HASH(`order_id`) BUCKETS 10 PROPERTIES ("replication_num" = "3","colocate_with" = "qhc","in_memory" = "false","storage_format" = "DEFAULT");
CREATE TABLE qhc_ods.`ods_products` (  `id` largeint(40) NOT NULL COMMENT "",  `name` varchar(65533) NOT NULL COMMENT "",  `description` varchar(65533) NULL COMMENT "") ENGINE=OLAP PRIMARY KEY(`id`)COMMENT "OLAP"DISTRIBUTED BY HASH(`id`) BUCKETS 10 PROPERTIES ("replication_num" = "3","colocate_with" = "qhc","in_memory" = "false","storage_format" = "DEFAULT");

查看 StarRocks 表

查看Starrocks表中数据是不是为空

提交 Flink 整库同步作业

再次查看 StarRocks

七、整库入库 MySQL

作业脚本

​​​​​​​

EXECUTE CDCSOURCE cdc_mysql2 WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.0.4', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'checkpoint' = '3000', 'scan.startup.mode' = 'initial', 'parallelism' = '1', 'table-name' = 'bigdata\.products,bigdata\.orders', 'sink.connector' = 'jdbc', 'sink.url' = 'jdbc:mysql://192.168.0.5:3306/test?characterEncoding=utf-8&useSSL=false', 'sink.username' = 'root', 'sink.password' = '123456', 'sink.sink.db' = 'test', 'sink.table.prefix' = 'test_', 'sink.table.lower' = 'true', 'sink.table-name' = '${tableName}', 'sink.driver' = 'com.mysql.jdbc.Driver', 'sink.sink.buffer-flush.interval' = '2s', 'sink.sink.buffer-flush.max-rows' = '100', 'sink.sink.max-retries' = '5')

创建作业

创建 MySQL 表

​​​​​​​

drop table test.test_products;CREATE TABLE test.test_products (  id INTEGER NOT NULL ,  name VARCHAR(255) NOT NULL,  description VARCHAR(512));
drop table test.test_orders;CREATE TABLE test.test_orders (  order_id INTEGER NOT NULL ,  order_date DATETIME NOT NULL,  customer_name VARCHAR(255) NOT NULL,  price DECIMAL(10, 5) NOT NULL,  product_id INTEGER NOT NULL,  order_status BOOLEAN NOT NULL -- Whether order has been placed); 

提交 Flink 整库同步作业

查看 MySQL 数据

八、整库同步 Kafka

作业脚本

​​​​​​​

# cdc作业EXECUTE CDCSOURCE cdc_kafka WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.0.4', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'checkpoint' = '3000', 'scan.startup.mode' = 'initial', 'parallelism' = '1', 'table-name' = 'bigdata\.products,bigdata\.orders',  'sink.connector'='datastream-kafka',  'sink.topic'='cdctest',  'sink.brokers'='bigdata2:9092,bigdata3:9092,bigdata4:9092')

创建作业

创建 Kafka Topic

创建 topic 可忽略,Dinky 整库同步会自动创建。

# 创建topic./bin/kafka-topics.sh \ --create \ --zookeeper   bigdata2:2181,bigdata3:2181,bigdata4:2181 \ --replication-factor 3 \ --partitions 1 \ --topic cdctest# 查看topic./bin/kafka-topics.sh  --list \--zookeeper bigdata2:2181,bigdata3:2181,bigdata4:2181

提交 Flink 整库同步作业

查看消费者

查看是否2张表数据

./bin/kafka-console-consumer.sh  --bootstrap-server bigdata2:9092,bigdata3:9092,bigdata4:9092  --topic cdctest  --from-beginning  --group test_id

九、整库入库 PostgreSQL

作业脚本

​​​​​​​

EXECUTE CDCSOURCE cdc_postgresql5 WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.0.4', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'checkpoint' = '3000', 'scan.startup.mode' = 'initial', 'parallelism' = '1', 'table-name' = 'bigdata\.products,bigdata\.orders', 'sink.connector' = 'jdbc', 'sink.url' = 'jdbc:postgresql://192.168.0.5:5432/test', 'sink.username' = 'test', 'sink.password' = '123456', 'sink.sink.db' = 'test', 'sink.table.prefix' = 'test_', 'sink.table.lower' = 'true', 'sink.table-name' = '${tableName}', 'sink.driver' = 'org.postgresql.Driver', 'sink.sink.buffer-flush.interval' = '2s', 'sink.sink.buffer-flush.max-rows' = '100', 'sink.sink.max-retries' = '5')

创建作业

创建 PostgreSQL 表

​​​​​​​

CREATE schema test;drop table test.test_products;CREATE TABLE test.test_products (  id INTEGER UNIQUE NOT NULL ,  name VARCHAR(255) NOT NULL,  description VARCHAR(512));
drop table test.test_orders;CREATE TABLE test.test_orders (  order_id INTEGER UNIQUE NOT NULL ,  order_date timestamp   NULL,  customer_name VARCHAR(255) NOT NULL,  price DECIMAL(10, 5) NULL,  product_id INTEGER NULL,  order_status INTEGER NOT NULL -- Whether order has been placed); 

提交 Flink 整库同步作业

查看 PostgreSQL 数据

十、整库入仓 ClickHouse

作业脚本

​​​​​​​

EXECUTE CDCSOURCE cdc_clickhouse WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.0.4', 'port' = '4406', 'username' = 'root', 'password' = '123456', 'checkpoint' = '3000', 'scan.startup.mode' = 'initial', 'parallelism' = '1', 'table-name' = 'bigdata\.products,bigdata\.orders',  'sink.connector' = 'clickhouse',  'sink.url' = 'clickhouse://192.168.0.5:8123',  'sink.username' = 'default',  'sink.password' = '123456',  'sink.sink.db' = 'test',  'sink.table.prefix' = 'test_',  'sink.table.lower' = 'true',  'sink.database-name' = 'test',  'sink.table-name' = '${tableName}',  'sink.sink.batch-size' = '500',  'sink.sink.flush-interval' = '1000',  'sink.sink.max-retries' = '3')

创建作业

创建 ClickHouse 表

​​​​​​​

# 创建语句为本地表create database test;drop table test.test_products;CREATE TABLE test.test_products (  id Int64  NOT NULL ,  name String NOT NULL,  description String)ENGINE = MergeTree()ORDER BY idPRIMARY KEY id;
drop table test.test_orders;CREATE TABLE test.test_orders (  order_id Int64  NOT NULL ,  order_date DATETIME NOT NULL,  customer_name String NOT NULL,  price DECIMAL(10, 5) NOT NULL,  product_id Int64  NOT NULL,  order_status BOOLEAN NOT NULL -- Whether order has been placed)ENGINE = MergeTree()ORDER BY order_idPRIMARY KEY order_id;

提交 Flink 整库同步作业

查看 ClickHouse 数据

Dinky实践系列之FlinkCDC整库实时入仓入湖相关推荐

  1. flink 开发平台Dinky 构建 Flink CDC 整库入仓入湖

    原文:http://www.senlt.cn/article/866753893.html 摘要:本文介绍了如何使用 Dinky 实时计算平台构建 Flink CDC 整库入仓入湖.内容包括: 背景 ...

  2. Flink CDC 系列 - Flink CDC 如何简化实时数据入湖入仓

    摘要:本文整理自伍翀 (云邪).徐榜江 (雪尽) 在 Flink Forward Asia 2021 的分享,该分享以 5 个章节详细介绍如何使用 Flink CDC 来简化实时数据的入湖入仓, 文章 ...

  3. 【大数据新手上路】“零基础”系列课程--MySQL 数据整库迁移到 MaxCompute

    随着公司业务的增多,云数据库 RDS 下的 MySQL 数据库的表越来越多,想要把它全部迁移到 MaxCompute 中进行计算分析,但又愁要配置太多次同步任务.如何能将大量的数据表一次性上传到 Ma ...

  4. 【RxSwift 实践系列 2/3】thinking in Rx- Create和Drive

    ---> 上节 [RxSwift 实践系列 1/3]为什么使用RxSwift RxSwift 是一种编程思想,不是一门语言,学习他最难的部分就是thinking in Reactive Prog ...

  5. .net core实践系列之短信服务-Api的SDK的实现与测试

    前言 上一篇<.net core实践系列之短信服务-Sikiro.SMS.Api服务的实现>讲解了API的设计与实现,本篇主要讲解编写接口的SDK编写还有API的测试. 或许有些人会认为, ...

  6. 微服务架构设计实践系列之五:架构准备阶段

    微服务架构设计实践系列之五:架构准备阶段 原文:微服务架构设计实践系列之五:架构准备阶段 版权声明: https://blog.csdn.net/beyondself_77/article/detai ...

  7. 架构技术实践系列文章

    王晓宇:电商异步消息系统的实践 秦鹏:从应用到平台,云服务架构的演进过程 郭炜:从0到N建立高性价比的大数据平台 李智慧:宅米网技术变迁--初创互联网公司的技术发展之路 陶文质:分布式系统设计的求生之 ...

  8. React项目实践系列一

    数据分析平台-实践系列一 项目创建于2018年1月底,到现在已经接近半年,在此写下半年来项目的实践过程以及自己对前端的学习与体悟. 技术选型 框架: React 路由: React-Router 4 ...

  9. 最佳实践系列:前端代码标准和最佳实践

    最佳实践系列:前端代码标准 @窝窝商城前端(刘轶/李晨/徐利/穆尚)翻译于2012年 版本0.55 @郑昀校对 isobar的这个前端代码标准和最佳实践文档,涵盖了Web应用开发的方方面面,我们翻译了 ...

最新文章

  1. MIT研发无人机仓库管理系统,或将帮沃尔玛省下几十亿美元
  2. cf1039D 分块
  3. 20162314 《Program Design Data Structures》Learning Summary Of The First Week
  4. c#利用定时器自动备份数据库(mysql)
  5. 好用的平板电脑_支架里的变形金刚让手机、电脑、平板更好用!6种角度,1秒切换...
  6. 设计模式之适配器模式(Adapter Pattern)
  7. len()与cap()的区别
  8. Python资源大全 屌炸Python库
  9. python题型大全_python进阶:练习题 汇总
  10. 戴尔SC5020发布,专为提高效率/经济性优化设计的中端存储利器
  11. 使用DevExpress的WebChartControl控件绘制图表(柱状图、折线图、饼图)
  12. sql查询初学者指南_面向初学者SQL Server查询执行计划–类型和选项
  13. ArcGIS——vs2015安装arcgis engine不兼容
  14. 电脑计算机c盘缓存清理,电脑资深玩家教你如何有效进行C盘清理
  15. 基于PLC四层电梯模型控制系统课程设计
  16. 【10.24】一个只属于程序员的节日
  17. 王垠:完全用Linux工作
  18. 使照片带有妙的电影色彩55款工具套件的lr微妙的电影调色预设
  19. 使用Python进行并发编程
  20. 2021年上海市安全员C证考试报名及上海市安全员C证找解析

热门文章

  1. 第四课,Extjs中面板的应用
  2. 前端学习点滴留痕1: bgcolor+background
  3. Windows系统下通过JNI调用dll动态库的实现
  4. 【HCIA持续更新】NAPT
  5. html栅格系统格式,Bootstrap的栅格系统是什么?栅格系统详解
  6. 牛奶包装袋上的秘密:不知道真的还是假的
  7. 系统大玩家 无忧装机GHOST XP V7.0
  8. 快递单信息抽取【二】基于ERNIE1.0至ErnieGram + CRF预训练模型
  9. android 日记卡片,卡片日记 - 来自韩国设计师和中国程序员的跨国合作 - Android 应用 - 日记 - 【最美应用】...
  10. centos7.8安装mysql5.7步骤记录