Flink CDC 系列文章:
《Flink CDC 系列(1)—— 什么是 Flink CDC》
《Flink CDC 系列(2)—— Flink CDC 源码编译》
《Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo》
《Flink CDC 系列(4)—— Flink CDC MySQL Connector 常用参数表》
《Flink CDC 系列(5)—— Flink CDC MySQL Connector 启动模式》
《Flink CDC 系列(6)—— Flink CDC MySQL Connector 工作机制之 Incremental Snapshot Reading》
《Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch》

文章目录

  • 简介
  • 系统环境和软件版本
  • MySQL 测试数据准备
  • ElasticSearch 安装
    • 1. 安装包选择和下载
    • 2. 解压
    • 3. 启动 ElasticeSearch
    • 4. 验证是否启动成功
  • Flink 集群准备
  • 演示开始
  • 总结

简介

本文介绍了通过 Flink CDC + Flink SQL 同步 MySQL 数据到 ElasticSearch 的案例。案例包含了 Insert/Update/Delete 的操作。

系统环境和软件版本

Ubuntu 20.04
JDK 1.8
Maven 3.6.3
Flink 1.13.6
ElasticSearch 7.16.2

MySQL 测试数据准备


mysql> CREATE DATABASE mydb;mysql> USE mydb;mysql> CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));mysql> INSERT INTO products VALUES (default,"scooter1","Small 1-wheel scooter");
Query OK, 1 row affected (0.01 sec)

ElasticSearch 安装

1. 安装包选择和下载

官网下载地址:
https://www.elastic.co/cn/downloads/past-releases/elasticsearch-7-16-2

根据自己的操作系统(和芯片)选择一个合适的安装包。苹果M1芯片或者在苹果M1芯片安装的虚拟机都是选择后缀ARRCH64的安装包。
笔者当前的系统环境是基于苹果M1芯片安装Ubuntu 20.04操作系统,因此选择了 LINUX ARRCH64 的安装包。

下载命令

axel -n 20 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.16.2-linux-aarch64.tar.gz

2. 解压

tar xvf elasticsearch-7.16.2-linux-aarch64.tar.gz

3. 启动 ElasticeSearch

cd elasticsearch-7.16.2
bin/elasticsearch

4. 验证是否启动成功

curl http://localhost:9200

如下图所示,说明启动成功了

Flink 集群准备

1. 下载 flink 1.13.6 的二进制安装包

axel -n 20 https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz

2. 解压

tar xvf flink-1.13.6-bin-scala_2.11.tgz

3. 将flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar 拷贝到 flink lib 目录下,该文件由 Flink CDC 源码编译得到

cp /opt/flink-cdc-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar /opt/flink-1.13.6/lib

如何通过 Flink CDC 源码编译得到 flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar,请参考:
《Flink CDC 系列(2)—— Flink CDC 源码编译》

4. 修改 /opt/flink-1.13.6/conf/workers

vi /opt/flink-1.13.6/conf/workers

workers文件内容:

localhost
localhost

意思是要在本机启动两个work进程

5. 修改 /opt/flink-1.13.6/conf/flink-conf.yaml

vi  /opt/flink-1.13.6/conf/flink-conf.yaml

设置参数: taskmanager.numberOfTaskSlots: 2

6. 下载 flink hadoop uber jar 文件
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar, 文件拷贝到 /opt/flink-1.13.6/lib 目录下

7. 下载 flink elasticsearch connector jar 文件
flink-sql-connector-elasticsearch7_2.11-1.13.6.jar
,文件拷贝到 /opt/flink-1.13.6/lib 目录下

8. 启动单机集群

cd /opt/flink-1.13.6
bin/start-cluster.sh

9. 查看 jobmanager 和 taskmanager 的进程是否存活

$ jps -m
9824 Jps -m
9143 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
8875 StandaloneSessionClusterEntrypoint --configDir /opt/flink-1.13.6/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=469762048b -D jobmanager.memory.jvm-overhead.max=201326592b
9403 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
9727 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=2.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=201326592b
ubuntu@ubuntu:/opt/flink-1.13.6$

