概述

把 mysql 的数据迁移到 es 有很多方式,比如直接用 es 官方推荐的 logstash 工具,或者监听 mysql 的 binlog 进行同步,可以结合一些开源的工具比如阿里的 canal。

这里打算详细介绍另一个也是不错的同步方案,这个方案基于 kafka 的连接器。流程可以概括为:

mysql连接器监听数据变更,把变更数据发送到 kafka topic。

ES 监听器监听kafka topic 消费,写入 ES。

Kafka Connect有两个核心概念:Source和Sink。 Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector,也就是连接器。在本例中,mysql的连接器是source,es的连接器是sink。

这些连接器本身已经开源,我们之间拿来用即可。不需要再造轮子。

过程详解

准备连接器工具

我下面所有的操作都是在自己的mac上进行的。

首先我们准备两个连接器,分别是 kafka-connect-elasticsearch 和 kafka-connect-elasticsearch, 你可以通过源码编译他们生成jar包,源码地址:

我个人不是很推荐这种源码的编译方式,因为真的好麻烦。除非想研究源码。

我是直接下载 confluent 平台的工具包,里面有编译号的jar包可以直接拿来用,下载地址:

我下载的是 confluent-5.3.1 版本, 相关的jar包在 confluent-5.3.1/share/java 目录下

我们把编译好的或者下载的jar包拷贝到kafka的libs目录下。拷贝的时候要注意,除了 kafka-connect-elasticsearch-5.3.1.jar 和 kafka-connect-jdbc-5.3.1.jar,相关的依赖包也要一起拷贝过来,比如es这个jar包目录下的http相关的,jersey相关的等,否则会报各种 java.lang.NoClassDefFoundError 的错误。

另外mysql-connector-java-5.1.22.jar也要放进去。

数据库和ES环境准备

数据库和es我都是在本地启动的,这个过程具体就不说了,网上有很多参考的。

我创建了一个名为test的数据库,里面有一个名为login的表。

配置连接器

这部分是最关键的,我实际操作的时候这里也是最耗时的。

首先配置jdbc的连接器。

我们从confluent工具包里拷贝一个配置文件的模板(confluent-5.3.1/share目录下),自带的只有sqllite的配置文件,拷贝一份到kafka的config目录下,改名为sink-quickstart-mysql.properties,文件内容如下:

# tasks to create:

name=mysql-login-connector

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

tasks.max=1

connection.url=jdbc:mysql://localhost:3306/test?user=root&password=11111111

mode=timestamp+incrementing

timestamp.column.name=login_time

incrementing.column.name=id

topic.prefix=mysql.

table.whitelist=login

connection.url指定要连接的数据库,这个根据自己的情况修改。mode指示我们想要如何查询数据。在本例中我选择incrementing递增模式和timestamp 时间戳模式混合的模式, 并设置incrementing.column.name递增列的列名和时间戳所在的列名。

混合模式还是比较推荐的,它能尽量的保证数据同步不丢失数据。具体的原因大家可以查阅相关资料,这里就不详述了。

topic.prefix是众多表名之前的topic的前缀,table.whitelist是白名单,表示要监听的表,可以使组合多个表。两个组合在一起就是该表的变更topic,比如在这个示例中,最终的topic就是mysql.login。

connector.class是具体的连接器处理类,这个不用改。

其它的配置基本不用改。

接下来就是ES的配置了。同样也是拷贝 quickstart-elasticsearch.properties 文件到kafka的config目录下,然后修改,我自己的环境内容如下:

name=elasticsearch-sink

connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

tasks.max=1

topics=mysql.login

key.ignore=true

connection.url=http://localhost:9200

type.name=mysqldata

topics的名字和上面mysql设定的要保持一致,同时这个也是ES数据导入的索引。从里也可以看出,ES的连接器一个实例只能监听一张表。

