Case overview

Use Flink CDC to sync multiple MySQL databases and tables into Doris at the same time.

Version information

flink 1.14.4

doris 1.1.0

Dependencies

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <scala.version>2.12</scala.version>
    <java.version>1.8</java.version>
    <flink.version>1.14.4</flink.version>
    <fastjson.version>1.2.62</fastjson.version>
    <hadoop.version>2.8.3</hadoop.version>
    <scope.mode>compile</scope.mode>
    <slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <!-- Add log dependencies when debugging locally -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <!-- flink-doris-connector -->
    <dependency>
        <groupId>org.apache.doris</groupId>
        <artifactId>flink-doris-connector-1.14_2.12</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.12</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.0</version>
        <exclusions>
            <exclusion>
                <artifactId>flink-shaded-guava</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.1</version>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <args>
                    <arg>-feature</arg>
                </args>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

Version 2.2.1 of flink-connector-mysql-cdc keeps throwing the following exception:

java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder

As a workaround, download flink-sql-connector-mysql-cdc-2.2.0 from the official site and add it to the project locally.
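One way to reference a locally downloaded jar from Maven is a system-scoped dependency; this is only a sketch, and the `lib/` path below is an assumption about where you put the jar:

```xml
<!-- Illustrative: points Maven at the manually downloaded connector jar.
     Adjust systemPath to wherever the jar actually lives. -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.2.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/lib/flink-sql-connector-mysql-cdc-2.2.0.jar</systemPath>
</dependency>
```

Alternatively, drop the jar into `$FLINK_HOME/lib` on the cluster so it is on the classpath at runtime.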

Prepare the MySQL data

CREATE DATABASE emp_1;
USE emp_1;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'),
(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'),
(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'),
(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'),
(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'),
(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24'),
(10011,'1953-11-07','Mary','Sluis','F','1990-01-22'),
(10012,'1960-10-04','Patricio','Bridgland','M','1992-12-18'),
(10013,'1963-06-07','Eberhardt','Terkki','M','1985-10-20'),
(10014,'1956-02-12','Berni','Genin','M','1987-03-11'),
(10015,'1959-08-19','Guoxiang','Nooteboom','M','1987-07-02'),
(10016,'1961-05-02','Kazuhito','Cappelletti','M','1995-01-27'),
(10017,'1958-07-06','Cristinel','Bouloucos','F','1993-08-03'),
(10018,'1954-06-19','Kazuhide','Peha','F','1987-04-03'),
(10019,'1953-01-23','Lillian','Haddadi','M','1999-04-30'),
(10020,'1952-12-24','Mayuko','Warwick','M','1991-01-26'),
(10021,'1960-02-20','Ramzi','Erde','M','1988-02-10'),
(10022,'1952-07-08','Shahaf','Famili','M','1995-08-22'),
(10023,'1953-09-29','Bojan','Montemayor','F','1989-12-17'),
(10024,'1958-09-05','Suzette','Pettey','F','1997-05-19'),
(10025,'1958-10-31','Prasadram','Heyers','M','1987-08-17'),
(10026,'1953-04-03','Yongqiao','Berztiss','M','1995-03-20'),
(10027,'1962-07-10','Divier','Reistad','F','1989-07-07'),
(10028,'1963-11-26','Domenick','Tempesti','M','1991-10-22'),
(10029,'1956-12-13','Otmar','Herbst','M','1985-11-20'),
(10030,'1958-07-14','Elvis','Demeyer','M','1994-02-17'),
(10031,'1959-01-27','Karsten','Joslin','M','1991-09-01'),
(10032,'1960-08-09','Jeong','Reistad','F','1990-06-20'),
(10033,'1956-11-14','Arif','Merlo','M','1987-03-18'),
(10034,'1962-12-29','Bader','Swan','M','1988-09-21'),
(10035,'1953-02-08','Alain','Chappelet','M','1988-09-05'),
(10036,'1959-08-10','Adamantios','Portugali','M','1992-01-03');
CREATE TABLE employees_2 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_2` VALUES (10037,'1963-07-22','Pradeep','Makrucki','M','1990-12-05'),
(10038,'1960-07-20','Huan','Lortz','M','1989-09-20'),
(10039,'1959-10-01','Alejandro','Brender','M','1988-01-19'),
(10040,'1959-09-13','Weiyi','Meriste','F','1993-02-14'),
(10041,'1959-08-27','Uri','Lenart','F','1989-11-12'),
(10042,'1956-02-26','Magy','Stamatiou','F','1993-03-21'),
(10043,'1960-09-19','Yishay','Tzvieli','M','1990-10-20'),
(10044,'1961-09-21','Mingsen','Casley','F','1994-05-21'),
(10045,'1957-08-14','Moss','Shanbhogue','M','1989-09-02'),
(10046,'1960-07-23','Lucien','Rosenbaum','M','1992-06-20'),
(10047,'1952-06-29','Zvonko','Nyanchama','M','1989-03-31'),
(10048,'1963-07-11','Florian','Syrotiuk','M','1985-02-24'),
(10049,'1961-04-24','Basil','Tramer','F','1992-05-04'),
(10050,'1958-05-21','Yinghua','Dredge','M','1990-12-25'),
(10051,'1953-07-28','Hidefumi','Caine','M','1992-10-15'),
(10052,'1961-02-26','Heping','Nitsch','M','1988-05-21'),
(10053,'1954-09-13','Sanjiv','Zschoche','F','1986-02-04'),
(10054,'1957-04-04','Mayumi','Schueller','M','1995-03-13');
CREATE DATABASE emp_2;
USE emp_2;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_1` VALUES (10055,'1956-06-06','Georgy','Dredge','M','1992-04-27'),
(10056,'1961-09-01','Brendon','Bernini','F','1990-02-01'),
(10057,'1954-05-30','Ebbe','Callaway','F','1992-01-15'),
(10058,'1954-10-01','Berhard','McFarlin','M','1987-04-13'),
(10059,'1953-09-19','Alejandro','McAlpine','F','1991-06-26'),
(10060,'1961-10-15','Breannda','Billingsley','M','1987-11-02'),
(10061,'1962-10-19','Tse','Herber','M','1985-09-17'),
(10062,'1961-11-02','Anoosh','Peyn','M','1991-08-30'),
(10063,'1952-08-06','Gino','Leonhardt','F','1989-04-08'),
(10064,'1959-04-07','Udi','Jansch','M','1985-11-20'),
(10065,'1963-04-14','Satosi','Awdeh','M','1988-05-18'),
(10066,'1952-11-13','Kwee','Schusler','M','1986-02-26'),
(10067,'1953-01-07','Claudi','Stavenow','M','1987-03-04'),
(10068,'1962-11-26','Charlene','Brattka','M','1987-08-07'),
(10069,'1960-09-06','Margareta','Bierman','F','1989-11-05'),
(10070,'1955-08-20','Reuven','Garigliano','M','1985-10-14'),
(10071,'1958-01-21','Hisao','Lipner','M','1987-10-01'),
(10072,'1952-05-15','Hironoby','Sidou','F','1988-07-21'),
(10073,'1954-02-23','Shir','McClurg','M','1991-12-01'),
(10074,'1955-08-28','Mokhtar','Bernatsky','F','1990-08-13'),
(10075,'1960-03-09','Gao','Dolinsky','F','1987-03-19'),
(10076,'1952-06-13','Erez','Ritzmann','F','1985-07-09'),
(10077,'1964-04-18','Mona','Azuma','M','1990-03-02'),
(10078,'1959-12-25','Danel','Mondadori','F','1987-05-26'),
(10079,'1961-10-05','Kshitij','Gils','F','1986-03-27'),
(10080,'1957-12-03','Premal','Baek','M','1985-11-19'),
(10081,'1960-12-17','Zhongwei','Rosen','M','1986-10-30'),
(10082,'1963-09-09','Parviz','Lortz','M','1990-01-03'),
(10083,'1959-07-23','Vishv','Zockler','M','1987-03-31'),
(10084,'1960-05-25','Tuval','Kalloufi','M','1995-12-15');
CREATE TABLE employees_2 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_2` VALUES (10085,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(10086,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(10087,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(10088,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02'),
(10089,'1963-03-21','Sudharsan','Flasterstein','F','1986-08-12'),
(10090,'1961-05-30','Kendra','Hofting','M','1986-03-14'),
(10091,'1955-10-04','Amabile','Gomatam','M','1992-11-18'),
(10092,'1964-10-18','Valdiodio','Niizuma','F','1989-09-22'),
(10093,'1964-06-11','Sailaja','Desikan','M','1996-11-05'),
(10094,'1957-05-25','Arumugam','Ossenbruggen','F','1987-04-18'),
(10095,'1965-01-03','Hilari','Morton','M','1986-07-15'),
(10096,'1954-09-16','Jayson','Mandell','M','1990-01-14'),
(10097,'1952-02-27','Remzi','Waschkowski','M','1990-09-15'),
(10098,'1961-09-23','Sreekrishna','Servieres','F','1985-05-13'),
(10099,'1956-05-25','Valter','Sullins','F','1988-10-18'),
(10100,'1953-04-21','Hironobu','Haraldson','F','1987-09-21'),
(10101,'1952-04-15','Perla','Heyers','F','1992-12-28'),
(10102,'1959-11-04','Paraskevi','Luby','F','1994-01-26'),
(10103,'1953-11-26','Akemi','Birch','M','1986-12-02'),
(10104,'1961-11-19','Xinyu','Warwick','M','1987-04-16'),
(10105,'1962-02-05','Hironoby','Piveteau','M','1999-03-23'),
(10106,'1952-08-29','Eben','Aingworth','M','1990-12-19'),
(10107,'1956-06-13','Dung','Baca','F','1994-03-22'),
(10108,'1952-04-07','Lunjin','Giveon','M','1986-10-02'),
(10109,'1958-11-25','Mariusz','Prampolini','F','1993-06-16'),
(10110,'1957-03-07','Xuejia','Ullian','F','1986-08-22'),
(10111,'1963-08-29','Hugo','Rosis','F','1988-06-19'),
(10112,'1963-08-13','Yuichiro','Swick','F','1985-10-08'),
(10113,'1963-11-13','Jaewon','Syrzycki','M','1989-12-24'),
(10114,'1957-02-16','Munir','Demeyer','F','1992-07-17'),
(10115,'1964-12-25','Chikara','Rissland','M','1986-01-23'),
(10116,'1955-08-26','Dayanand','Czap','F','1985-05-28');

Prepare the Doris table

create database test_db;
use test_db;
CREATE TABLE all_employees_info (
    emp_no        int NOT NULL,
    birth_date    date,
    first_name    varchar(20),
    last_name     varchar(20),
    gender        char(2),
    hire_date     date,
    database_name varchar(50),
    table_name    varchar(200)
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

Because the table uses UNIQUE KEY(emp_no, birth_date), updating either of these two columns in MySQL produces an extra row in Doris instead of replacing the existing one.
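To see why, a Doris Unique Key table can be modeled as a map keyed by the UNIQUE KEY columns. This is an illustrative model only, not Doris itself: when a MySQL update changes a key column such as birth_date, the row arrives under a new key and is stored in addition to the old row.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model of a Doris Unique Key table keyed by (emp_no, birth_date).
public class UniqueKeyModel {
    private final Map<String, String> rows = new LinkedHashMap<>();

    // key = (emp_no, birth_date); the value stands in for the remaining columns
    public void upsert(int empNo, String birthDate, String firstName) {
        rows.put(empNo + "|" + birthDate, firstName);
    }

    public int rowCount() {
        return rows.size();
    }

    public static void main(String[] args) {
        UniqueKeyModel table = new UniqueKeyModel();
        table.upsert(10001, "1953-09-02", "Georgi");  // initial row
        table.upsert(10001, "1953-09-02", "Georgii"); // same key: replaced in place
        table.upsert(10001, "1953-09-03", "Georgii"); // key column changed: extra row
        System.out.println(table.rowCount()); // prints 2
    }
}
```

Updating a non-key column (e.g. first_name) replaces the row; updating a key column leaves two rows for the same emp_no.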

Code example

import java.util.UUID;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setParallelism(1);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// register the MySQL CDC source table in the catalog
tEnv.executeSql("CREATE TABLE cdc_test_source (\n" +
        "  emp_no INT,\n" +
        "  birth_date DATE,\n" +
        "  first_name STRING,\n" +
        "  last_name STRING,\n" +
        "  gender STRING,\n" +
        "  hire_date STRING,\n" +
        "  database_name STRING METADATA VIRTUAL,\n" +
        "  table_name STRING METADATA VIRTUAL,\n" +
        "  PRIMARY KEY (`emp_no`) NOT ENFORCED\n" +
        ") WITH (\n" +
        "  'connector' = 'mysql-cdc',\n" +
        "  'hostname' = '192.168.22.xxx',\n" +
        "  'port' = '3306',\n" +
        "  'username' = 'xxx',\n" +
        "  'password' = 'xxx',\n" +
        "  'database-name' = 'emp_[0-9]+',\n" +
        "  'table-name' = 'employees_[0-9]+'\n" +
        ")");

// Doris Stream Load label. In the exactly-once scenario the label is globally
// unique, and on restart the job must resume from the latest checkpoint.
// Exactly-once semantics can be turned off via sink.enable-2pc.
String label = UUID.randomUUID().toString();

// Doris sink table
tEnv.executeSql("CREATE TABLE doris_test_sink (\n" +
        "  emp_no INT,\n" +
        "  birth_date STRING,\n" +
        "  first_name STRING,\n" +
        "  last_name STRING,\n" +
        "  gender STRING,\n" +
        "  hire_date STRING\n" +
        ") WITH (\n" +
        "  'connector' = 'doris',\n" +
        "  'fenodes' = '172.8.10.xxx:8030',\n" +
        "  'table.identifier' = 'test_db.all_employees_info',\n" +
        "  'username' = 'xxx',\n" +
        "  'password' = 'xxx',\n" +
        "  'sink.label-prefix' = '" + label + "',\n" +
        "  'sink.properties.format' = 'json',\n" +            // JSON data format
        "  'sink.properties.read_json_by_line' = 'true'\n" +
        ")");

// sync the MySQL tables into the Doris table
tEnv.executeSql("INSERT INTO doris_test_sink " +
        "SELECT emp_no, CAST(birth_date AS STRING) AS birth_date, first_name, " +
        "last_name, gender, CAST(hire_date AS STRING) AS hire_date " +
        "FROM cdc_test_source");

Once the job is running, insert or update some rows in MySQL to verify the sync.
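For example, the following statements (run against MySQL; the new emp_no and values are illustrative) exercise both an insert and an update, and the changes should appear in test_db.all_employees_info shortly after:

```sql
-- insert a new row into one of the source tables
INSERT INTO emp_1.employees_1 VALUES
(10117,'1960-01-01','Test','User','M','1990-01-01');

-- update an existing row
UPDATE emp_1.employees_1 SET first_name = 'Georgii' WHERE emp_no = 10001;

-- then check the result in Doris
SELECT * FROM test_db.all_employees_info WHERE emp_no IN (10001, 10117);
```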

Notes

Multi-database, multi-table syncing comes down to these two options:

"  'database-name' = 'emp_[0-9]+',\n" +
"  'table-name' = 'employees_[0-9]+'\n" +

Using regular expressions, all databases and tables with matching names are picked up automatically.
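The matching can be sketched with plain `java.util.regex` semantics; this is an illustration of how the patterns select names, not the connector's internal code, and the candidate names below are made up:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch: which database/table names a regex option would capture.
public class CdcNameMatcher {
    public static List<String> matching(List<String> names, String pattern) {
        return names.stream()
                .filter(name -> name.matches(pattern)) // full-string match
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> databases = Arrays.asList("emp_1", "emp_2", "emp_backup", "orders");
        // same pattern as the 'database-name' option
        System.out.println(matching(databases, "emp_[0-9]+")); // prints [emp_1, emp_2]

        List<String> tables = Arrays.asList("employees_1", "employees_2", "employees_tmp");
        // same pattern as the 'table-name' option
        System.out.println(matching(tables, "employees_[0-9]+")); // prints [employees_1, employees_2]
    }
}
```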

The source database and table names come from the metadata columns:

"  database_name STRING METADATA VIRTUAL,\n" +
"  table_name STRING METADATA VIRTUAL,\n" +

Without these declarations, the database and table names of the MySQL rows cannot be read.
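Note that the Doris table all_employees_info defined earlier also has database_name and table_name columns, while the sink table and INSERT in the code example carry only the six employee columns. A hedged sketch of writing the metadata through as well (the sink table DDL would need the two extra STRING columns added first):

```sql
INSERT INTO doris_test_sink
SELECT emp_no,
       CAST(birth_date AS STRING) AS birth_date,
       first_name,
       last_name,
       gender,
       CAST(hire_date AS STRING) AS hire_date,
       database_name,
       table_name
FROM cdc_test_source;
```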

For the column type mapping between Doris and Flink, see the official documentation:

Flink Doris Connector - Apache Doris