演示开始

1. 启动 Flink SQL Client

cd /opt/flink-1.13.6
bin/sql-client.sh

2. 在 Flink SQL Client 中执行 DDL 和 查询

-- 创建 mysql-cdc source
Flink SQL> CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.64.6','port' = '3306','username' = 'test','password' = 'test','database-name' = 'mydb','table-name' = 'products');
[INFO] Execute statement succeed.
Flink SQL> select * from products;
id                 name                 description
1                 scooter1          Small 1-wheel scooterFlink SQL> CREATE TABLE products_es_sink (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://localhost:9200','index' = 'products');
[INFO] Execute statement succeed.Flink SQL> insert into products_es_sink select * from products;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b962baa7f6a8890cc45e43a7c95765d2

3. 查看 Elasticearch Index 的数据

curl http://localhost:9200/products/_search?pretty
{"took" : 3,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 1,"relation" : "eq"},"max_score" : 1.0,"hits" : [{"_index" : "products","_type" : "_doc","_id" : "1","_score" : 1.0,"_source" : {"id" : 1,"name" : "scooter1","description" : "Small 1-wheel scooter"}}]}
}

4. 在Mysql客户端插入新的数据

mysql> INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter");

5. 查看 Elasticearch Index 的数据
在命令行执行:

curl http://localhost:9200/products/_search?pretty
{"took" : 433,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 2,"relation" : "eq"},"max_score" : 1.0,"hits" : [{"_index" : "products","_type" : "_doc","_id" : "1","_score" : 1.0,"_source" : {"id" : 1,"name" : "scooter1","description" : "Small 1-wheel scooter"}},{"_index" : "products","_type" : "_doc","_id" : "2","_score" : 1.0,"_source" : {"id" : 2,"name" : "scooter2","description" : "Small 2-wheel scooter"}}]}
}
-- 新数据写到了elasticsearch

6. 在Mysql客户端更新的数据

mysql> update products set name = 'scooter----1' where id = 1;

7. 查看 Elasticearch Index 的数据
在命令行执行:

curl http://localhost:9200/products/_search?pretty
{"took" : 154,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 2,"relation" : "eq"},"max_score" : 1.0,"hits" : [{"_index" : "products","_type" : "_doc","_id" : "2","_score" : 1.0,"_source" : {"id" : 2,"name" : "scooter2","description" : "Small 2-wheel scooter"}},{"_index" : "products","_type" : "_doc","_id" : "1","_score" : 1.0,"_source" : {"id" : 1,"name" : "scooter----1","description" : "Small 1-wheel scooter"}}]}
}
-- id=1的数据被更新到了elasticsearch

7. 在Mysql客户端删除的数据

mysql> delete from products where id  = 1;

8. 查看 Elasticearch Index 的数据

在命令行执行:

curl http://localhost:9200/products/_search?pretty
{"took" : 347,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 1,"relation" : "eq"},"max_score" : 1.0,"hits" : [{"_index" : "products","_type" : "_doc","_id" : "2","_score" : 1.0,"_source" : {"id" : 2,"name" : "scooter2","description" : "Small 2-wheel scooter"}}]}
}-- id=1的数据被删除

总结