type.name需要关注下,我使用的ES版本是7.1,我们知道在7.x的版本中已经只有一个固定的type(_doc)了,使用低版本的连接器在同步的时候会报错误,我这里使用的5.3.1版本已经兼容了。继续看下面的章节就知道了。

关于es连接器和es的兼容性问题,有兴趣的可以看看下面这个issue:

启动测试

当然首先启动zk和kafka。

然后我们启动mysql的连接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-mysql.properties &

接着手动往login表插入几条记录,正常情况下这些变更已经发到kafka对应的topic上去了。为了验证,我们在控制台启动一个消费者从mysql.login主题读取数据:

./bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic mysql.login --from-beginning

可以看到刚才插入的数据。

把数据从 MySQL 移动到 Kafka 里就算完成了,接下来把数据从 Kafka 写到 ElasticSearch 里。

首先启动ES和kibana,当然后者不是必须的,只是方便我们在IDE环境里测试ES。你也可以通过控制台给ES发送HTTP的指令。

先把之前启动的mysql连接器进程结束(因为会占用端口),再启动 ES 连接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/quickstart-elasticsearch.properties &

如果正常的话,ES这边应该已经有数据了。打开kibana的开发工具,在console里执行

GET _cat/indices

这是获取节点上所有的索引,你应该能看到,

green open mysql.login 1WqRjkbfTlmXj8eKBPvAtw 1 1 4 0 12kb 7.8kb

说明索引已经正常创建了。然后我们查询下,

GET mysql.login/_search?pretty=true

结果如下,

{

"took" : 1,

"timed_out" : false,

"_shards" : {

"total" : 1,

"successful" : 1,

"skipped" : 0,

"failed" : 0

},

"hits" : {

"total" : {

"value" : 4,

"relation" : "eq"

},

"max_score" : 1.0,

"hits" : [

{

"_index" : "mysql.login",

"_type" : "mysqldata",

"_id" : "mysql.login+0+0",

"_score" : 1.0,

"_source" : {

"id" : 1,

"username" : "lucas1",

"login_time" : 1575870785000

}

},

{

"_index" : "mysql.login",

"_type" : "mysqldata",

"_id" : "mysql.login+0+1",

"_score" : 1.0,

"_source" : {

"id" : 2,

"username" : "lucas2",

"login_time" : 1575870813000

}

},

{

"_index" : "mysql.login",

"_type" : "mysqldata",

"_id" : "mysql.login+0+2",

"_score" : 1.0,

"_source" : {

"id" : 3,

"username" : "lucas3",

"login_time" : 1575874031000

}

},

{

"_index" : "mysql.login",

"_type" : "mysqldata",

"_id" : "mysql.login+0+3",

"_score" : 1.0,

"_source" : {

"id" : 4,

"username" : "lucas4",

"login_time" : 1575874757000

}

}

]

}

}

参考:

1.《kafka权威指南》

关注公众号:犀牛饲养员的技术笔记

