1 什么是canal

  canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)

2 canal使用场景

  (1)阿里otter(阿里用于进行异地数据库之间的同步框架)中间件的一部分,这是原始场景

  (2)更新缓存:如果有大量的请求发送到mysql的话,mysql查询速度慢,QPS上不去,光查mysql可能会瘫痪,那就可以在前面加个缓存,这个缓存有2个主要的问题。一是缓存没有怎么办,二是数据不一致怎么办。对于第一个问题查缓存没有就差mysql,mysql再往缓存中写一份。对于第二个问题,如果数据库修改了,那就采用异步的方式进行修改,启动一个canal服务,监控mysql,只要一有变化就同步缓存,这样mysql和缓存就能达到最终的一致性。

  (3)抓取业务数据新增变化表,用于制作拉链表:做拉链表是需要有增加时间和修改时间的,需要数据今天新增和变化的数据,如果时间不全就没办法知道哪些是修改的。可以通过canal把变化的抽到自己的表里,以后数据就从这个表出。

  (4)取业务表的新增变化数据,用于制作实时统计

3 canal工作原理

  首先了解一下mysql主备复制原理:

  (1)master主库将改变记录,发送到二进制文件(binary log)中

  (2)slave从库向mysql Master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log)

  (3)slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库

  canal的工作原理:把自己伪装成slave,从master复制数据。读取binlog是需要master授权的,因为binlog是加密的,授权分用户名密码才能读。master授权后不知道读他的binlog的是从机还是canal,他的所有传输协议都符合从机的标准,所以master一直以为是从机读的。

4 mysql的binlog

4.1 二进制日志

  mysql的二进制日志记录了所有的DDL和DML(除了数据查询语句),以事件的形式进行记录,包含语句执行消耗的时间,mysql的二进制日志是事务安全型的。

  开启二进制日志大概会有1%的性能损坏。二进制日志有2个主要的使用场景:①mysql的主备复制②数据恢复,通过使用mysqlbinlog工具来恢复数据(用这个做恢复是备选方案,主方案还是定期快照,定期执行脚本导数据,其实就是把当前所有数据导成insert,这个量少)

  二进制日志包括2类文件:①二进制日志索引文件(后缀为.index)用于记录所有的二进制文件②二进制日志文件(后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)

4.2 开启binlog

  修改mysql的配置文件my.cnf。

# vim /etc/my.cnfgG

  在[mysqld] 区块 添加

log-bin=mysql-bin

  mysql-bin表示binlog日志的前缀,以后生成的的日志文件就是 mysql-bin.000001 的文件后面的数字按顺序生成。 当mysql重启或到达单个文件大小的阈值时,新生一个文件,按顺序编号。

4.3 binlog分类

  binlog的格式有三种:STATEMENT,MIXED,ROW对比如下

格式 描述 优点
STATEMENT 语句级别,记录每一次执行写操作的语句,相对于ROW模式节省了空间,但是可能产生数据不一致如update tt set create_date=now(),由于执行时间不同产生饿得数据就不同 节省空间 可能造成数据不一致
ROW 行级,记录每次操作后每行记录的变化。假如一个update的sql执行结果是1万行statement只存一条,如果是row的话会把这个1000行的结果存这。 持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果 占用较大空间
MIXED 是对statement的升级,如当函数中包含 UUID() 时,包含 AUTO_INCREMENT 字段的表被更新时,执行 INSERT DELAYED 语句时,用 UDF 时,会按照 ROW的方式进行处理 节省空间,同时兼顾了一定的一致性 还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便

4.4 binlog格式选择

  如果只考虑主从复制的话可以用mixed,一般情况下使用statement,遇到几种特殊情况使用row,同步的话有SQL就行,因为手里有数据,前提是有数据才能执行这个SQL。在大数据场景下我们抽取数据是用于统计分析,分析的数据,如果用statement抽了SQL手里也没数据,不知道执行修改哪些,因为没有数据,所以没办法分析,所以适合用row,清清楚楚的表明了每一行是什么样。

4.5 修改配置文件

  修改my.cnf文件,在[mysqld]模块下添加如下内容

server-id= 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=bigdata

  binlog-do-db用于指定库,缩小监控的范围,server-id不能和mysql集群的其他节点重复

4.6 重启mysql

# service mysqld restart
Redirecting to /bin/systemctl restart mysqld.service

  到数据目录下查询是否生成binlog文件,这里我把数据目录自定义为了/data/mysql/

