文章目录

  • 第 1 章 CDC 简介
    • 1.1 什么是 CDC
    • 1.2 CDC 的种类
    • 1.3 Flink-CDC
  • 第 2 章 FlinkCDC 案例实操
    • 2.1 DataStream 方式的应用
      • 2.1.1 导入依赖
      • 2.1.2 编写代码
      • 2.1.3 案例测试
    • 2.2 FlinkSQL 方式的应用
      • 2.2.1 添加依赖
      • 2.2.2 代码实现
    • 2.3 自定义反序列化器
      • 2.3.1 代码实现

本博文出自尚硅谷b站公开课

第 1 章 CDC 简介

1.1 什么是 CDC

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

1.2 CDC 的种类

CDC 主要分为基于查询基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:

1.3 Flink-CDC

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据增量变更数据的 source 组件。目前也已开源,开源地址:
https://github.com/ververica/flink-cdc-connectors

第 2 章 FlinkCDC 案例实操

2.1 DataStream 方式的应用

2.1.1 导入依赖

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency># cdc 核心依赖<dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.2.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

2.1.2 编写代码

package com.herobin.flink.debug_local.input.cdc;import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;public class FlinkCDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序//2.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//2.2 指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//2.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "atguigu");//3.创建 Flink-MySQL-CDC 的 Source//initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder().hostname("hadoop102").port(3306).username("root").password("000000").databaseList("gmall-flink").tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式.startupOptions(StartupOptions.initial()).deserializer(new StringDebeziumDeserializationSchema()).build();//4.使用 CDC Source 从 MySQL 读取数据DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);//5.打印数据mysqlDS.print();//6.执行任务env.execute();}
}

2.1.3 案例测试

1)打包并上传至 Linux

2)开启 MySQL Binlog 并重启 MySQL

3)启动 Flink 集群

[atguigu@hadoop102 flink-standalone]$ bin/start-cluster.sh

4)启动 HDFS 集群

[atguigu@hadoop102 flink-standalone]$ start-dfs.sh

5)启动程序

[atguigu@hadoop102 flink-standalone]$ bin/flink run -c com.atguigu.FlinkCDC flink-1.0-
SNAPSHOT-jar-with-dependencies.jar

6)在 MySQL 的 gmall-flink.z_user_info 表中添加、修改或者删除数据

7)给当前的 Flink 程序创建 Savepoint

[atguigu@hadoop102 flink-standalone]$ bin/flink savepoint JobId
hdfs://hadoop102:8020/flink/save

8)关闭程序以后从 Savepoint 重启程序

