简介

Canal是阿里巴巴基于Java开源的数据同步工具。平时业务场景使用比较多的如下:

同步数据到ES、Redis缓存中。

数据同步。

业务需要监听数据。

图片来源阿里巴巴github仓库:https://github.com/alibaba/canal

配置MySQL

跟配置主从复制类似。

数据执行如下命令:

-- 给canal单独创建用户 用户名:canal 密码:123456
create user 'canal'@'%' identified by '123456';
-- 授权只能查询 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by '123456';

修改数据库配置文件:

vim /etc/mysql/conf.d/my.cnf

my.cnf内容如下:

[mysqld]
# 服务器id(一般都配置成服务器IP)
server_id=156
# 开启日志文件
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW

重启MySQL

查询日志文件名和偏移量

SHOW MASTER STATUS;

centos7 安装 canal

下载地址:https://github.com/alibaba/canal/releases/tag/canal-1.1.5

命令行下载

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

解压

tar -xvf canal.deployer-1.1.5.tar.gz

编辑 conf/example/instance.properties文件

## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0# 数据库地址
canal.instance.master.address=192.168.126.156:3306
# binlog日志名称
canal.instance.master.journal.name=mysql-bin.000007
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=154
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false# table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=

启动

./bin/startup.sh

jps 查看进程

[root@localhost canal]# jps
7472 Jps
5371 CanalLauncher

java客户端

pom.xml 依赖

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>

客户端代码

package com.terry.test;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import static com.alibaba.otter.canal.protocol.CanalEntry.*;
import static com.alibaba.otter.canal.protocol.CanalEntry.EntryType.TRANSACTIONBEGIN;import java.net.InetSocketAddress;
import java.util.List;/*** canal 测试* @author terry* @version 1.0* @date 2022/5/14 22:19*/
public class CanalTest {public static void main(String[] args) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.126.156", 11111), "example", "", "");try {//打开连接connector.connect();//订阅数据库表,全部表connector.subscribe(".*\\..*");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(100);//获取批量IDlong batchId = message.getId();//获取批量的数量int size = message.getEntries().size();//如果没有数据if (batchId == -1 || size == 0) {System.out.println("等待数据中...");try {//线程休眠2秒Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}} else {//如果有数据,处理数据printEntry(message.getEntries());}//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}/*** 打印canal server解析binlog获得的实体类信息*/private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等RowChange rowChage;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}//获取操作类型:insert/update/delete类型EventType eventType = rowChage.getEventType();//打印Header信息System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));//判断是否是DDL语句if (rowChage.getIsDdl()) {System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());}//获取RowChange对象里的每一行数据,打印出来for (RowData rowData : rowChage.getRowDatasList()) {//如果是删除语句if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());//如果是新增语句} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());//如果是更新的语句} else {//变更前的数据System.out.println("------->; before");printColumn(rowData.getBeforeColumnsList());//变更后的数据System.out.println("------->; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}

日志打印输出,当数据库更改后就会接收到消息。

等待数据中...
等待数据中...
================》; binlog[mysql-bin.000007:345] , name[test,t_user_0] , eventType : INSERT
id : 5    update=true
name : 王五    update=true
等待数据中...
等待数据中...

参考文献:

https://blog.csdn.net/yehongzhi1994/article/details/107880162

https://github.com/alibaba/canal

