logstash实现mysql数据库表实时同步
logstash可以将不同数据源,例如日志、文件、或jdbc等,同步到ElasticSearch中,本文利用logstash实现mysql数据库表之间的数据。(实例:数据库DB1中的表A有添加或者修改,数据库DB2中的表B也会自动同步)
一、准备:
数据源输入使用logstash中自带的logstash-input-jdbc
,无需额外安装,官网使用说明地址。
数据源输出需要使用logstash-output-jdbc
,但是在loastash官网中output plugins列表中并没有相关插件,需要额外安装,使用说明在Github地址。
安装logstash
将logstash下载后,放到/opt/elastic/
目录下,并将logstash目录重命名为logstash-test
。
安装logstash-output-jdbc
,在/opt/elastic/logstash-test
目录下执行:
bin/logstash-plugin install logstash-output-jdbc
安装成功:
二、数据库表
在数据库DB1中创建表A,并添加数据如下:
在数据库DB2中创建表B,表结构与A一致,暂不添加数据:
三、logstash配置文件
logstash配置文件中必须包含两个元素input
和output
,分别是数据来源的配置和数据输出的配置。还有一个可选项filter
,用来处理数据源和数据输出的之间的适配,例如,需要将某个字段的值10以后再输出,这个10的动作就应该写在filter
模块;还有数据源和数据输出字段的编码不同,日期类型不同等情况的处理。(由于本文中A表和B表中数据结构都是一样的,只是实现简单的数据同步,暂时用不到filter
)
同步配置文件如下:后面对每个部分进行解释。
input {jdbc {jdbc_connection_string => "jdbc:mysql://IP:3306/DB1?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"jdbc_user => "expert"jdbc_password => "123456"jdbc_driver_library => "/opt/elastic/logstash-test/mysql-connector-java-5.1.6.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"statement => "SELECT title,description FROM A"}
}filter {}output {jdbc {connection_string => "jdbc:mysql://IP:3306/DB2?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"username => "root"password => "root123"driver_jar_path => "/opt/elastic/logstash-test/mysql-connector-java-5.1.6.jar"driver_class => "com.mysql.jdbc.Driver"statement => ["insert into B (title,description) values (?,?)","[title]","[description]"]}stdout {codec => json_lines}
}
input
配置
input的配置中有很多强大的功能,详细使用见官网地址,此处只介绍上面涉及到的参数:
- jdbc_connection_string :数据库连接配置
- jdbc_user :用户名(连接DB1的用户名)
- jdbc_password:密码(连接DB1的密码)
- jdbc_driver_library:数据驱动的jar位置
- jdbc_driver_class :数据库驱动类名(类似jdbc中的Class.forName(“com.mysql.jdbc.Driver”))
- statement :查询数据源的sql语句(output中就是将此处的查询结果insert到数据库DB2中)
output
配置
由于logstash-output-jdbc
是额外扩展的output插件,在配置参数的写法上也略有不同,比如:所有的参数前面都没有jdbc_前缀
- connection_string :数据库连接配置
- username :用户名(连接DB2的用户名
- password :密码(连接DB2的密码)
- driver_jar_path :与input参数中的
jdbc_driver_library
一致 - driver_class :与input参数中的
jdbc_driver_class
一致 - statement :向DB2中添加数据的insert语句。
四、执行启动命令
将上述的配置文件命令为logstash_default.conf
,放在logstash-test/conf
文件夹下,在logstash-test
下执行:
logstash启动命令:
./bin/logstash -f /opt/elastic/logstash-test/config/logstash_default.conf --path.data=/opt/elastic/logstash-test/test
启动后发现报如下异常:
java.lang.IllegalAccessError: tried to access class com.mysql.jdbc.EscapeProcessor from class com.mysql.jdbc.ConnectionImpl
java.lang.IllegalAccessError: com/mysql/jdbc/EscapeProcessor
遗憾的是目前没有找到为什么会报这个异常,不过换了一种驱动的配置方式,这个异常就消失了。另外一种指定驱动jar包的方式也是官网给出的方式如下:
下面结合本例中给出解决办法。
启动异常处理方式:
在logstash目录下,创建目录vendor/jar/jdbc
(/opt/elastic/logstash-test/vendor/jar/jdbc),将驱动jar包放入该路径下。
将配置文件中的driver_jar_path
注释掉,
重新执行logstash启动命令
./bin/logstash -f /opt/elastic/logstash-test/config/logstash_default.conf --path.data=/opt/elastic/logstash-test/test
执行结果:
检查DB2中的表B:数据已经全部同步过去了。
五、定时自动同步数据
按照上面的过程,能实现执行logstash命令以后,DB1中的表A和DB2中的表B数据同步,但是如果后续表A中的数据有新增或者修改,还需要再去启动logstash。logstash提供了一种定时任务的方式,定期去检查表A中的数据是否有变化,根据表A的最后修改时间(LastUpdateDate)将表A中新增和修改的数据修改新增到表B中。
将配置文件做如下修改:
input {jdbc {jdbc_connection_string => "jdbc:mysql://IP:3306/DB1?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"jdbc_user => "expert"jdbc_password => "123456"jdbc_driver_library => "/opt/elastic/logstash-test/mysql-connector-java-5.1.6.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"statement => "SELECT title,description FROM A"schedule => "* * * * *"record_last_run => trueuse_column_value => truetracking_column => "LastUpdateDate"tracking_column_type => "timestamp"last_run_metadata_path => "/opt/elastic/logstash-test/last_record/logstash_default_last_time"clean_run => falselowercase_column_names => false}
}filter {}output {jdbc {connection_string => "jdbc:mysql://IP:3306/DB2?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"username => "root"password => "root123"driver_class => "com.mysql.jdbc.Driver"statement => ["insert into B (title,description) values(?,?) on duplicate key update title=values(title),description=values(description)","[title]","[description]"]}stdout {codec => json_lines}
}
input
中新增的参数:
schedule => "* * * * *"
:每分钟检查一次- ` record_last_run => true``:是否记录上一下执行的时间
- ` use_column_value => true``:是否使用数据源的字段
- `tracking_column => “LastUpdateDate”``:数据源的最后修改时间字段
- `tracking_column_type => “timestamp”``:字段类型
- ` last_run_metadata_path => “/opt/elastic/logstash-test/last_record/logstash_default_last_time”``:存放最后修改时间的文件位置
- ` clean_run => false``:这个参数表示你在开启Logstash同步数据时需不需要clean掉上次的记录
lowercase_column_names => false
读取字段时是否区分大小写
output
新增的参数:
stdout {codec => json_lines
}
stdout
为可选字段,将输出数据的方式加一种,stdout
可以把input中statement 的select结果转为json字符串打印到logstash的log中,便于追踪检查哪些数据被更新了。
六、补充
output中的statement
修改:
将表B中的title字段设置为唯一约束,将statement改为如下。即可实现如果title相同的时候,只修改记录,而不是新增。唯一约束可以根据实际需求去设置。
["insert into B (title,description) values(?,?) on duplicate key update title=values(title),description=values(description)","[title]","[description]"]
表A修改完只有需要1分钟以后才能在表B中看到同步效果,因为定时任务设置的每分钟执行一次。
logstash实现mysql数据库表实时同步相关推荐
- Sql Server 2008 实现不同数据库-表实时同步 本地发布/订阅
SQL server 不同数据库之间的表实时同步 图片大部分来自Sql Server 2008 实现不同数据库-表实时同步 本地发布/订阅 - 简书 有做适当的内容增加解释. 本地发布 1.新建发布 ...
- MySQL数据库导入或者同步大量数据时数据丢失解决方案
MySQL数据库导入或者同步大量数据时数据丢失解决方案 参考文章: (1)MySQL数据库导入或者同步大量数据时数据丢失解决方案 (2)https://www.cnblogs.com/miss-li/ ...
- 通过Logstash实现mysql数据定时增量同步到ES
文章目录 前言 一.系统配置 二.同步步骤整体概览 三.logstash数据同步实战 1.新建mysql表 2.ES中新建索引 3.Logstash 管道配置 4.启动Logstash 5.测试 6. ...
- mysql数据库的主从同步(主服务器存在内网IP)
1 分别在两台服务器上安装系统和mysql数据库 主服务器WIN2008R2,主服务器虚拟机和从服务器上是centos 7系统,并在centos 7系统安装mysql 5.7 本文中的两台服务器的IP ...
- MySQL异构同步_详解MySQL数据库异构数据同步
本文主要向大家介绍了MySQL数据库异构数据同步,通过具体的内容向大家展现,希望对大家学习MySQL数据库有所帮助. 在实现levelDB挂载成MySQL引擎时,发现在实际存储是key-value格式 ...
- MySql数据库从库同步的延迟问题及解决方案
1).MySQL数据库主从同步延迟原理mysql主从同步原理: 主库针对写操作,顺序写binlog,从库单线程去主库顺序读"写操作的binlog",从库取到binlog在本地原样执 ...
- Canal 实现 Mysql数据库实时数据同步
简介 1.1 canal介绍 Canal是一个基于MySQL二进制日志的高性能数据同步系统.Canal广泛用于阿里巴巴集团(包括https://www.taobao.com),以提供可靠的低延迟增量数 ...
- Mysql数据库之结构同步
Mysql数据库之结构同步 同步步骤: 1.点击"工具",选择结构同步 2.选择被同步的数据库连接和数据库.要同步到的数据库连接和数据库,点击"对比"按钮 3. ...
- mysql数据库的主从同步,实现读写分离
目录 前言 1 分别在两台centos 7系统上安装mysql 5.7 2 master主服务器的配置 2.1 配置文件my.cnf的修改 2.2 创建从服务器的用户和权限 2.3 重启mysql服务 ...
最新文章
- ftp 信息服务器日常维护,Web Ftp Mail服务器的日常管理与维护
- 两种通过代码访问SalesOrder header text内容的办法
- linux nuttx 环境搭建,ubuntu14.04 nuttx开发环境的搭建
- IE浏览器开发人员工具怎么使用
- 吴恩达深度学习 —— 作业2
- crm高速开发之EntityCollection
- osx10.15.5降级成10.3.3
- java pattern堆栈溢出_我的正则表达式导致Java中的堆栈溢出;我错过了什么?
- __str__和__repr__
- win7(32位)U盘安装、卸载ubuntu(64位)双系统
- 线程并行化的概念及其用法
- Java项目开发流程
- anylogic和java_Anylogic各个版本的功能对比
- 2022 AI趋势8大预测
- leetcode237
- ssh-keygen 常用命令与参数
- Java高级之Float类和Double类的isNaN()方法
- charles的使用
- CSS中 *{ }、*zoom,各种 * 代表的意思
- JRebel has expired Jrebel过期解决!!