[atguigu@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c
com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar

2.2 FlinkSQL 方式的应用

2.2.1 添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version>
</dependency>

2.2.2 代码实现

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建 Flink-MySQL-CDC 的 SourcetableEnv.executeSql("CREATE TABLE user_info (" +" id INT," +" name STRING," +" phone_num STRING" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'hostname' = 'hadoop102'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '000000'," +" 'database-name' = 'gmall-flink'," +" 'table-name' = 'z_user_info'" +")");tableEnv.executeSql("select * from user_info").print();env.execute();}}

2.3 自定义反序列化器

2.3.1 代码实现

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Properties;/*** 自定义反序列化器*/
public class Flink_CDCWithCustomerSchema {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.创建 Flink-MySQL-CDC 的 SourceProperties properties = new Properties();//initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder().hostname("hadoop102").port(3306).username("root").password("000000").databaseList("gmall-flink").tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式.startupOptions(StartupOptions.initial()).deserializer(new DebeziumDeserializationSchema<String>() {//自定义数据解析器@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String>collector) throws Exception {//获取主题信息,包含着数据库和表名 mysql_binlog_source.gmall-flink.z_user_infoString topic = sourceRecord.topic();String[] arr = topic.split("\\.");String db = arr[1];String tableName = arr[2];
//获取操作类型 READ DELETE UPDATE CREATEEnvelope.Operation operation =Envelope.operationFor(sourceRecord);
//获取值信息并转换为 Struct 类型Struct value = (Struct) sourceRecord.value();//获取变化后的数据Struct after = value.getStruct("after");
//创建 JSON 对象用于存储数据信息JSONObject data = new JSONObject();for (Field field : after.schema().fields()) {Object o = after.get(field);data.put(field.name(), o);}
//创建 JSON 对象用于封装最终返回值数据信息JSONObject result = new JSONObject();result.put("operation", operation.toString().toLowerCase());result.put("data", data);result.put("database", db);result.put("table", tableName);
//发送数据至下游collector.collect(result.toJSONString());}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}}).build();//3.使用 CDC Source 从 MySQL 读取数据DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);//4.打印数据mysqlDS.print();//5.执行任务env.execute();}
}

尚硅谷大数据技术之 Flink-CDC(转)相关推荐

  1. 尚硅谷大数据技术之电商用户行为数据分析

    尚硅谷大数据技术之电商用户行为分析 第1章 项目整体介绍 1.1 电商的用户行为 电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技术进行深入挖掘 ...

  2. 尚硅谷大数据技术Scala教程-笔记04【集合】

    视频地址:尚硅谷大数据技术之Scala入门到精通教程(小白快速上手scala)_哔哩哔哩_bilibili 尚硅谷大数据技术Scala教程-笔记01[Scala课程简介.Scala入门.变量和数据类型 ...

  3. 尚硅谷大数据技术Zookeeper教程-笔记01【Zookeeper(入门、本地安装、集群操作)】

    视频地址:[尚硅谷]大数据技术之Zookeeper 3.5.7版本教程_哔哩哔哩_bilibili 尚硅谷大数据技术Zookeeper教程-笔记01[Zookeeper(入门.本地安装.集群操作)] ...

  4. 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...

  5. 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...

  6. 尚硅谷大数据技术Spark教程-笔记09【SparkStreaming(概念、入门、DStream入门、案例实操、总结)】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表[课程资料下载] 视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[SparkCore ...

  7. 尚硅谷大数据技术Hadoop教程-笔记02【Hadoop-入门】

    视频地址:尚硅谷大数据Hadoop教程(Hadoop 3.x安装搭建到集群调优) 尚硅谷大数据技术Hadoop教程-笔记01[大数据概论] 尚硅谷大数据技术Hadoop教程-笔记02[Hadoop-入 ...

  8. 尚硅谷大数据技术Hadoop教程-笔记01【大数据概论】

    视频地址:尚硅谷大数据Hadoop教程(Hadoop 3.x安装搭建到集群调优) 尚硅谷大数据技术Hadoop教程-笔记01[大数据概论] 尚硅谷大数据技术Hadoop教程-笔记02[Hadoop-入 ...

  9. 尚硅谷大数据技术Hadoop教程-笔记03【Hadoop-HDFS】

    视频地址:尚硅谷大数据Hadoop教程(Hadoop 3.x安装搭建到集群调优) 尚硅谷大数据技术Hadoop教程-笔记01[大数据概论] 尚硅谷大数据技术Hadoop教程-笔记02[Hadoop-入 ...

  10. 尚硅谷大数据技术之Kettle

    搜遍全网,好不容易只找到v1.1版本的Kettle参考资料,之前都是自己跟着课程手写,CSDN太多无耻之徒拿去收费,今天免费分享给大家,欢迎学习大数据的同学和我交流,链接不能用dd我就行.后期我跟着课 ...

最新文章

  1. 批量下载文献中的参考文献
  2. 【POJ】3268 Silver Cow Party (将有向图的边反转)
  3. STC8G1K单片机软件执行时间物理测量
  4. spring入门详细教程(五)
  5. 使用Hibernate生成数据库和连接数据库
  6. (多线程)leetcode1195. 交替打印字符串 最简单解法一个变量搞定
  7. ExtTabMenu 控件
  8. VirtualBox桥接网络的简单配置,让虚拟机直接访问网络
  9. 【Okio】Okio 简单入门
  10. Queries with streaming sources must be executed with writeStream.start()
  11. 一个人想生存发展具备3大关键
  12. 《Python游戏趣味编程》 第7章 飞机大战
  13. Python降低XGBoost 过度拟合多种方法
  14. Deep Glow mac(AE高级辉光特效插件)支持AE2022
  15. 组播负载分担、静态组播路由和MBGP技术原理
  16. Linux安装及破解密码
  17. Ubuntu下出现授权问题
  18. 正版maya安装时,更改注册登录方式方法
  19. IPv4无网络访问权限解决办法
  20. 《微积分:一元函数微分学》——导数公式

热门文章

  1. Chrome最新离线安装包下载
  2. java反编译 编译_5个最佳Java反编译器
  3. Cplex安装教程与使用介绍
  4. (进来补知识啦!)利用双四选一数据选择器74153实现十六选一数据选择器(包含74153简单解释)
  5. HTML实现页面注册
  6. java中的递归算法_java递归算法详解
  7. Cobalt Strike Profile 学习记录
  8. 新经济 DTC 转型,一个简单而强大的数据平台至关重要
  9. ansys18.0安装教程
  10. “飞客蠕虫”形成全球最大僵尸网络 每日感染数万网民