配置mysql

1、mysql开启binlog

mysql默认没有开启binlog,修改mysql的my.cnf文件,添加如下配置,注意binlog-format必须为row,因为binlog如果为STATEMENT或者MIXED,则binlog中记录的是sql语句,不是具体的数据行,canal就无法解析到具体的数据变更了。

log-bin=E:/mysql5.5/bin_log/mysql-bin.log

binlog-format=ROW

server-id=123

更详细的步骤见《MySQL安装》的第3章节。

2、给canal服务器分配一个mysql的账号权限,方便读取mysql的binlog日志

CREATE USER canal IDENTIFIED BY 'canal';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSHPRIVILEGES;

show grantsfor 'canal';

+----------------------------------------------------------------------------------------------------------------------------------------------+

| Grants for canal@%                                                                                                                           |

+----------------------------------------------------------------------------------------------------------------------------------------------+

| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY PASSWORD '*E3619321C1A937C46A0D8BD1DAC39F93B27D4458' |

+----------------------------------------------------------------------------------------------------------------------------------------------+

1 row in set (0.00 sec)

mysql>

Canal安装

1.下载canal安装包:

2.将下载好的安装包复制到Linux,解压

HA机制

canal是支持HA的,其实现机制也是依赖zookeeper来实现的,用到的特性有watcher和EPHEMERAL节点(和session生命周期绑定),与HDFS的HA类似。

canal的ha分为两部分,canal server和canal client分别有对应的ha实现

canal server: 为了减少对mysql dump的请求,不同server上的instance(不同server上的相同instance)要求同一时间只能有一个处于running,其他的处于standby状态(standby是instance的状态)。

canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

server ha的架构图如下:

大致步骤:

canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断(实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)

创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态。

一旦zookeeper发现canal server A创建的instance节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance。

canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。

Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制.

canal配置

canal的模式是这样的,一个canal里面可能会有多个instance,也就说一个instance可以监控一个mysql实例,多个instance也就可以对应多台服务器的mysql实例。也就是一个canal就可以监控分库分表下的多机器mysql。

《1》canal.properties

它是全局性的canal服务器配置,具体如下,这里面的参数涉及到方方面面。

#################################################

######### common argument #############

#################################################

canal.id= 1canal.ip=

canal.port= 11111canal.zkServers=# flush datatozk

canal.zookeeper.flush.period= 1000# flush metacursor/parse position to filecanal.file.data.dir =${canal.conf.dir}

canal.file.flush.period = 1000## memory store RingBuffer size, should be Math.pow(2,n)

canal.instance.memory.buffer.size= 16384## memory store RingBuffer used memory unit size ,default1kb

canal.instance.memory.buffer.memunit= 1024## meory store gets mode used MEMSIZEorITEMSIZE

canal.instance.memory.batch.mode=MEMSIZE

## detecing config

canal.instance.detecting.enable=false

#canal.instance.detecting.sql= insert into retl.xdual values(1,now()) on duplicate key update x=now()

canal.instance.detecting.sql= select 1canal.instance.detecting.interval.time= 3canal.instance.detecting.retry.threshold= 3canal.instance.detecting.heartbeatHaEnable=false

# support maximumtransaction size, more than the size of the transaction will be cut intomultiple transactions delivery

canal.instance.transaction.size = 1024# mysql fallback connectedtonew master should fallback times

canal.instance.fallbackIntervalInSeconds= 60# network config

canal.instance.network.receiveBufferSize= 16384canal.instance.network.sendBufferSize= 16384canal.instance.network.soTimeout= 30# binlog filter config

canal.instance.filter.query.dcl=false

canal.instance.filter.query.dml=false

canal.instance.filter.query.ddl=false

canal.instance.filter.table.error =false

canal.instance.filter.rows=false

# binlog format/image checkcanal.instance.binlog.format=ROW,STATEMENT,MIXED

canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddlisolationcanal.instance.get.ddl.isolation =false

#################################################

######### destinations #############

#################################################

canal.destinations=example