【MySQL高性能】Canal数据同步神器相关推荐

  1. Canal 实现 Mysql数据库实时数据同步

    简介 1.1 canal介绍 Canal是一个基于MySQL二进制日志的高性能数据同步系统.Canal广泛用于阿里巴巴集团(包括https://www.taobao.com),以提供可靠的低延迟增量数 ...

  2. 基于Canal的MySQL=>ES数据同步方案

    文章目录 1.MySQL和ES的主要区别? 1.1 功能性 1.2 性能指标 1.3 在搜索业务上的区别 1.3.1 查询 1.3.2 检索 2.为什么要做数据同步 2.1 检索性能 2.2 写入性能 ...

  3. 使用canal解决Mysql和Redis数据同步问题

    前言 千呼万唤始出来,停了好个月,终于又开始动手写文章了,今天带给大家的是阿里的一个工具Canal,这个工具是企业做数据同步使用的比较多的方案,希望对你有所帮助,喜欢的话请给个好评 工作原理分析 我们 ...

  4. 阿里开源数据同步神器DataX异构数据源间数据同步同步MySQL与HDFS相互实战

    Datax 实战使用 继上一篇 阿里开源数据同步神器DataX异构数据源间数据同步基础介绍与快速入门之后的实战篇 1.MySQL-To-HDFS 环境 & 准备说明: 描述: 为了快速搭建测试 ...

  5. 离线数据同步神器:DataX,支持几乎所有异构数据源的离线同步到MaxCompute

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 概述 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlSer ...

  6. springboot实现增量备份_SpringBoot canal数据同步解决方案

    SpringBoot canal数据同步解决方案 一.需求 微服务多数据库情况下可以使用canal替代触发器,canal是应阿里巴巴跨机房同步的业务需求而提出的,canal基于数据库的日志解析,获取变 ...

  7. canal - 数据同步工具

    一.应用场景 在前面Echarts - 实现图表显示中,我们使用了服务调用(统计表中的信息通过调用用户模块服务来获取)获取统计信息,这样耦合度高,效率相对较低,目前有另一种方法,通过实时同步数据库表的 ...

  8. MySQL的异构数据同步

    前言 当MySQL操作完数据后如果将数据同步到ElasticSearch中,或者还要同步到MongoDB中,而且操作MySQL和ElasticSearch和MongoDB的分属于多个部门应该怎么办,如 ...

  9. MySQL异构同步_详解MySQL数据库异构数据同步

    本文主要向大家介绍了MySQL数据库异构数据同步,通过具体的内容向大家展现,希望对大家学习MySQL数据库有所帮助. 在实现levelDB挂载成MySQL引擎时,发现在实际存储是key-value格式 ...

  10. mysql 与 es 数据同步常见方案

    mysql 与 es 数据同步常见方案 说明 @author JellyfishMIX - github / blog.jellyfishmix.com LICENSE GPL-2.0 问题背景 最近 ...

最新文章

  1. centos7 python3.6升级到3.7_Centos7 升级python3,解决升级后不兼容问题
  2. DotNetNuke CSS hierarchy
  3. exchange 2013 升级CU15,提示“上次安装完成后没有重启”的提示
  4. etcd 启动分析_Kubernetes网络分析之Flannel
  5. VC实现HTTP协议的GET和POST方法
  6. 区分一下强制类型转换运算符重载/赋值运算符重载/对象定义的赋值
  7. 为单个Web应用程序配置多个上下文根– JBoss
  8. ansj 自定义 停用词_构造自定义停用词列表的快速提示
  9. 使用jQuery来实现一个简单的ajax请求
  10. 经典算法问题——稳定匹配(Stable Matching)
  11. Git 团队协作中常用术语 WIP PTAL CC LGTM 等解释
  12. Shopee卖家如何布局产品合理定价,新手必知的定价策略
  13. 前端练手项目 HTML 游戏叠高塔(包含源码)
  14. 北京慈文影视制作有限公司诉被告北京百度网讯科技有限公司侵犯著作权纠纷一案判决书
  15. 选择粘贴性无html,用好Office的选择性粘贴
  16. DNA计算机及DNA存储
  17. 内容算法解读:提高内容摘要与原文的一致性(Faithfulness)
  18. 安装Linux win双系统 无法正常启动 读不出U盘
  19. 高等数学-曲线积分与曲面积分
  20. 【兴趣书签】十部国产黑暗动画经典

热门文章

  1. nmap扫描常用命令
  2. 初学者如何利用米思齐编写出一套完整的程序
  3. grandMA2onPC控制UE4灯光
  4. ideaIU-2020.3.2安装教程以及导入第一个spring boot项目运行和环境配置教程
  5. 宏观经济模型代码来源 :MMB库
  6. 新规后股指期货开户条件
  7. SQLExpress数据库类型与AttachDbFilename用法
  8. SONiC vs testbed搭建
  9. matlab中如何画柱状图,如何在用Matlab画柱状图
  10. Android--›键盘表情切换的终极解决方案(已重构)