Canal1.1.4的安装与使用

  • Canal
    • 一、修改mysql的配置
    • 二、Canal的安装
    • 三、Java客户端

Canal

Canal,译意为水道/管道/沟渠。主要用途是基于MYSQL数据库增量日志解析,提供增量数据订阅和消费。

一、修改mysql的配置

目的:开启mysql的binlog日志功能

  1. 创建用户 ,这里用户名暂定为canal,密码:Canal@123456;

    create user 'canal'@'%' identified by 'Canal@123456';
    
  2. 给用户授权,授权 *.*表示所有库;

    grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to       'canal'@'%' identified by 'Canal@123456';
    

    如果mysql的版本为8.0.3以上版本,由于自MySQL 8.0.3开始,身份验证插件默认使用caching_sha2_password,此时Canal 1.1.4 会启动报错:caching_sha2_password Auth failed,故还需在mysql中修改canal用户对应的身份验证插件为mysql_native_password,其中canal为用户名,Canal@123456为密码。

    select host,user,plugin from mysql.user ;
    ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'Canal@123456';
    
  3. 在mysql配置文件my.cnf或my.ini设置如下信息

    # 打开binlog
    log-bin=mysql-bin
    # 选择ROW(行)模式
    binlog-format=ROW
    
  4. 改了配置文件之后,重启Mysql,使用命令查看是否打开binlog模式;

    show variables like 'log_bin';
    
  5. 查看binlog日志文件列表;

    show binary logs;
    
  6. 查看正在写入的binlog文件;

    show master status;
    

二、Canal的安装

  1. 解压canal.deployer-1.1.4.tar.gz,canal-1.1.4的下载地址

  2. 修改canal.deployer-1.1.4/conf/example/instance.properties文件;
    2.1 修改canal.instance.master.address的值为要连接的数据库的ip地址

    # position info
    #数据库的ip地址
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.master.journal.name=
    canal.instance.master.position=
    canal.instance.master.timestamp=
    canal.instance.master.gtid=
    

2.2 设置连接数据库的用户名和密码,这个用户名和密码是在Mysql中授权的用于伪装成Mysql slave的用户名和密码

   # username/password#用户名canal.instance.dbUsername=canal#密码canal.instance.dbPassword=Canal@123456canal.instance.connectionCharset = UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false

2.3 配置监控规则
.*\…*表示监控所有数据库的所有表,也可以是具体的表名,用,隔开,比如test.t_people,test.user

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
  1. 修改canal.deployer-1.1.4/conf/canal.properties配置文件信息

    #去掉下面一行的注释
    #canal.instance.parser.parallelThreadSize = 16
    ## disruptor ringbuffer size, must be power of 2
    canal.instance.parser.parallelBufferSize = 256
    
  2. 启动canal
    在canal.deployer-1.1.4/bin中运行startup文件。其中win10系统运行startup.bat,linux系统运行startup.sh

    在startup.bat中还需删除如下的配置文件内容,否则会启动就报错

     -Dlogback.configurationFile="%logback_configurationFile%"
    

启动canal

三、Java客户端

  1. 引入Maven依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>