# conf root dir

canal.conf.dir= ../conf

# auto scan instance diradd/remove and start/stop instance

canal.auto.scan=true

canal.auto.scan.interval= 5canal.instance.global.mode=spring

canal.instance.global.lazy=false

#canal.instance.global.manager.address= 127.0.0.1:1099#canal.instance.global.spring.xml= classpath:spring/memory-instance.xml

canal.instance.global.spring.xml= classpath:spring/file-instance.xml

#canal.instance.global.spring.xml= classpath:spring/default-instance.xml

#################################################

## mysql serverId

canal.instance.mysql.slaveId= 1234# position info,需要改成自己的数据库信息

canal.instance.master.address= 127.0.0.1:3306canal.instance.master.journal.name=canal.instance.master.position=canal.instance.master.timestamp =#canal.instance.standby.address=#canal.instance.standby.journal.name=#canal.instance.standby.position=#canal.instance.standby.timestamp =# username/password,需要改成自己的数据库信息

canal.instance.dbUsername=root

canal.instance.dbPassword= 123456canal.instance.defaultDatabaseName=datamip

canal.instance.connectionCharset= UTF-8#tableregex

canal.instance.filter.regex= .*\\..*#################################################

说明:

由于是全局性的配置,所以上面三处标红的地方要注意一下:

canal.port= 11111                 当前canal的服务器端口号

canal.destinations= example      当前默认开启了一个名为example的instance实例,如果想开多个instance,用","逗号隔开就可以了。。。

canal.instance.filter.regex = .*\\..*    mysql实例下的所有db的所有表都在监控范围内。

《2》conf/example/instance.properties

修改配置文件 vi conf/example/instance.properties,

#################################################

## mysql serverId

canal.instance.mysql.slaveId= 1234# position info#设置要监听的mysql服务器的地址和端口

canal.instance.master.address= 127.0.0.1:3306canal.instance.master.journal.name=canal.instance.master.position=canal.instance.master.timestamp =#canal.instance.standby.address=#canal.instance.standby.journal.name=#canal.instance.standby.position=#canal.instance.standby.timestamp =# username/password#设置一个可访问mysql的用户名和密码并具有相应的权限,本示例用户名、密码都为canal

canal.instance.dbUsername= canal

canal.instance.dbPassword =canal

#连接的数据库

canal.instance.defaultDatabaseName=test

canal.instance.connectionCharset= UTF-8#tableregex#订阅实例中所有的数据库和表

canal.instance.filter.regex= .*\\..*#tableblack regex

canal.instance.filter.black.regex=#################################################

上面标红的地方注意下就好了,配置的mysql的账户和密码都是用来访问binlog的。

启动canal

启动 bin/startup.sh 或bin/startup.bat

查看canal日志:

[root@localhost canal]# tail -10f canal.logat com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:175)

at java.lang.Thread.run(Thread.java:748)

Causedby: java.io.IOException: Error When doing Client Authentication:ErrorPacket [errorNumber=1045, fieldCount=-1, message=Access denied for user 'canal'@'localhost' (using password: YES), sqlState=28000, sqlStateMarker=#]at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:199)

at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:74)

...4more

]2019-03-28 17:19:50.328 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position

2019-03-28 17:19:50.328 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare tofind start position just show master status2019-03-28 17:19:50.838 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4,serverId=1,gtid=,timestamp=1553763617000] cost : 484ms , the next step is binlog dump

查看example日志

[root@localhost logs]# cd example/

[root@localhost example]# tail -10f example.logat com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:175)

at java.lang.Thread.run(Thread.java:748)

Causedby: java.io.IOException: Error When doing Client Authentication:ErrorPacket [errorNumber=1045, fieldCount=-1, message=Access denied for user 'canal'@'localhost' (using password: YES), sqlState=28000, sqlStateMarker=#]at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:199)

at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:74)

...4more

]2019-03-28 17:19:50.328 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position

