flink-cdc Environment Setup (version 1.14.3)
flink-connector-cdc is developed independently of the Flink project, so, as the name suggests, pay close attention to version compatibility when integrating: the Flink, Scala, and CDC connector versions must all match.
1. Environment
- Java: JDK 8+
- Scala: 2.11 or 2.12, depending on which Scala version your Flink and CDC dependencies are built against
- Flink: 1.14.3
- MySQL: 8.0
- flink-cdc
1.1 flink-sql environment
The jar flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar shown above is the dependency for Flink 1.14; it must be placed under flink_home/lib/.
This jar has to be compiled yourself, since the official releases only go up to 2.1.1 (as of 2022-03-11 there was no newer release). The steps, per the flink-cdc README:
- Download the provided jar directly, or
- Build it yourself:
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests
Once the build finishes, take whichever jar you need from the output. The build downloads a large number of plugins, so it is slow.
Next, go to Flink's /bin directory and start the cluster:
start-cluster.sh
To verify, open the Flink web UI (by default at http://localhost:8081).
### Demo: capturing MySQL data with flink-sql CDC
MySQL's binlog must be enabled, and the captured table must have a primary key.
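If binlog is not already on, it can be enabled in the MySQL server configuration. A minimal sketch of the relevant my.cnf settings (the server-id value here is arbitrary; note that MySQL 8.0 enables binlog in ROW format by default, so on a stock 8.0 install nothing may need changing):

```ini
# my.cnf — binlog settings flink-cdc relies on (MySQL 8.0 defaults already match)
[mysqld]
server-id        = 1          # any unique, non-zero id
log-bin          = mysql-bin  # enable binary logging
binlog_format    = ROW        # CDC requires row-level events
binlog_row_image = FULL       # include full before/after row images
```

You can confirm the running values with `show variables like 'log_bin';` and `show variables like 'binlog_format';`.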
- Create the MySQL table:
-- mysql
show databases;
use test;
create table if not exists test (
    id int primary key auto_increment,
    name varchar(32)
);
- Start the flink-sql client and create the Flink streaming table:
sql-client.sh
-- flink sql
CREATE TABLE test (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = '<username>',
    'password' = '<password>',
    'database-name' = '<database>',
    'table-name' = '<table>'
);
- Insert data into MySQL (since id is auto_increment, inserting 0 lets MySQL assign the next id):
-- mysql
insert into test values(0, "pjs");
insert into test values(0, "jyl");
- Check the flink-sql output:
-- flink sql
select * from test;
1.2 flink-stream
pom
<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.14.3</flink.version>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <flink.cdc.version>2.2-SNAPSHOT</flink.cdc.version>
</properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>${flink.cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>${flink.cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Demo (full source on gitee):
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Map;

public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("test")     // set captured database
            .tableList("test.test")   // set captured table
            .username("kuro")
            .password("pwdsdfsa;_=sfds")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

    Configuration configuration = Configuration.fromMap(Map.of("rest.port", "10010"));
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

    // enable checkpointing
    env.enableCheckpointing(3000);

    env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            .setParallelism(4)  // 4 parallel source tasks
            .print()
            .setParallelism(1); // use parallelism 1 for the sink to keep message ordering

    env.execute("Print MySQL Snapshot + Binlog");
}
Output:
{"before":null,"after":{"id":2,"name":"ljy"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1646993221523,"transaction":null}
{"before":null,"after":{"id":1,"name":"kuro"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1646993221522,"transaction":null}
{"before":null,"after":{"id":3,"name":"liyouqiang"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1646993221524,"transaction":null}
Mar 11, 2022 6:07:05 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at mysql-bin.000001/3592 (sid:5536, cid:33)
{"before":{"id":1,"name":"kuro"},"after":{"id":1,"name":"LJY"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1646993370000,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":3813,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1646993370264,"transaction":null}
flink-cdc performs a full snapshot of the existing data on the first run, and from then on syncs changes incrementally from the binlog.
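This snapshot-then-incremental behavior is the default; if I recall the flink-cdc 2.x API correctly, it can be tuned via `StartupOptions` on the source builder (a non-runnable fragment, assuming the `com.ververica.cdc.connectors.mysql.table.StartupOptions` class from the dependencies above):

```java
// Default: full snapshot first, then binlog.
MySqlSource<String> snapshotThenBinlog = MySqlSource.<String>builder()
        .hostname("localhost").port(3306)
        .databaseList("test").tableList("test.test")
        .username("kuro").password("...")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .startupOptions(StartupOptions.initial())  // snapshot + binlog (default)
        // .startupOptions(StartupOptions.latest()) // skip snapshot, binlog tail only
        .build();
```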