2、canal客户端代码如下:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CannalClient implements InitializingBean {private final static int BATCH_SIZE = 1000;@Overridepublic void afterPropertiesSet() throws Exception {// 创建链接,这里的127.0.0.1是canal的ip地址CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");try {//打开连接connector.connect();//订阅数据库表,全部表//connector.subscribe(".*\\..*");
//我这里订阅了test数据库的t_people表和record_opertime表connector.subscribe("test.t_people,test.record_opertime");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);//获取批量IDlong batchId = message.getId();//获取批量的数量int size = message.getEntries().size();//如果没有数据if (batchId == -1 || size == 0) {try {//线程休眠2秒Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}} else {//如果有数据,处理数据printEntry(message.getEntries());}//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}/*** 打印canal server解析binlog获得的实体类信息*/private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChage;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChage.getEventType();//打印Header信息System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));//判断是否是DDL语句if (rowChage.getIsDdl()) {System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());}//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());//如果是更新的语句} else {//变更前的数据System.out.println("------->; before");printColumn(rowData.getBeforeColumnsList());//变更后的数据System.out.println("------->; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}
  1. 修改mysql数据库的数据
    修改表的数据之后,就能在控制台看到如下信息:

Canal1.1.4的安装与使用相关推荐

  1. canal1.1.4的下载及安装

    canal1.1.4的安装 一.找到 MySQL 配置文件的位置 ➢ Linux: /etc/my.cnf 如果/etc 目录下没有,可以通过 locate my.cnf 查找位置 ➢ Windows ...

  2. java怎么安装manven_Canal——Canal-Adapter源码在IDEA部署运行

    一.下载源码 我这里用的是canal-1.1.4版本 源码结构 client-adapter项目就是本次要部署运行的 源码导入到IDEA中的结构如下: 二.安装配置 找到manven模块中有root的 ...

  3. mysql数据实时同步:Canal安装部署、kafka安装、zk安装、mysql安装、Canal Server+Canal Client HA,Canal+mysql+Kafka,相关验证(学习笔记)

    目录 Canal安装部署 1.1. 服务器准备 1.2. 设置主机名并配置hosts 1.3. 免密设置 1.4. 设置ntp时间 1.5. 关闭防火墙 1.6. 关闭selinux 1.7. 安装J ...

  4. macOS、Linux CentOS 、Docker安装部署canal-server(canal-deployer)服务

    1.环境准备 canal-server(canal-deployer)依赖jdk,需要先安装部署好jdk1.8 mysql开启binlog 2.安装部署 2.1.Docker方式安装部署 这里以1.1 ...

  5. canel-1.1.5 canal.deployer安装

    简介 canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 工作原理 canal 模拟 MySQL slave 的交互协议,伪装 ...

  6. 怎样安装win服务器系统,win服务器系统安装教程

    win服务器系统安装教程 内容精选 换一换 SAP B1快速部署方案如图1所示.说明如下:VPC网络:为了保证网络的安全,SAP B1系统中所有节点在一个VPC网络内,且所有节点应属于同一个AZ(Av ...

  7. 使用canal1.1.5让MySQL5.7同步到ES7.x

    哎!也算是记录自己的踩坑日记吧 注意: MySQL和ES7请自己自行安装好,请大家一定确定自己MySQL的版本. 使用的系统:Centos,Ubuntu.windows请移步 # 这里的版本不能低于5 ...

  8. Canal adapter1.1.5安装部署配置(3)

    使用前必须先安装Canal server,Canal最新1.1.5版安装部署(1) 安装 版本根据情况自行调整,最新版本参考:Releases · alibaba/canal · GitHub #进入 ...

  9. Mysql+Canal1.1.5+Es实现数据同步

    注:关于MySql和ES的安装过程,本篇文章不做详细描述 1.配置mysql 部分(binlog日志) 1)找到mysql中的my.ini文件(如下图) 2)修改my.ini中的配置,如下图 port ...

最新文章

  1. Linux下vsftpd服务器
  2. leetcode算法题--字符串转换整数 (atoi)
  3. java支持接口吗_java – 支持专用接口方法
  4. wxWidgets:WxBase 事件循环
  5. 巧妙地在Windows搭建node服务器
  6. 记一次网络访问故障排查
  7. 【Java】15分钟快速体验阿里Java诊断工具Arthas
  8. 抢先下载:Windows XP SP3英文预览版
  9. kettle使用命令行来运行ktr和kjb
  10. 只用2秒,轻松获取一线所有城市公交数据!
  11. 适配7.0手机拍照、相册、裁剪图片
  12. js去除字符串的首尾的逗号
  13. win7系统蓝屏故障以及常见的解决方案
  14. rabbitMq系列2:AMPQ协议与RabbitMQ执行过程
  15. VLOOKUP函数使用及注意事项
  16. 手机充电总要充到100%吗?充电时先插手机还是充电器
  17. 透析阿里3亿元投资的如涵:孵化张大奕,吸金但苦逼
  18. 这又是一则招聘贴——招聘区块链系统开发的同学
  19. 哪有那么多BAT的逆袭?
  20. 要怎么在计算机里清除桌面内存,怎么清理运行内存占用_怎么清理电脑运行内存-win7之家...

热门文章

  1. Bootstrap 颜色
  2. 《机械与电子》期刊简介
  3. 微软的COM中GUID和UUID、CLSID、IID
  4. java中的静态变量方法
  5. 软件工程的专业介绍与前景
  6. github页面绑定域名/Domain‘s DNS record could not be retrieved
  7. 赖世雄---音标总结
  8. 编译器32位和64位数据类型区别
  9. 大数据展望“双11” 预计淘宝销售额将超600亿
  10. 【Redis】Redis7.0新特性汇总(详细)