2019-03-28 17:19:50.328 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare tofind start position just show master status2019-03-28 17:19:50.838 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4,serverId=1,gtid=,timestamp=1553763617000] cost : 484ms , the next step is binlog dump

表示启动成功,可以在java项目中通过客户端代码进行访问。

客户端代码

新建一个canal-example的springboot工程

添加canal依赖

com.alibaba.otter

canal.client

1.1.2

代码

packagecom.dxz.canalexample;importcom.alibaba.otter.canal.client.CanalConnector;importcom.alibaba.otter.canal.client.CanalConnectors;importcom.alibaba.otter.canal.common.utils.AddressUtils;importcom.alibaba.otter.canal.protocol.CanalEntry;importcom.alibaba.otter.canal.protocol.Message;importcom.google.protobuf.InvalidProtocolBufferException;importjava.net.InetSocketAddress;importjava.util.HashMap;importjava.util.List;importjava.util.Map;public classCanalClient {public static voidmain(String[] args) {while (true) {//连接canal

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.200.110.93", 11111), "example", "canal", "canal");

connector.connect();//订阅 监控的 数据库.表

connector.subscribe("test.t_base_daily");//一次取5条

Message msg = connector.getWithoutAck(5);long batchId =msg.getId();int size =msg.getEntries().size();if (batchId < 0 || size == 0) {

System.out.println("没有消息,休眠5秒");try{

Thread.sleep(5000);

}catch(InterruptedException e) {

e.printStackTrace();

}

}else{// CanalEntry.RowChange row = null;for(CanalEntry.Entry entry : msg.getEntries()) {try{

row=CanalEntry.RowChange.parseFrom(entry.getStoreValue());

List rowDatasList =row.getRowDatasList();for(CanalEntry.RowData rowdata : rowDatasList) {

List afterColumnsList =rowdata.getAfterColumnsList();

Map dataMap =transforListToMap(afterColumnsList);if (row.getEventType() ==CanalEntry.EventType.INSERT) {//具体业务操作

System.out.println(dataMap);

}else if (row.getEventType() ==CanalEntry.EventType.UPDATE) {//具体业务操作

System.out.println(dataMap);

}else if (row.getEventType() ==CanalEntry.EventType.DELETE) {

List beforeColumnsList =rowdata.getBeforeColumnsList();for(CanalEntry.Column column : beforeColumnsList) {if ("id".equals(column.getName())) {//具体业务操作

System.out.println("删除的id:" +column.getValue());

}

}

}else{

System.out.println("其他操作类型不做处理");

}

}

}catch(InvalidProtocolBufferException e) {

e.printStackTrace();

}

}//确认消息

connector.ack(batchId);

}

}

}public static Map transforListToMap(ListafterColumnsList) {

Map map= newHashMap();if (afterColumnsList != null && afterColumnsList.size() > 0) {for(CanalEntry.Column column : afterColumnsList) {

map.put(column.getName(), column.getValue());

}

}returnmap;

}

}

启动:

打开mysql客户端,修改一行记录:

再回到canal的运行控制台