ES迁mysql_使用kafka连接器迁移mysql数据到ElasticSearch相关推荐

  1. python脚本迁移数据库_Python迁移MySQL数据到MongoDB脚本

    MongoDB是一个文档数据库,在存储小文件方面存在天然优势.随着业务求的变化,需要将线上MySQL数据库中的行记录,导入到MongoDB中文档记录. 一.场景:线上MySQL数据库某表迁移到Mong ...

  2. mysql中如何迁移数据文件,迁移mysql数据文件存放位置

    迁移mysql数据文件存放位置 (2012-01-11 14:46:30) 标签: mysql db datafile 数据 文件 1.备份当前的数据库 2.停止mysql服务 root@box:~/ ...

  3. 关于Ubuntu 16.04系统挂载硬盘以及迁移MYSQL数据存储目录的操作步骤

    转载链接 : 关于Ubuntu 16.04系统挂载硬盘以及迁移MYSQL数据存储目录的操作步骤 :https://www.jianshu.com/p/58093888ee25 本文背景: 现有项目系统 ...

  4. 使用canal实时同步MySQL数据到Elasticsearch

    使用canal实时同步MySQL数据到Elasticsearch 搭建环境 安装 elasticsearch 安装 kibana 下载和安装canal 1.下载canal 2.配置MySQL 3.配置 ...

  5. 使用canal同步MySQL数据到Elasticsearch(ES)

    目录 1.功能及使用场景 1.1.功能介绍 1.2.使用场景 2.需求引入 3.canal文件下载及准备 3.1 下载文件 3.2 准备文件 4.deployer安装及效果测试 4.1.deploye ...

  6. 使用sqoop迁移mysql数据到hive

    相关参考文献: 1.原文地址:http://www.cnblogs.com/charlist/p/7122198.html 使用Sqoop能够极大简化MySQL数据迁移至Hive之流程,并降低Hado ...

  7. 实践练习四:迁移 MySQL 数据到 OceanBase 集群

    练习目的 本次练习目的掌握从 MySQL 向 OceanBase 迁移数据的基本方法:mysqldump.datax .canal 等. 练习内容 请记录并分享下列内容: (必选)使用 mysqldu ...

  8. mysql | 利用docker快速迁移mysql数据

    一.项目背景   最近在进行系统的复刻迁移,如,有个demo环境,其中包含mysql.bpm.nginx等,mysql使用的是源码安装.目前最繁琐的步骤就是迁移mysql,mysql数据库级别在20G ...

  9. 通过dts迁移MYSQL数据到达梦数据库

    达梦提供了免费的数据迁移工具DTS,可以将oracle,mysql,sql server,pg等数据库迁移到达梦,全图形化操作,非常的直观和方便,而且速度也很快. 安装完达梦数据库软件后,已自带这个数 ...

最新文章

  1. Visual Studio原生开发的10个调试技巧
  2. 计算机windows多用户,windows Server 2012 专业版配置多用户远程桌面连接
  3. 创建featureclass,为它赋别名,并移动到数据集下
  4. Linux sed命令高级用法精讲
  5. python蟒蛇绘制
  6. POJ 3308 最少点集覆盖
  7. 本地上传文件到FastDFS命令上传报错:ERROR - file: connection_pool.c, line: 142, connect to server 192.168.0.197:221
  8. 原生Django常用API 参数
  9. 如何修改Tomcat版本
  10. 有哪些防护措施可以解决DDOS攻击?
  11. 淘宝开店经验 心得 攻略
  12. 里约热内卢圣徒java_里约热内卢:圣徒之城
  13. 快乐星球计算机老师,《快乐星球Ⅴ》新电脑老师(下)
  14. 西瓜视频中视频计划还有机会吗?
  15. html表格填充空白单元格,Excel表格数据录入:怎么快速填充所有空白单元格?
  16. NR PRACH(一)Preamble的确定
  17. 335x系列平台-usb的模式切换HOST和OTG
  18. iOS 16.2 的7个惊人变化
  19. 一般信道容量的计算matlab,DMC信道容量迭代计算的matlab实现
  20. [转]《英特尔多核/多线程技术》

热门文章

  1. 关于lock_guard使用细节
  2. C#利用反射将Datatable转化为指定实体类ListT
  3. android底部弹出显示不全,Android 解决 NestedScrollView 底部内容被遮挡显示不全
  4. 64位程序怎么判断指针是否有效_AArch64应用程序级编程模型
  5. java可以返回微妙吗_Java开发中10个最为微妙的最佳编程实践
  6. c语言乘法口诀表的流程图_例18:C语言编程实现九九乘法表
  7. Win7旗舰版系统网页显示不全怎么办
  8. 谷歌浏览器怎么禁用flash flash禁用方法分享
  9. 腾讯视频免费下载安装_怎样下载腾讯视频里的视频
  10. SpringMVC深度探险 —— SpringMVC核心配置文件详解