通过 Flink CDC 可以捕获到 MySQL 的 insert/update/delete 操作日志,并通过 Flink SQL 可对 ElasticSearch 的索引数据进行 insert/update/delete。

Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch相关推荐

  1. Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo

    Flink CDC 系列文章: <Flink CDC 系列(1)-- 什么是 Flink CDC> <Flink CDC 系列(2)-- Flink CDC 源码编译> < ...

  2. Flink CDC 系列(1)—— 什么是 Flink CDC

    Flink CDC 系列文章: <Flink CDC 系列(1)-- 什么是 Flink CDC> <Flink CDC 系列(2)-- Flink CDC 源码编译> < ...

  3. Flink CDC 系列 | 构建 MySQL 和 Postgres 上的 Streaming ETL

    摘要:本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL. Flink-CDC 项目地址: https://github.com/ververica ...

  4. Flink CDC 系列 - Flink CDC 如何简化实时数据入湖入仓

    摘要:本文整理自伍翀 (云邪).徐榜江 (雪尽) 在 Flink Forward Asia 2021 的分享,该分享以 5 个章节详细介绍如何使用 Flink CDC 来简化实时数据的入湖入仓, 文章 ...

  5. flink cdc 2.2.1 mysql connector

    报错 java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent ...

  6. Flink CDC 2.0 正式发布,详解核心改进

    简介:Flink CDC 2.0.0 版本于 8 月 10 日正式发布,点击了解详情- 本文由社区志愿者陈政羽整理,内容来源自阿里巴巴高级开发工程师徐榜江 (雪尽) 7 月 10 日在北京站 Flin ...

  7. Flink cdc +doris生产遇到的问题汇总-持续更新

    问题: 我有个表主键是字符串类型 然后cdc去读取的时候 自己split了很久 checkpoint一直显示执行中,我看日志打印是info : checkpoint一直卡在那里 程序一直等待中: 原因 ...

  8. Flink读写系列之-读HBase并写入HBase

    这里读HBase提供两种方式,一种是继承RichSourceFunction,重写父类方法,一种是实现OutputFormat接口,具体代码如下: 方式一:继承RichSourceFunction p ...

  9. Flink系列之:Flink CDC深入了解MySQL CDC连接器

    Flink系列之:Flink CDC深入了解MySQL CDC连接器 一.增量快照特性 1.增量快照读取 2.并发读取 3.全量阶段支持 checkpoint 4.无锁算法 5.MySQL高可用性支持 ...

最新文章

  1. centos7 安装 Mysql 5.7.28,详细完整教程
  2. Linux环境Nginx安装多版本PHP
  3. 用eclipse生成可运行jar包、启动jar包及常见错误
  4. 数据库 —— 应用程序与数据库的连接
  5. mysql 安装只有一半_记一次MySQL安装出现的坑爹问题。。。
  6. Day14作业 三、编程题 写一个Student类,属性:名字,年龄,分数,班级 (注意分包)
  7. 安卓Java读取SD卡文本文件
  8. UI交互设计关键词:情感化设计与心理
  9. 坯子库安装不上_柜式七氟丙烷的安装调试方法
  10. 由于3²+4²=5²,所以称‘3,4,5‘为勾股数,求n(包括n)以内所有勾股数数组。
  11. 博途调试g120_【免费资料】西门子变频器调试软件汇总
  12. 三天搭建内容推荐系统——标签挖掘、画像搭建、算法推荐
  13. 数据分析师必备技能之埋点
  14. 淘宝人群拖价怎么做? 大神导航,一个神奇的网站,从此开启大神之路!
  15. php如何处理查询请求,PHP的curl查看header信息的功能(包括查看返回header和请求header)...
  16. 微信DAT文件解密(dat转图像)
  17. 天猫淘宝整合营销 为你的店铺点亮奇迹!
  18. rj45 千兆接口定义_RJ45接口针脚定义(各种接口针脚定义)
  19. 高精度减法(C语言实现)
  20. CentOS8设置时间同步

热门文章

  1. 线段树合并:从入门到放弃
  2. Wish3D用户必看!模型加载失败原因汇总
  3. python画三维坐标_使用PyOpenGL绘制三维坐标系实例
  4. 天翼云安全一体化纵深体系是怎么炼成的?
  5. python手记(四):pillow(一) Image类简单图片处理
  6. navicat for mysql打开控制台操作数据库
  7. spring-boot 入门 ssmb小例子
  8. 自主研发智能扭蛋机,快乐扭蛋获数百万 Pre-A 轮融资
  9. 小米基于 Flink 的实时数仓建设实践
  10. Nginx配置基于ip的虚拟主机