logstash可以将不同数据源,例如日志、文件、或jdbc等,同步到ElasticSearch中,本文利用logstash实现mysql数据库表之间的数据。(实例:数据库DB1中的表A有添加或者修改,数据库DB2中的表B也会自动同步)

一、准备:

数据源输入使用logstash中自带的logstash-input-jdbc,无需额外安装,官网使用说明地址。
数据源输出需要使用logstash-output-jdbc,但是在loastash官网中output plugins列表中并没有相关插件,需要额外安装,使用说明在Github地址。

安装logstash

将logstash下载后,放到/opt/elastic/目录下,并将logstash目录重命名为logstash-test

安装logstash-output-jdbc,在/opt/elastic/logstash-test目录下执行:

 bin/logstash-plugin install logstash-output-jdbc

安装成功:

二、数据库表

在数据库DB1中创建表A,并添加数据如下:

在数据库DB2中创建表B,表结构与A一致,暂不添加数据:

三、logstash配置文件

logstash配置文件中必须包含两个元素inputoutput,分别是数据来源的配置和数据输出的配置。还有一个可选项filter,用来处理数据源和数据输出的之间的适配,例如,需要将某个字段的值10以后再输出,这个10的动作就应该写在filter模块;还有数据源和数据输出字段的编码不同,日期类型不同等情况的处理。(由于本文中A表和B表中数据结构都是一样的,只是实现简单的数据同步,暂时用不到filter

同步配置文件如下:后面对每个部分进行解释。

input {jdbc {jdbc_connection_string => "jdbc:mysql://IP:3306/DB1?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"jdbc_user => "expert"jdbc_password => "123456"jdbc_driver_library => "/opt/elastic/logstash-test/mysql-connector-java-5.1.6.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"statement => "SELECT title,description FROM A"}
}filter {}output {jdbc {connection_string => "jdbc:mysql://IP:3306/DB2?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"username => "root"password => "root123"driver_jar_path => "/opt/elastic/logstash-test/mysql-connector-java-5.1.6.jar"driver_class => "com.mysql.jdbc.Driver"statement => ["insert into B (title,description) values (?,?)","[title]","[description]"]}stdout {codec => json_lines}
}

input配置

input的配置中有很多强大的功能,详细使用见官网地址,此处只介绍上面涉及到的参数:

  • jdbc_connection_string :数据库连接配置
  • jdbc_user :用户名(连接DB1的用户名)
  • jdbc_password:密码(连接DB1的密码)
  • jdbc_driver_library:数据驱动的jar位置
  • jdbc_driver_class :数据库驱动类名(类似jdbc中的Class.forName(“com.mysql.jdbc.Driver”))
  • statement :查询数据源的sql语句(output中就是将此处的查询结果insert到数据库DB2中)

output配置

由于logstash-output-jdbc是额外扩展的output插件,在配置参数的写法上也略有不同,比如:所有的参数前面都没有jdbc_前缀

  • connection_string :数据库连接配置
  • username :用户名(连接DB2的用户名
  • password :密码(连接DB2的密码)
  • driver_jar_path :与input参数中的jdbc_driver_library一致
  • driver_class :与input参数中的jdbc_driver_class一致
  • statement :向DB2中添加数据的insert语句。

四、执行启动命令

将上述的配置文件命令为logstash_default.conf,放在logstash-test/conf文件夹下,在logstash-test下执行:

logstash启动命令

./bin/logstash -f /opt/elastic/logstash-test/config/logstash_default.conf --path.data=/opt/elastic/logstash-test/test

启动后发现报如下异常:

java.lang.IllegalAccessError: tried to access class com.mysql.jdbc.EscapeProcessor from class com.mysql.jdbc.ConnectionImpl

java.lang.IllegalAccessError: com/mysql/jdbc/EscapeProcessor

遗憾的是目前没有找到为什么会报这个异常,不过换了一种驱动的配置方式,这个异常就消失了。另外一种指定驱动jar包的方式也是官网给出的方式如下:

下面结合本例中给出解决办法。

启动异常处理方式:

在logstash目录下,创建目录vendor/jar/jdbc(/opt/elastic/logstash-test/vendor/jar/jdbc),将驱动jar包放入该路径下。

将配置文件中的driver_jar_path注释掉,

重新执行logstash启动命令

./bin/logstash -f /opt/elastic/logstash-test/config/logstash_default.conf --path.data=/opt/elastic/logstash-test/test

执行结果:

检查DB2中的表B:数据已经全部同步过去了。

五、定时自动同步数据

按照上面的过程,能实现执行logstash命令以后,DB1中的表A和DB2中的表B数据同步,但是如果后续表A中的数据有新增或者修改,还需要再去启动logstash。logstash提供了一种定时任务的方式,定期去检查表A中的数据是否有变化,根据表A的最后修改时间(LastUpdateDate)将表A中新增和修改的数据修改新增到表B中。

将配置文件做如下修改:

input {jdbc {jdbc_connection_string => "jdbc:mysql://IP:3306/DB1?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"jdbc_user => "expert"jdbc_password => "123456"jdbc_driver_library => "/opt/elastic/logstash-test/mysql-connector-java-5.1.6.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"statement => "SELECT title,description FROM A"schedule => "* * * * *"record_last_run => trueuse_column_value => truetracking_column => "LastUpdateDate"tracking_column_type => "timestamp"last_run_metadata_path => "/opt/elastic/logstash-test/last_record/logstash_default_last_time"clean_run => falselowercase_column_names => false}
}filter {}output {jdbc {connection_string => "jdbc:mysql://IP:3306/DB2?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true"username => "root"password => "root123"driver_class => "com.mysql.jdbc.Driver"statement => ["insert into B (title,description) values(?,?) on duplicate key update title=values(title),description=values(description)","[title]","[description]"]}stdout {codec => json_lines}
}

input中新增的参数:

  • schedule => "* * * * *":每分钟检查一次
  • ` record_last_run => true``:是否记录上一下执行的时间
  • ` use_column_value => true``:是否使用数据源的字段
  • `tracking_column => “LastUpdateDate”``:数据源的最后修改时间字段
  • `tracking_column_type => “timestamp”``:字段类型
  • ` last_run_metadata_path => “/opt/elastic/logstash-test/last_record/logstash_default_last_time”``:存放最后修改时间的文件位置
  • ` clean_run => false``:这个参数表示你在开启Logstash同步数据时需不需要clean掉上次的记录
  • lowercase_column_names => false读取字段时是否区分大小写

output新增的参数:

stdout {codec => json_lines
}

stdout为可选字段,将输出数据的方式加一种,stdout可以把input中statement 的select结果转为json字符串打印到logstash的log中,便于追踪检查哪些数据被更新了。

六、补充

output中的statement修改:

将表B中的title字段设置为唯一约束,将statement改为如下。即可实现如果title相同的时候,只修改记录,而不是新增。唯一约束可以根据实际需求去设置。

["insert into B (title,description) values(?,?) on duplicate key update title=values(title),description=values(description)","[title]","[description]"]

表A修改完只有需要1分钟以后才能在表B中看到同步效果,因为定时任务设置的每分钟执行一次。

logstash实现mysql数据库表实时同步相关推荐

  1. Sql Server 2008 实现不同数据库-表实时同步 本地发布/订阅

    SQL server 不同数据库之间的表实时同步 图片大部分来自Sql Server 2008 实现不同数据库-表实时同步 本地发布/订阅 - 简书 有做适当的内容增加解释. 本地发布 1.新建发布 ...

  2. MySQL数据库导入或者同步大量数据时数据丢失解决方案

    MySQL数据库导入或者同步大量数据时数据丢失解决方案 参考文章: (1)MySQL数据库导入或者同步大量数据时数据丢失解决方案 (2)https://www.cnblogs.com/miss-li/ ...

  3. 通过Logstash实现mysql数据定时增量同步到ES

    文章目录 前言 一.系统配置 二.同步步骤整体概览 三.logstash数据同步实战 1.新建mysql表 2.ES中新建索引 3.Logstash 管道配置 4.启动Logstash 5.测试 6. ...

  4. mysql数据库的主从同步(主服务器存在内网IP)

    1 分别在两台服务器上安装系统和mysql数据库 主服务器WIN2008R2,主服务器虚拟机和从服务器上是centos 7系统,并在centos 7系统安装mysql 5.7 本文中的两台服务器的IP ...

  5. MySQL异构同步_详解MySQL数据库异构数据同步

    本文主要向大家介绍了MySQL数据库异构数据同步,通过具体的内容向大家展现,希望对大家学习MySQL数据库有所帮助. 在实现levelDB挂载成MySQL引擎时,发现在实际存储是key-value格式 ...

  6. MySql数据库从库同步的延迟问题及解决方案

    1).MySQL数据库主从同步延迟原理mysql主从同步原理: 主库针对写操作,顺序写binlog,从库单线程去主库顺序读"写操作的binlog",从库取到binlog在本地原样执 ...

  7. Canal 实现 Mysql数据库实时数据同步

    简介 1.1 canal介绍 Canal是一个基于MySQL二进制日志的高性能数据同步系统.Canal广泛用于阿里巴巴集团(包括https://www.taobao.com),以提供可靠的低延迟增量数 ...

  8. Mysql数据库之结构同步

    Mysql数据库之结构同步 同步步骤: 1.点击"工具",选择结构同步 2.选择被同步的数据库连接和数据库.要同步到的数据库连接和数据库,点击"对比"按钮 3. ...

  9. mysql数据库的主从同步,实现读写分离

    目录 前言 1 分别在两台centos 7系统上安装mysql 5.7 2 master主服务器的配置 2.1 配置文件my.cnf的修改 2.2 创建从服务器的用户和权限 2.3 重启mysql服务 ...

最新文章

  1. ftp 信息服务器日常维护,Web Ftp Mail服务器的日常管理与维护
  2. 两种通过代码访问SalesOrder header text内容的办法
  3. linux nuttx 环境搭建,ubuntu14.04 nuttx开发环境的搭建
  4. IE浏览器开发人员工具怎么使用
  5. 吴恩达深度学习 —— 作业2
  6. crm高速开发之EntityCollection
  7. osx10.15.5降级成10.3.3
  8. java pattern堆栈溢出_我的正则表达式导致Java中的堆栈溢出;我错过了什么?
  9. __str__和__repr__
  10. win7(32位)U盘安装、卸载ubuntu(64位)双系统
  11. 线程并行化的概念及其用法
  12. Java项目开发流程
  13. anylogic和java_Anylogic各个版本的功能对比
  14. 2022 AI趋势8大预测
  15. leetcode237
  16. ssh-keygen 常用命令与参数
  17. Java高级之Float类和Double类的isNaN()方法
  18. charles的使用
  19. CSS中 *{ }、*zoom,各种 * 代表的意思
  20. JRebel has expired Jrebel过期解决!!

热门文章

  1. 微信公众号开发之绑定微信开发者
  2. Vuejs基本知识(一)【项目文件夹基本结构】
  3. AI生成二维码Stable diffusion生成可识别二维码【附完整教程】【附完整案例】
  4. 为什么APP需要做漏洞渗透测试服务?
  5. 棋牌游戏一上线就被DDOS和CC攻击怎么解决?
  6. 名帖117 文徵明 小楷《草堂十志》
  7. 90+深度学习开源数据集整理|包括目标检测、工业缺陷、图像分割等多个方向
  8. Linux下解压缩zip、tar、gz
  9. iis php ttfb,IIS高且不稳定的TTFB
  10. Throw someone a bone 表面的恩赐