# cd /data/mysql/
# ll
total 188500
-rw-r----- 1 mysql mysql       56 Jul  1  2020 auto.cnf
-rw------- 1 mysql mysql     1676 Jul  1  2020 ca-key.pem
-rw-r--r-- 1 mysql mysql     1112 Jul  1  2020 ca.pem
-rw-r--r-- 1 mysql mysql     1112 Jul  1  2020 client-cert.pem
-rw------- 1 mysql mysql     1676 Jul  1  2020 client-key.pem
drwxr-x--- 2 mysql mysql     4096 Jul  1  2020 dataxweb
-rw-r----- 1 mysql mysql      526 Jan 14 11:03 ib_buffer_pool
-rw-r----- 1 mysql mysql 79691776 Jan 14 11:04 ibdata1
-rw-r----- 1 mysql mysql 50331648 Jan 14 11:04 ib_logfile0
-rw-r----- 1 mysql mysql 50331648 Aug  5 06:20 ib_logfile1
-rw-r----- 1 mysql mysql 12582912 Jan 14 11:04 ibtmp1
drwxr-x--- 2 mysql mysql      116 Jul  1  2020 iot
drwxr-x--- 2 mysql mysql     4096 Jul  1  2020 mysql
-rw-r----- 1 mysql mysql      154 Jan 14 11:03 mysql-bin.000001
-rw-r----- 1 mysql mysql       19 Jan 14 11:03 mysql-bin.index
srwxrwxrwx 1 mysql mysql        0 Jan 14 11:03 mysql.sock
-rw------- 1 mysql mysql        6 Jan 14 11:03 mysql.sock.lock
drwxr-x--- 2 mysql mysql     8192 Jul  1  2020 performance_schema
-rw------- 1 mysql mysql     1680 Jul  1  2020 private_key.pem
-rw-r--r-- 1 mysql mysql      452 Jul  1  2020 public_key.pem
-rw-r--r-- 1 mysql mysql     1112 Jul  1  2020 server-cert.pem
-rw------- 1 mysql mysql     1676 Jul  1  2020 server-key.pem
drwxr-x--- 2 mysql mysql     8192 Jul  1  2020 sys

  可以发现,这二进制日志索引文件和日志文件生成了。只要重启mysql,mysql-bin后面的序号就会往上涨,他的切分规则就是重启或者到一个大小的阈值,就会切一个

mysql-bin.000001
mysql-bin.index

5 安装canal

5.1 下载地址

https://github.com/alibaba/canal/releases

5.2 mysql为canal配置权限

  在mysql中给canal单独建一个用户,给全库全表的读,拷贝,复制的权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;

  报错:ERROR 1819 (HY000): Your password does not satisfy the current policy requirements

原因是因为密码设置的过于简单会报错,MySQL有密码设置的规范,具体是与validate_password_policy的值有关,下图表明该值规则

  查看MySQL完整的初始密码规则,登陆后执行以下命令

mysql> SHOW VARIABLES LIKE 'validate_password%';
+--------------------------------------+--------+
| Variable_name                        | Value  |
+--------------------------------------+--------+
| validate_password_check_user_name    | OFF    |
| validate_password_dictionary_file    |        |
| validate_password_length             | 8      |
| validate_password_mixed_case_count   | 1      |
| validate_password_number_count       | 1      |
| validate_password_policy             | MEDIUM |
| validate_password_special_char_count | 1      |
+--------------------------------------+--------+

 密码的长度是由validate_password_length决定的,但是可以通过以下命令修改

set global validate_password_length=4;

  validate_password_policy决定密码的验证策略,默认等级为MEDIUM(中等),可通过以下命令修改为LOW(低)

set global validate_password_policy=0;

  重新执行

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;

5.3 解压及配置

$ tar -zxvf canal.deployer-1.1.4.tar.gz

  配置说明:canal server的conf下有几个配置文件

conf/
├── canal_local.properties
├── canal.properties
├── example
│   ├── h2.mv.db
│   ├── instance.properties
│   └── meta.dat
├── logback.xml
├── metrics
│   └── Canal_instances_tmpl.json
└── spring├── base-instance.xml├── default-instance.xml├── file-instance.xml├── group-instance.xml├── memory-instance.xml└── tsdb├── h2-tsdb.xml├── mysql-tsdb.xml├── sql│   └── create_table.sql└── sql-map├── sqlmap-config.xml├── sqlmap_history.xml└── sqlmap_snapshot.xml

canal.properties的common属性前四个配置项:

