用 canal 监控 binlog 并实现mysql定制同步数据的功能
作者:何白白
cnblogs.com/hebaibai/p/10911899.html
业务背景
写任何工具都不能脱离实际业务的背景。开始这个项目的时候是因为现有的项目中数据分布太零碎,零零散散的分布在好几个数据库中,没有统一的数据库来收集这些数据。这种情况下想做一个大而全的会员中心系统比较困难。(这边是一个以互联网保险为中心的项目,保单,会员等数据很零散的储存在好几个项目之中,并且项目之间的数据基本上是隔离的)。
现有的项目数据库是在腾讯云中储存,虽然腾讯提供了数据同步功能,但是这样必须要表结构相同才行,并不符合我们的需求。所以需要自行开发。
需求
需要能灵活配置。
实时数据10分钟内希望可以完成同步。
来源数据与目标数据可能结构,字段名称不同。
增删改都可以同步。
技术选择
这个任务交给了我和另外一个同事来做。
同事的
同事希望可以通过ETL工具Kettle来做,这个东西我没有研究过,是同事自己在研究。具体过程不是很清楚,但是最后是通过在mysql中设置更新,修改,删除的触发器,然后在Kettle中做了一个定时任务,实现了数据同步的功能,初步测试符合需求。但是必须要在数据库中设置触发器,并且会有一个临时表,这一点我个人不是很喜欢。
我的
我是本着能自己写就自己写的原则,准备自己写一个。刚开始使用的是定时任务比较两个库的数据差别,然后再同步数据。但是经过一定的数据测试后,发现在数据量大的时候,定时任务中的上一个任务没有执行完毕,下一个任务就又开始了。这样造成了两边数据不一致。最终这个方案废弃了。
后来通过研究,发现mysql的数据操作会记录在binlog中,这时就有了新的方案。可以通过逐行获取binlog信息,经过解析数据后,同步在目标库中。
既然有了方案,那么就开始做吧。
开始尝试1
首先要打开数据库的binlog功能,这一步比较简单,修改mysql的配置文件:/etc/mysql/mysql.conf.d/mysqld.cnf
,添加:
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
expire_logs_days = 10
max_binlog_size = 100M
binlog_format = ROW
然后重启mysql 就好了,具体每个参数的意思,搜索一下就好了。这时候随意的对某一个数据库中的表做一下增删改,对应的日志就会记录在/var/log/mysql/
这个文件夹下了。我们看一下这个文件夹里的东西:
这里的文件是没有办法正常查看的,需要使用mysql提供的命令来查看,命令是这个样子的:
1、查看
mysqlbinlog mysql-bin.000002
2、指定位置查看
mysqlbinlog --start-position="120" --stop-position="332" mysql-bin.000002
因为我们现在的binlog_format指定的格式是ROW(就在上面写的,还记得吗?),所谓binlog文件的内容没有办法正常查看,因为他是这个样子的:
这时,我们需要对输出进行解码
mysqlbinlog --base64-output=decode-rows -v mysql-bin.000001
这时候,显示的结果就变成了:
虽然还不是正常的sql,但是好赖是有一定的格式了。
but自己来做解析的话还是很麻烦,so~放弃这种操作。
继续尝试2
经过再次研究后,发现数据库中执行sql也是可以查看binlog的。主要有如下几条命令:
--重置binlog
reset master;--查看binlog的配置
show variables like '%binlog%';--查看所有的binlog
show binary logs;--查看正在写入的binlog
show master status;--查看指定binlog文件
show binlog events in 'mysql-bin.000001';--查看指定binlog文件,并指定位置
show binlog events in 'mysql-bin.000001' from [pos] limit [显示多少条];
按照上面的命令执行结果为:
发现sql还是不能正常显示。这里的原因应该是binlog_format配置的原因。将其修改为 binlog_format=Mixed后,完美解决。经过数据库中一通增删改后,显示的sql类似这样:
use `pay`; /* ApplicationName=DataGrip 2018.2.5 */ UPDATE `pay`.`p_pay_log` t SET t.`mark_0` = 'sdfsdf' WHERE t.`id` LIKE '342' ESCAPE '#'
现在似乎已经可以开始写数据同步了,只要在启动的时候获取当正在使用的是哪一个日志文件,记录binlog的位置,然后一点一点向下执行,解析sql就好了。但是在这个过程中,我发现阿里巴巴有一款开源的软件可以用。就是标题上说道的:canal。看了一下网站上的介绍,简直美滋滋。
它的文档和代码地址在这里:https://github.com/alibaba/canal
,大家可以看一下。现在就准备用这个来完成我所需要的功能。
正式开始写
首先看一下介绍,canal是需要单独运行一个服务的,这个服务具体的配置还是比较简单的。它的作用我自己理解就是监控binlog,然后根据自己的需要获取binlog中一定量的数据。这个数据是经过处理的,可以比较方便的知道里面的具体信息。比如那些数据发生了变动,每列数据的列名是什么,变动前和变动后的值是啥之类的。那么开始。
1.我的想法
1)项目启动的时候,开启canal的链接,以及初始化一些配置。
@Bean
public CanalConnector canalConnector() {CanalConnector connector = CanalConnectors.newSingleConnector(//对应canal服务的链接new InetSocketAddress(canalConf.getIp(), canalConf.getPort()),//链接的目标,这里对应canal服务中的配置,需要查阅文档canalConf.getDestination(),//不知道是什么用户,使用“”canalConf.getUser(),//不知道是什么密码,使用“”canalConf.getPassword());return connector;
}
2)先开启一个线程,里面写一个死循环,用于从canal的服务中获取binlog中的消息。这个消息类是:com.alibaba.otter.canal.protocol.Message。
Message message = connector.getWithoutAck(100);
connector:canal链接的实例化对象。
connector.getWithoutAck(100):从连接中获取100条binlog中的数据。
3)取出Message中的事件集合,就是binlog中的每一条数据。将类型为增删改的数据取出,之后每一条数据放在一个线程中,用线程池去执行它。
List<Entry> entries = message.getEntries();message.getEntries():从链接中获取的数据集合,每一条代表1条binlog数据
4)在每一个线程中,取出Entry中的数据,根据其类型拼接各种sql,并执行。
Header header = entry.getHeader();
//获取发生变化的表名称,可能会没有
String tableName = header.getTableName();//获取发生变化的数据库名称,可能会没有
String schemaName = header.getSchemaName();//获取事件类型
EventType eventType = rowChange.getEventType();
/**
这里我们只是用其中的三种类型:EventType.DELETE 删除EventType.INSERT 插入EventType.UPDATE 更新
*/
//获取发生变化的数据
RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());//遍历其中的数据
int rowDatasCount = rowChange.getRowDatasCount();
for (int i = 0; i < rowDatasCount; i++) {//每一行中的数据RowData rowData = rowChange.getRowDatas(i);
}//获取修改前的数据
List<Column> before = rowData.getBeforeColumnsList();//获取修改后的数据
List<Column> after = rowData.getAfterColumnsList();
Column中有一系列方法,比如是否发生修改,时候为key,是否是null等,就不在细说了。扩展:阿里Canal框架(数据同步中间件)初步实践
2.万事具备,可以开始写了
1)这里先写一个线程,用于不停的从canal服务中获取消息,然后创建新的线程并让其处理其中的数据。代码如下:
@Override
public void run() {while (true) {//主要用于在链接失败后用于再次尝试重新链接try {if (!run) {//打开链接,并设置 run=truestartCanal();}} catch (Exception e) {System.err.println("连接失败,尝试重新链接。。。");threadSleep(3 * 1000);}System.err.println("链接成功。。。");//不停的从CanalConnector中获取消息try {while (run) {//获取一定数量的消息,这里为线程池数量×3Message message = connector.getWithoutAck(batchSize * 3);long id = message.getId();//处理获取到的消息process(message);connector.ack(id);}} catch (Exception e) {System.err.println(e.getMessage());} finally {//如果发生异常,最终关闭连接,并设置run=falsestopCanal();}}}
void process(Message message) {List<Entry> entries = message.getEntries();if (entries.size() <= 0) {return;}log.info("process message.entries.size:{}", entries.size());for (Entry entry : entries) {Header header = entry.getHeader();String tableName = header.getTableName();String schemaName = header.getSchemaName();//这里判断是否可以取出数据库名称和表名称,如果不行,跳过循环if (StringUtils.isAllBlank(tableName, schemaName)) {continue;}//创建新的线程,并执行jobList.stream().filter(job -> job.isMatches(tableName, schemaName)).forEach(job -> executorService.execute(job.newTask(entry)));}
}
这里的jobList是我自己定义List<Job>
,代码如下:
package com.hebaibai.miner.job;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;import static com.alibaba.otter.canal.protocol.CanalEntry.Entry;@Slf4j
@Data
public abstract class Job {/*** 数据库链接*/protected JdbcTemplate jdbcTemplate;/*** 额外配置*/protected JSONObject prop;/*** 校验目标是否为合适的数据库和表** @param table* @param database* @return*/abstract public boolean isMatches(String table, String database);/*** 实例化一个Runnable** @param entry* @return*/abstract public Runnable newTask(final Entry entry);/*** 获取RowChange** @param entry* @return*/protected CanalEntry.RowChange getRowChange(Entry entry) {try {return CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {e.printStackTrace();}return null;}}
jobList里面放的是Job的实现类。
3.写一个Job的实现类,并用于同步表,并转换字段名称。
因为需求中要求两个同步的数据中可能字段名称不一致,所以我写了一个josn用来配置两个表的字段对应关系:
//省略其他配置
"prop": {
//来源数据库"database": "pay",
//来源表"table": "p_pay_msg",
//目标表(目标库在其他地方配置)"target": "member",
//字段对应关系
//key :来源表的字段名
//value:目标表的字段名"mapping": {"id": "id","mch_code": "mCode","send_type": "mName","order_id": "phone","created_time": "create_time","creator": "remark"}
}
//省略其他配置
下面是全部的代码,主要做的就是取出变动的数据,按照对应的字段名重新拼装sql,然后执行就好了,不多解释。扩展:基于canal进行日志的订阅和转换
package com.hebaibai.miner.job;import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;import static com.alibaba.otter.canal.protocol.CanalEntry.*;/*** 单表同步,表的字段名称可以不同,类型需要一致* 表中需要有id字段*/
@SuppressWarnings("ALL")
@Slf4j
public class TableSyncJob extends Job {/*** 用于校验是否适用于当前的配置** @param table* @param database* @return*/@Overridepublic boolean isMatches(String table, String database) {return prop.getString("database").equals(database) &&prop.getString("table").equals(table);}/*** 返回一个新的Runnable** @param entry* @return*/@Overridepublic Runnable newTask(final Entry entry) {return () -> {RowChange rowChange = super.getRowChange(entry);if (rowChange == null) {return;}EventType eventType = rowChange.getEventType();int rowDatasCount = rowChange.getRowDatasCount();for (int i = 0; i < rowDatasCount; i++) {RowData rowData = rowChange.getRowDatas(i);if (eventType == EventType.DELETE) {delete(rowData.getBeforeColumnsList());}if (eventType == EventType.INSERT) {insert(rowData.getAfterColumnsList());}if (eventType == EventType.UPDATE) {update(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());}}};}/*** 修改后的数据** @param after*/private void insert(List<Column> after) {//找到改动的数据List<Column> collect = after.stream().filter(column -> column.getUpdated() || column.getIsKey()).collect(Collectors.toList());//根据表映射关系拼装更新sqlJSONObject mapping = prop.getJSONObject("mapping");String target = prop.getString("target");List<String> columnNames = new ArrayList<>();List<String> columnValues = new ArrayList<>();for (int i = 0; i < collect.size(); i++) {Column column = collect.get(i);if (!mapping.containsKey(column.getName())) {continue;}String name = mapping.getString(column.getName());columnNames.add(name);if (column.getIsNull()) {columnValues.add("null");} else {columnValues.add("'" + column.getValue() + "'");}}StringBuilder sql = new StringBuilder();sql.append("REPLACE INTO ").append(target).append("( ").append(StringUtils.join(columnNames, ", ")).append(") VALUES ( ").append(StringUtils.join(columnValues, ", ")).append(");");String sqlStr = sql.toString();log.debug(sqlStr);jdbcTemplate.execute(sqlStr);}/*** 更新数据** @param before 原始数据* @param after 更新后的数据*/private void update(List<Column> before, List<Column> after) {//找到改动的数据List<Column> updataCols = after.stream().filter(column -> column.getUpdated()).collect(Collectors.toList());//找到之前的数据中的keysList<Column> keyCols = before.stream().filter(column -> column.getIsKey()).collect(Collectors.toList());//没有key,执行更新替换if (keyCols.size() == 0) {return;}//根据表映射关系拼装更新sqlJSONObject mapping = prop.getJSONObject("mapping");String target = prop.getString("target");//待更新数据List<String> updatas = new ArrayList<>();for (int i = 0; i < updataCols.size(); i++) {Column updataCol = updataCols.get(i);if (!mapping.containsKey(updataCol.getName())) {continue;}String name = mapping.getString(updataCol.getName());if (updataCol.getIsNull()) {updatas.add("`" + name + "` = null");} else {updatas.add("`" + name + "` = '" + updataCol.getValue() + "'");}}//如果没有要修改的数据,返回if (updatas.size() == 0) {return;}//keysList<String> keys = new ArrayList<>();for (Column keyCol : keyCols) {String name = mapping.getString(keyCol.getName());keys.add("`" + name + "` = '" + keyCol.getValue() + "'");}StringBuilder sql = new StringBuilder();sql.append("UPDATE ").append(target).append(" SET ");sql.append(StringUtils.join(updatas, ", "));sql.append(" WHERE ");sql.append(StringUtils.join(keys, "AND "));String sqlStr = sql.toString();log.debug(sqlStr);jdbcTemplate.execute(sqlStr);}/*** 删除数据** @param before*/private void delete(List<Column> before) {//找到改动的数据List<Column> keyCols = before.stream().filter(column -> column.getIsKey()).collect(Collectors.toList());if (keyCols.size() == 0) {return;}//根据表映射关系拼装更新sqlJSONObject mapping = prop.getJSONObject("mapping");String target = prop.getString("target");StringBuilder sql = new StringBuilder();sql.append("DELETE FROM `").append(target).append("` WHERE ");List<String> where = new ArrayList<>();for (Column column : keyCols) {String name = mapping.getString(column.getName());where.add(name + " = '" + column.getValue() + "' ");}sql.append(StringUtils.join(where, "and "));String sqlStr = sql.toString();log.debug(sqlStr);jdbcTemplate.execute(sqlStr);}
}
项目源码
https://github.com/hjx601496320/plumber
END
Java面试题专栏
【51期】一道阿里面试题:说说你知道的关于BeanFactory和FactoryBean的区别
【52期】记一道简单的Java面试题,但答错率很高!
【53期】面试官:谈一下数据库分库分表之后,你是如何解决事务问题?
【54期】Java序列化三连问,是什么?为什么需要?如何实现?
【55期】面试中经常被问到Java引用类型原理,带你深入剖析
【56期】你说你熟悉并发编程,那么你说说Java锁有哪些种类,以及区别
【57期】面试官问,MySQL建索引需要遵循哪些原则呢?
【58期】盘点那些面试中最常问的MySQL问题,第一弹!
【59期】MySQL索引是如何提高查询效率的呢?(MySQL面试第二弹)
【60期】事务隔离级别中的可重复读能防幻读吗?(MySQL面试第三弹)
我知道你 “在看”
用 canal 监控 binlog 并实现mysql定制同步数据的功能相关推荐
- mysql 刷新二进制日志_使用binlog日志恢复MySQL数据库删除数据的方法
binlog日志简介: binlog 就是binary log,二进制日志文件,这个文件记录了MySQL所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间. b ...
- 【clickhouse】Clickhouse的MySQL引擎同步数据不准确 Decimal
文章目录 1.概述 2. 场景2 2.1 概述 1.概述 我想从MySQL同步数据到Clickhouse,但是发现Clickhouse的MySQL引擎同步数据不准确,精度丢失而且还是不对的.(很多数据 ...
- mysql 定时同步数据_MySQL数据同步之otter
一.otter介绍 基于日志数据,用于MySQL或者ORACLE之间准实时同步数据. 用途: mysql/oracle互相同步 中间表/行记录同步 二.原理及架构图 otter整体模块 manager ...
- Clickhouse单机部署以及从mysql增量同步数据
背景: 随着数据量的上升,OLAP一直是被讨论的话题,虽然druid,kylin能够解决OLAP问题,但是druid,kylin也是需要和hadoop全家桶一起用的,我也搞不定,那只能找我能搞定的技术 ...
- mysql自动同步数据_MySQL数据库实现双向自动同步
[IT168 技术]本文将探讨如何通过MySQL数据库的高级特性,实现数据库的双向自动同步,确保数据的冗余与完整性.通过以往真实的项目实战与经验,把操作实施过程全部记录下来,主要有以下几个主要内容. ...
- clickhouse 同步mysql_ClickHouse单机部署以及从MySQL增量同步数据
背景: 随着数据量的上升,OLAP一直是被讨论的话题,虽然druid,kylin能够解决OLAP问题,但是druid,kylin也是需要和hadoop全家桶一起用的,异常的笨重,再说我也搞不定,那只能 ...
- 第02期:ClickHouse 单机部署以及从 MySQL 增量同步数据
本期作者:邓亚运 37 互娱高级 DBA,负责公司 MySQL,Redis,Hadoop,Clickhouse 集群的管理和维护. 背景 随着数据量的上升,OLAP 一直是被讨论的话题,虽然 drui ...
- mysql 日志同步 数据不同步_Mysql互为主从问题--日志同步数据不同步
Mysql互为主从问题--日志同步数据不同步 我搭建的是mysql互为主从 复制 两台机器的mysql环境完全相同 第一部分测试: B为master A为slave的同步测试 在B上创建表lian,并 ...
- 最佳实践:MySQL CDC 同步数据到 ES
作者:于乐,腾讯 CSIG 工程师 一. 方案描述 1.1 概述 在线教育是一种利用大数据.人工智能等新型互联网技术与传统教育行业相结合的新型教育方式.发展在线教育可以更好的构建网络化.数字化.个性化 ...
最新文章
- AngularJs 基础教程 —— 依赖注入
- hibernate ORM related
- python教程书籍推荐-买Python入门书籍,我推荐这一本
- python 冒泡排序算法(超级详细)
- aac文件损坏修复软件_SysTools Outlook Recovery Tool : 修复损坏的Outlook PST文件的先进的软件...
- 学习SAP项目成功实施的十大条件
- 飞天技术汇“2018云栖大会·上海峰会”专场,等你加入
- smartupload 路径不存在_洞悉复杂金融场景,覆盖完备测试路径
- python与C、C++混编的四种方式
- 如何部署同一个Spring boot web 应用到不同的环境
- 提高solr的搜索速度
- java字典树(Trie)实现中文模糊匹配
- Linux服务器上新增开放端口号
- 后疫情时代“三个超级”助力品牌实现数字化增长及高效落地-白皮书
- 机器学习初探(手写数字识别)HOG图片
- 爷青结,Microsoft 放弃 Windows 95 时代的图标
- springboot 上传文件解析入库_springboot上传文件的访问
- 硬币支付问题(贪心策略)
- IEnumerable 和 IEnumerator
- 编译DCNv2网络:error: command ‘C:\\Program Files\\NVIDIAGPUComputingToolkit\\CUDA\\v10.0\\bin\\nvcc.exe‘
热门文章
- 中国移动携手华为完成5G话音的全部功能测试
- 蔚来否认关闭硅谷办公室 近期也没有回科创板的计划
- 国内最受欢迎手游国际版折戟:腾讯也很无奈
- 进击的爱奇艺文学:如何成为苹果园生态的重要一环?
- 2018年全球智能手机销售收入增至5220亿美元 但销量却下降了
- ORTP协议栈【ZT】
- Android 剪切板
- oracle 跨服务器推送视图_Oracle11g的v$diag_info视图获得控制文件转储文件名及位置...
- qt createtor 中文乱码
- Linu移植随笔:由ts_config:Success想到的