canal下载 linux_Canal入门相关推荐

  1. canal下载 linux_canal实时同步mysql数据到redis或ElasticSearch

    一.Canal架包下载上传 (一)下载 官网架包地址为:https://github.com/alibaba/canal/releases/tag/canal-1.1.5-alpha-2 本人百度云盘 ...

  2. 基于STM32MP157的鸿蒙学习(一)— 资料下载及入门

    基于STM32MP157的鸿蒙学习(一)- 资料下载及入门 一.前言 本系列学习根据韦东山老师的鸿蒙课程,使用的开发板也是百问网的100ASK_STM32MP157 Pro开发板. 其核心板资源如下所 ...

  3. 嵌入式linux单片机视频教程下载从入门到精通分享

    嵌入式linux单片机视频教程下载从入门到精通分享 视频教程下载地址

  4. smarty下载及入门教程(转)

    smarty下载及入门教程(转) 2009-06-24 09:46 smarty下载: Smarty 2.6.18 Source March 7th, 2007 Smarty 2.6.17 Sourc ...

  5. canal下载 linux_阿里canal数据库同步ES使用

    canal的概念这里我就不多说了,以下附上git链接查看:https://github.com/alibaba/canal 随笔记录一下我遇到的问题与修改,这里主要讲如何正确的配置: 首先,去官网下载 ...

  6. python入门ppt下载_Python3入门ppt

    PPT内容 这是Python3入门ppt,包括了Python概述,变量和内建数据类型,语句,函数,标准库模块,程序调试等内容,欢迎点击下载. Python3入门 邓笛 目录 关于Python 一种编程 ...

  7. 3Dmax在哪里下载 |3Dmax入门学习教程有哪些!!想学习的你还在等什么?

    说到3Dmax下载是很多刚接触3Dmax新人的痛处,在官网下载的话,需要花费很大一笔钱,在没有接触这个行业时候,就先为3Dmax这个软件花费出巨大金钱,所以很多人不会去选择.此外在一些其他网站上下载的 ...

  8. python基础教程视频下载-Python入门到精通视频教程下载[21课程全]

    Python入门到精通视频教程 初级共21节课 python编程入门,针对0基础就python语言基础语法的各个点逐步讲解,由浅入深,通俗易懂,层层深入.提取码: python编程入门,针对0基础就p ...

  9. python 600集下载_Python入门教程完整版(懂中文就能学会)(600集)-站长资讯中心...

    目录大纲: 本套教程15天 学前环境搭建 1-3   天内容为Linux基础命令 4-13  天内容为Python基础教程 14-15 天内容为 飞机大战项目演练 视频概括: 第一阶段(1-3天): ...

最新文章

  1. k8s mysql operator_将 MySQL 通过 presslabs/mysql-operator 部署到 k8s 内部
  2. 设计模式之美:Facade(外观)
  3. 20行python代码的入门级小游戏-python实现石头剪刀布小游戏
  4. 【Android 异步操作】线程池 ( 线程池使用示例 | 自定义线程池使用流程 | 自定义任务拒绝处理策略 | 完整代码示例 )
  5. 英语听力里面的religion words
  6. geometry-api-java 学习笔记(七)拓扑运算之cut
  7. word List 07
  8. 带有Java DSL的Spring Integration MongoDB适配器
  9. ES6 各浏览器支持情况
  10. uni的numberbox怎么用_jQuery EasyUI表单插件Numberbox数字框
  11. lasted是什么意思_lasted是什么意思_lasted怎么读_lasted翻译_用法_发音_词组_同反义词_继续存在( last的过去式和过去分词 )-新东方在线英语词典...
  12. 浪潮激荡大时代,存储起航新十年
  13. 用r语言画时序图和自相关图,并检验纯随机性
  14. (邱维声)高等代数课程笔记:数域
  15. ESP32----NVS使用
  16. 计算机在机械智能制造中的应用,数控技术在智能制造中的应用及发展
  17. 电路设计基础--三极管驱动直流电机电路
  18. Google在线翻译(WinForm版)
  19. Amesim学习——RC电路仿真
  20. QT QSerialPort 编写串口通讯案例 设计一些基础的ui界面

热门文章

  1. VMware Workstation 16 pro安装GhostBSD 21解决虚拟驱动问题备忘录
  2. createfile 无权限_访问被拒绝CreateFileMapping在不同的用户帐户下运行时用runas
  3. WebRTC 简单入门与实践
  4. android.mk 添加v7_在Android上以命令行方式移植FFmpeg
  5. Python 实现海康机器人工业相机 MV-CU060-10GM 的实时显示视频流及拍照功能
  6. Android SDK Manager资源下载
  7. 鸿蒙是不是基于安卓系统,鸿蒙系统是基于安卓吗?鸿蒙系统和安卓的区别?
  8. 为什么 Go 语言没有泛型
  9. 安装ubuntu出现花屏_ubuntu安装成功,开机花屏问题
  10. java cloneable 接口_JDK源码阅读笔记-Cloneable接口