canal.id= 1             #canal的编号,在集群环境下,不同canal的id不同,注意它和mysql的server_id不同。
canal.ip=               # ip这里不指定,默认为本机
canal.port= 11111       # 端口号,是给tcp模式(netty)时候用的,如果用了kafka或者rocketmq,就不会去起这个端口了
canal.zkServers=         # zk用于canal cluster
canal.serverMode = tcp   # 用于指定什么模式拉取数据

destinations相关的配置:

#################################################
#########       destinations        #############
#################################################
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

canal.destinations = example可以设置多个,比如example1,example2,则需要创建对应的两个文件夹,并且每个文件夹下都有一个instance.properties文件。

全局的canal实例管理用spring,这里的file-instance.xml最终会实例化所有的destinations instances:

        <bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring"><property name="destination" value="${canal.instance.destination}" /><property name="eventParser"><ref local="eventParser" /></property><property name="eventSink"><ref local="eventSink" /></property><property name="eventStore"><ref local="eventStore" /></property><property name="metaManager"><ref local="metaManager" /></property><property name="alarmHandler"><ref local="alarmHandler" /></property><property name="mqConfig"><ref local="mqConfig" /></property></bean>

如canal.instance.destination等于example,就会加载example/instance.properties配置文件

修改instance 配置文件

vi conf/example/instance.properties

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=10.0.165.1:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
#################################################

6 canal的instance与消费方式

  canal.properties这个配置文件负责的是canal服务的基础配置,每个canal可以起n多个实例instance,一个instance代表一个线程,每个instance都有一个独立的配置文件instance.properties,不同的instance可以采集不同的mysql数据库,也就是一个canal可以对应多个mysql数据库。

  在instance里面有一个小队列,可以理解为是jvm级的队列,instance抓取来的数据先放入到队列中,队列可以有很多出口:①一个是canal server自己主动把数据推送到kafka,这个比较简单,一行代码不用写,只需要配个kafka的地址,每个instance对应kafka的一个topic,数据是json串。这种方式虽然简单,但是他的缺点主要体现在2个方面,一个instance对应一个topic,所有表都在这一个topic,所以实时的时候要进行分流。另一方面,因为数据是json,并且携带了很多冗余信息,但是数据量大的时候传输效率比较低。②第二种方式是启动canal客户端主动去拉取数据,可以定义多长周期消费多少数据。他的缺点在于抓取出来的是序列化压缩的数据,所以需要反序列化,解压,比较麻烦。他的优点在于我们可以进行压缩,过滤掉没用的冗余信息,只保留我们需要的信息,提交传输效率。

$ ll
total 16
-rwxrwxr-x 1 canal canal  291 Sep  2  2019 canal_local.properties
-rwxrwxr-x 1 canal canal 5202 Jan 14 12:10 canal.properties
drwxrwxr-x 2 canal canal   33 Jan 14 12:15 example
-rwxrwxr-x 1 canal canal 3119 Sep  2  2019 logback.xml
drwxrwxr-x 2 canal canal   39 Jan 14 12:00 metrics
drwxrwxr-x 3 canal canal  149 Jan 14 12:00 spring

  一个example的目录就是一个instance,canal要配置多个实例采集多个数据源mysql的话如下配置,然后把conf目录下example复制多份,分别重命名。如下

#################################################
#########               destinations            #############
#################################################
canal.destinations = example1,example2,example3

7 canal server主动推送数据

7.1 配置

  修改配置vim conf/canal.properties:这个是总配置,端口号,服务器参数,kafka地址,zookeeper地址(高可用)等

  修改如下内容,这个zookeeper是配置高可用的,配置采用kafka方式,kafka的地址

canal.zkServers = 10.0.165.4:2181,10.0.165.5:2181,10.0.165.6:2181
canal.serverMode = kafka
canal.mq.servers = 10.0.165.8:9092,10.0.165.9:9092

  修改配置vim conf/example/instance.properties针对要追踪的mysql的实例配置:一个instance实例对应一个数据库(这个是指数据库服务器)服务器的binlog。所以一个instance具体采集几个数据库是binlog定的和canal没关系,canal不管,canal就把binlog里面有什么就采集,不管是一个数据库还是多个,只要在一个binlog都采集

  修改如下内容,配置用户名,密码,地址。canal.mq.partitionsNum这个是发送到第几个分区

canal.instance.master.address=10.0.165.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=ods_bigdata_mysql

7.2 启动canal

./bin/startup.sh

7.3 测试

  启动canal后,在kafka创建topic

bin/kafka-topics.sh --create --zookeeper 10.0.165.4:2181 --replication-factor 2 --partitions 12 --topic ods_bigdata_mysql

  到kafka目录下开销消费端查询是否有数据

bin/kafka-console-consumer.sh --bootstrap-server  10.0.165.8:9092,10.0.165.9:9092 --topic  ods_bigdata_mysql

  (1)往需要采集的库中的user_info表插入一条数据数据

  执行sql

insert  into user_info values (10001,'test','test',NULL,'test','11111111111','111@gmail.com',NULL,'3','1999-09-09','F','2020-02-02 02:02:02',NULL)

  可以看到kafka消费出了如下一条数据

{"data":[{"id":"10001","login_name":"test","nick_name":"test","passwd":null,"name":"test","phone_num":"11111111111","email":"111@gmail.com","head_img":null,"user_level":"3","birthday":"1999-09-09","gender":"F","create_time":"2020-02-02 02:02:02","operate_time":null}],"database":"bigdata","es":1610676724000,"id":2,"isDdl":false,"mysqlType":{"id":"bigint(20)","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93},"table":"user_info","ts":1610676724288,"type":"INSERT"}

  (2)更新前面插入的这条数据

  执行sql

UPDATE user_info SET name="update" WHERE id=10001

  kafka消费出的数据如下

{"data":[{"id":"10001","login_name":"test","nick_name":"test","passwd":null,"name":"update","phone_num":"11111111111","email":"111@gmail.com","head_img":null,"user_level":"3","birthday":"1999-09-09","gender":"F","create_time":"2020-02-02 02:02:02","operate_time":null}],"database":"bigdata","es":1610676928000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint(20)","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime"},"old":[{"name":"test"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93},"table":"user_info","ts":1610676928644,"type":"UPDATE"}

  (3)删除前面插入的这条数据

  执行sql

DELETE FROM user_info WHERE id=10001

  kafka消费出的数据如下

{"data":[{"id":"10001","login_name":"test","nick_name":"test","passwd":null,"name":"update","phone_num":"11111111111","email":"111@gmail.com","head_img":null,"user_level":"3","birthday":"1999-09-09","gender":"F","create_time":"2020-02-02 02:02:02","operate_time":null}],"database":"bigdata","es":1610677003000,"id":4,"isDdl":false,"mysqlType":{"id":"bigint(20)","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93},"table":"user_info","ts":1610677003637,"type":"DELETE"}

8 canal主动拉取数据客户端

8.1 修改配置

  修改canal.properties,zookeeper配置高可用,配置采用tcp方式

canal.zkServers = 10.0.165.4:2181,10.0.165.5:2181,10.0.165.6:2181
canal.serverMode = tcp

  注意:需要修改canal.proerties的canal.serverMode为tcp否则不会启动11111端口

  修改instance.properties,配置用户名,密码,地址。

canal.instance.master.address=10.0.165.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

  重新启动后查看11111端口是否被占用

[canal@fbi-local-02 bin]$ lsof -i:11111
COMMAND   PID  USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    38516 canal  108u  IPv4 4281763      0t0  TCP fbi-local-02:vce (LISTEN)

8.2 将binlog转换为ProtoBuf消息

  (1)编写proto描述文件CanalBinLog.proto

syntax = "proto3";
option java_package = "com.quinto.canal";
option java_outer_classname = "CanalBinLog";/* 行数据 */
message RowData {uint64 executeTime = 1;string schemaName = 2;string tableName = 3;string eventType = 4;/* 列数据 */map<string, string> columns = 5;uint64 logfileoffset = 14;string logfilename = 15;
}

  (2)canal客户端代码编写

  导入依赖

    <properties><protobuf.version>3.5.0</protobuf.version><kafka.client.version>1.0.0</kafka.client.version><kafka.version>0.11.0.2</kafka.version><canal.version>1.1.4</canal.version></properties><dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>${canal.version}</version></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>${protobuf.version}</version></dependency><!-- kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.client.version}</version></dependency></dependency>

  ①工具类

  读取配置文件工具类

package com.quinto.utils;import java.io.IOException;
import java.util.Properties;/*** 读取config.properties配置文件的工具类*/
public class ConfigUtil {// 定义一个properties对象public static Properties properties;// 定义一个静态代码块,只执行一次static {try {properties = new Properties();properties.load(ConfigUtil.class.getClassLoader().getResourceAsStream("config.properties"));} catch (IOException e) {e.printStackTrace();}}public static String canalServerIp() {return properties.getProperty("canal.server.ip");}public static int canalServerPort() {return Integer.parseInt(properties.getProperty("canal.server.port"));}public static String canalServerDestination() {return properties.getProperty("canal.server.destination");}public static String canalServerUsername() {return properties.getProperty("canal.server.username");}public static String canalServerPassword() {return properties.getProperty("canal.server.password");}public static String canalSubscribeFilter() {return properties.getProperty("canal.subscribe.filter");}public static String zookeeperServerIp() {return properties.getProperty("zookeeper.server.ip");}public static String kafkaBootstrap_servers_config() {return properties.getProperty("kafka.bootstrap_servers_config");}public static int kafkaBatch_size() {return Integer.parseInt(properties.getProperty("kafka.batch_size"));}public static String kafkaAcks() {return properties.getProperty("kafka.acks");}public static String kafkaRetries() {return properties.getProperty("kafka.retries");}public static String kafkaBatch() {return properties.getProperty("kafka.batch");}public static String kafkaClient_id_config() {return properties.getProperty("kafka.client_id_config");}public static String kafkaKey_serializer_class_config() {return properties.getProperty("kafka.key_serializer_class_config");}public static String kafkaValue_serializer_class_config() {return properties.getProperty("kafka.value_serializer_class_config");}public static String kafkaTopic() {return properties.getProperty("kafka.topic");}public static void main(String[] args) {System.out.println(kafkaTopic());}
}

  配置文件

# canal配置
canal.server.ip=10.0.165.2
canal.server.port=11111
canal.server.destination=example
canal.server.username=canal
canal.server.password=canal
canal.subscribe.filter=bigdata.*# zookeeper配置
zookeeper.server.ip=10.0.165.4:2181,10.0.165.5:2181,10.0.165.6:2181# kafka配置
# kafka集群地址
kafka.bootstrap_servers_config=10.0.165.8:9092,10.0.165.9:9092
# 配置批次发送数据的大小,满足批次大小才会发送数据
kafka.batch_size= 10240
# ack
kafka.acks=all
# 重试次数
kafka.retries=2
kafka.client_id_config=quinto_canal
# kafka的key序列化
kafka.key_serializer_class_config=org.apache.kafka.common.serialization.StringSerializer
# kafka的value序列化,自定义开发
kafka.value_serializer_class_config=com.quinto.protobuf.ProtoBufSerializer
# 数据写入到kafka的哪个topic中
kafka.topic=ods_canal_mysql

  kafka工具类

package com.quinto.utils;import com.quinto.bean.CanalRowData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** kafka工具类*/
public class KafkaUtil {public static KafkaProducer getKafkaProducer(){// 定义一个properties对象接收参数Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ConfigUtil.kafkaBootstrap_servers_config());properties.put(ProducerConfig.BATCH_SIZE_CONFIG, ConfigUtil.kafkaBatch_size());properties.put(ProducerConfig.ACKS_CONFIG, ConfigUtil.kafkaAcks());properties.put(ProducerConfig.RETRIES_CONFIG, ConfigUtil.kafkaRetries());properties.put(ProducerConfig.CLIENT_ID_CONFIG, ConfigUtil.kafkaClient_id_config());properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ConfigUtil.kafkaKey_serializer_class_config());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ConfigUtil.kafkaValue_serializer_class_config());//实例化生产者对象并返回,key使用默认的String序列化范式,value采用自定义的序列化方式,这个序列化需要传递一个Protobufable的子类return new KafkaProducer<String, CanalRowData>(properties);}/*** 传递参数,将数据写入到kafka集群* @param rowData*/public static void send(KafkaProducer kafkaProducer,CanalRowData rowData){kafkaProducer.send(new ProducerRecord<>(ConfigUtil.kafkaTopic(), rowData));}
}

  ②自定义kafka序列化类

package com.quinto.protobuf;import org.apache.kafka.common.serialization.Serializer;import java.util.Map;/*** 实现kafka的value的自定义序列化对象* 要求传递的泛型必须是集成ProtoBufabl接口的实现列,才可以被序列化成功*/
public class ProtoBufSerializer implements Serializer<ProtoBufable> {@Overridepublic void configure(Map<String, ?> map, boolean b) {}@Overridepublic byte[] serialize(String s, ProtoBufable data) {return data.toBytes();}@Overridepublic void close() {}
}

  ③protobuf序列化接口,所有能够使用protobuf序列化的bean都需要集成这个接口

package com.quinto.protobuf;/*** 定义protobuf序列化接口,返回的是byte[]二进制对象,所有能够使用protobuf序列化的bean都需要集成这个接口*/
public interface ProtoBufable {/*** 将对象转换成二进制数组* @return*/byte[] toBytes();
}

  ④canal数据的Protobuf的实现类

package com.quinto.bean;import com.quinto.protobuf.CanalBinLog;
import com.quinto.protobuf.ProtoBufable;import java.util.Map;/*** canal数据的Protobuf的实现类,使用protobuf序列化成bean对象。* 用于将binlog解析后的map对象转换成protobuf序列化后的字节码数据,写入kafka集群*/
public class CanalRowData implements ProtoBufable {private String logfileName;private Long logfileOffset;private Long executeTime;private String schemaName;private String tableName;private String eventType;private Map<String, String> columns;public String getLogfileName() {return logfileName;}public void setLogfileName(String logfileName) {this.logfileName = logfileName;}public Long getLogfileOffset() {return logfileOffset;}public void setLogfileOffset(Long logfileOffset) {this.logfileOffset = logfileOffset;}public Long getExecuteTime() {return executeTime;}public void setExecuteTime(Long executeTime) {this.executeTime = executeTime;}public String getSchemaName() {return schemaName;}public void setSchemaName(String schemaName) {this.schemaName = schemaName;}public String getTableName() {return tableName;}public void setTableName(String tableName) {this.tableName = tableName;}public String getEventType() {return eventType;}public void setEventType(String eventType) {this.eventType = eventType;}public Map<String, String> getColumns() {return columns;}public void setColumns(Map<String, String> columns) {this.columns = columns;}/*** 构造方法中解析map对象的binlog日志*/public CanalRowData(Map map){//解析map对象中所有的参数if(map.size()>0){this.logfileName = map.get("logfileName").toString();this.logfileOffset = Long.parseLong(map.get("logfileOffset").toString());this.executeTime = Long.parseLong(map.get("executeTime").toString());this.schemaName = map.get("schemaName").toString();this.tableName = map.get("tableName").toString();this.eventType = map.get("eventType").toString();this.columns = (Map<String, String>)map.get("columns");}}/*** 将map对象解析出来的参数,赋值给protobuf对象,然后序列化后字节码返回* @return*/@Overridepublic byte[] toBytes() {CanalBinLog.RowData.Builder builder = CanalBinLog.RowData.newBuilder();builder.setLogfileName(this.getLogfileName());builder.setLogfileOffset(this.getLogfileOffset());builder.setExecuteTime(this.getExecuteTime());builder.setSchemaName(this.getSchemaName());builder.setTableName(this.getTableName());builder.setEventType(this.getEventType());for (String key : this.getColumns().keySet()) {builder.putColumns(key, this.getColumns().get(key));}//将传递的binlog数据解析后序列化成字节码数据返回return builder.build().toByteArray();}
}

  ⑤canal客户端类

package com.quinto.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.quinto.bean.CanalRowData;
import com.quinto.utils.ConfigUtil;
import com.quinto.utils.KafkaUtil;
import org.apache.kafka.clients.producer.KafkaProducer;import java.util.HashMap;
import java.util.List;
import java.util.Map;public class CanalClient {// Canal客户端连接器private CanalConnector canalConnector;// kafka生产者工具类private KafkaProducer kafkaProducer;public CanalClient(){// 在构造方法中初始化连接与kafka工具类kafkaProducer = KafkaUtil.getKafkaProducer();}public void statrt() {// 1 创建连接并建立连接,连接的是高可用集群System.out.println(ConfigUtil.zookeeperServerIp()+ConfigUtil.canalServerDestination()+ConfigUtil.canalServerUsername()+ConfigUtil.canalServerPassword());canalConnector = CanalConnectors.newClusterConnector("10.0.165.4:2181","example", "canal", "canal");// 不停拉取的标识boolean isFetching = true;// 建立连接try {canalConnector.connect();// 回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿canalConnector.rollback();// 2 订阅主题canalConnector.subscribe(ConfigUtil.canalSubscribeFilter());// 不停的拉取数据while (isFetching){// 3 获取数据,尝试拿batchSize条记录,有多少取多少,不会阻塞等待Message message = canalConnector.getWithoutAck(ConfigUtil.kafkaBatch_size());// 获取这个批次的idlong batchId = message.getId();// 获取拉取到的日志数据总数int size = message.getEntries().size();// 判断是否又获取到数据if (batchId == -1 | size == 0){System.out.println("没有抓取到数据");Thread.sleep(1000);}else {System.out.println("发送数据:"+ message);// 将binlog日志解析成Map对象Map map = binlogToMap(message);// 将map对象序列化成protobuf格式写入到kafka中CanalRowData canalRowData = new CanalRowData(map);// 有数据将数据发送到kafka集群if(map.size()>0){KafkaUtil.send(kafkaProducer,canalRowData);}}// 4 提交确认// 提交确认,进行batch id的确认,确认之后,小于等于此 batchId 的 Message 都会被确认。canalConnector.ack(batchId);}} catch (Exception e) {e.printStackTrace();} finally {// 5 关闭连接canalConnector.disconnect();}}private Map binlogToMap(Message message) throws InvalidProtocolBufferException {Map rowDataMap = new HashMap();// 构建CanalClient.RowData实体
//        CanalBinLog.RowData.Builder builder = CanalBinLog.RowData.newBuilder();// 遍历message中的所有binlog实体for (CanalEntry.Entry entry: message.getEntries()){// 只处理事务型的binlogif(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){continue;}// 获取文件名String logfileName = entry.getHeader().getLogfileName();// 获取logfile的偏移量long logfileOffset = entry.getHeader().getLogfileOffset();// 获取sql语句的执行时间戳long executeTime = entry.getHeader().getExecuteTime();// 获取数据库名称String schemaName = entry.getHeader().getSchemaName();// 获取表名String tableName = entry.getHeader().getTableName();// 获取事件类型 insert/update/deleteString eventType = entry.getEntryType().toString().toLowerCase();rowDataMap.put("logfileName", logfileName);rowDataMap.put("logfileOffset", logfileOffset);rowDataMap.put("executeTime", executeTime);rowDataMap.put("schemaName", schemaName);rowDataMap.put("tableName", tableName);rowDataMap.put("eventType", eventType);// 封装列数据HashMap<String, String> columnDataMap = new HashMap<>();// 获取所有行上的变更CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList){if(eventType.equals("insert") || eventType.equals("update")){for (CanalEntry.Column column : rowData.getAfterColumnsList()){columnDataMap.put(column.getName(), column.getValue());}}else if(eventType.equals("delete")) {for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {columnDataMap.put(column.getName(), column.getValue());}}}rowDataMap.put("columns",columnDataMap);}return rowDataMap;}
}

  ⑥入口类

package com.quinto;import com.quinto.canal.CanalClient;public class App {public static void main(String[] args) {// 实例化canal客户端对象,调用start方法拉取canalserver的binlog日志发送到kafka集群CanalClient canalClient = new CanalClient();canalClient.statrt();}
}

8.3 测试

  (1)往user_info表插入一条数据

insert  into user_info values (10001,'test','test',NULL,'test','11111111111','111@gmail.com',NULL,'3','1999-09-09','F','2020-02-02 02:02:02',NULL)

  canal client从canal server拉取到的数据如下

发送数据:Message[id=8,entries=[header {version: 1logfileName: "mysql-bin.000002"logfileOffset: 4615211serverId: 1serverenCode: "UTF-8"executeTime: 1610943288000sourceType: MYSQLschemaName: ""tableName: ""eventLength: 75
}
entryType: TRANSACTIONBEGIN
storeValue: " `"
, header {version: 1logfileName: "mysql-bin.000002"logfileOffset: 4615374serverId: 1serverenCode: "UTF-8"executeTime: 1610943288000sourceType: MYSQLschemaName: "bigdata"tableName: "user_info"eventLength: 105eventType: INSERTprops {key: "rowsCount"value: "1"}
}
entryType: ROWDATA
storeValue: "\b\210\001\020\001P\000b\241\004\022*\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0010\000B\00510001R\nbigint(20)\022*\b\001\020\f\032\nlogin_name \000(\0010\000B\004testR\fvarchar(200)\022)\b\002\020\f\032\tnick_name \000(\0010\000B\004testR\fvarchar(200)\022 \b\003\020\f\032\006passwd \000(\0010\001R\fvarchar(200)\022$\b\004\020\f\032\004name \000(\0010\000B\004testR\fvarchar(200)\0220\b\005\020\f\032\tphone_num \000(\0010\000B\v11111111111R\fvarchar(200)\022.\b\006\020\f\032\005email \000(\0010\000B\r111@gmail.comR\fvarchar(200)\022\"\b\a\020\f\032\bhead_img \000(\0010\001R\fvarchar(200)\022\'\b\b\020\f\032\nuser_level \000(\0010\000B\0013R\fvarchar(200)\022&\b\t\020[\032\bbirthday \000(\0010\000B\n1999-09-09R\004date\022!\b\n\020\f\032\006gender \000(\0010\000B\001FR\nvarchar(1)\0226\b\v\020]\032\vcreate_time \000(\0010\000B\0232020-02-02 02:02:02R\bdatetime\022\"\b\f\020]\032\foperate_time \000(\0010\001R\bdatetime"
, header {version: 1logfileName: "mysql-bin.000002"logfileOffset: 4615479serverId: 1serverenCode: "UTF-8"executeTime: 1610943288000sourceType: MYSQLschemaName: ""tableName: ""eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\00522379"
],raw=false,rawEntries=[]]

  查看kafka消费出来的数据

ϝ												

Canal原理及其使用相关推荐

  1. Canal~1:canal原理

    canal原理 1 什么是 canal 2 使用场景 3 canal 的工作原理 4 MySQL 的 binlog (1) 什么是 binlog (2) binlog 的开启 (3) binlog 的 ...

  2. canal原理的一些学习-2(HA 模式搭建)

    简介   本篇主要介绍canal的HA集群的搭建过程,以及结合自身使用过程的一些经历介绍一些注意事项. 简介 1. 集群的搭建 1.1 机器准备 1.2 在两台canal机器上完成以下配置 1.2.1 ...

  3. canal原理的一些学习-1(canal的一些原理性介绍)

    1. cannal 是什么,能做什么用 1.1 mysql的binlog 1.2 mysql 的主从复制过程 1.3 canal能够同步数据的原理 2. quick start 3. canal 的设 ...

  4. 阿里巴巴Canal原理剖析

    Canal是阿里开源的一款基于Mysql数据库binlog的增量订阅和消费组件,通过它可以订阅数据库的binlog日志,然后进行一些数据消费,如数据镜像.数据异构.数据索引.缓存更新等.相对于消息队列 ...

  5. erosa mysql_MySQL协议和canal实现

    前言 前面的文章里,我们了解到 canal 可以从 MySQL 中感知数据的变化.这是因为它模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,从而实现了主从复制. 正是了 ...

  6. mysql获取最好成绩对应数据的其他项_开源数据同步神器——canal

    前言 如今大型的IT系统中,都会使用分布式的方式,同时会有非常多的中间件,如redis.消息队列.大数据存储等,但是实际核心的数据存储依然是存储在数据库,作为使用最广泛的数据库,如何将mysql的数据 ...

  7. 开源数据同步神器——canal

    前言 如今大型的IT系统中,都会使用分布式的方式,同时会有非常多的中间件,如redis.消息队列.大数据存储等,但是实际核心的数据存储依然是存储在数据库,作为使用最广泛的数据库,如何将mysql的数据 ...

  8. MySQL协议和canal实现

    前言 前面的文章里,我们了解到 canal 可以从 MySQL 中感知数据的变化.这是因为它模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,从而实现了主从复制. 正是了 ...

  9. Canal实时监控mysql数据库

    1. canal原理概述 1.1 mysql的主从复制原理1.1.1 mysql master将数据变更写入二进制日志(binlog,其中记录叫二进制日志事件,可通过show binlog event ...

最新文章

  1. HDU 5115 Dire Wolf ——(区间DP)
  2. linux中-i选项的作用,linux – find中的-prune选项有什么作用?
  3. 开发人员如何了解用户和需求
  4. apple watch自身不会让你更健康,而这些会
  5. C语言从未排序的链接列表中删除重复项的算法(附完整源码)
  6. DOA——ESPRIT算法
  7. 路遥工具箱全面迁移至 .NET 6.0 并发布 3.0 版本及迁移记录详解
  8. std::string中的find_first_of()和find_last_of()函数
  9. STM32 - 定时器的设定 - 基础-03 - 输出波形控制 - Output compare mode
  10. 面试经验:求职面试时的835守则
  11. Cocos2d-Lua 输出脚本预编译错误产生位置
  12. 学习笔记之awk用法
  13. delphi相关文件扩展名
  14. 了解信息安全管理体系的基本思路
  15. unittest模块:单元测试
  16. T检验和F检验\自由度
  17. TCP粘包以及UDP丢包问题
  18. cocos2d-x初步
  19. 里氏转换,arraylist,path,file
  20. 将一个真分数分解为埃及分数相加的形式

热门文章

  1. w3cschool linux命令,linux常见操作指令
  2. Python 添加根目录的三种方法
  3. 360 for Linux 与 setuid
  4. 量化投资学习——读书笔记
  5. Python Web技术开发软件安装
  6. Web安全——sql注入
  7. 均值漂移(mean shift )聚类算法Matlab实现详解
  8. docker linux 快速开窗口_Linux下Docker快速部署LAMP
  9. 011mmap进程通信
  10. 直播一:H.264编码基础知识详解