Canal1.1.4的安装与使用
Canal1.1.4的安装与使用
- Canal
- 一、修改mysql的配置
- 二、Canal的安装
- 三、Java客户端
Canal
Canal,译意为水道/管道/沟渠。主要用途是基于MYSQL数据库增量日志解析,提供增量数据订阅和消费。
一、修改mysql的配置
目的:开启mysql的binlog日志功能
创建用户 ,这里用户名暂定为canal,密码:Canal@123456;
create user 'canal'@'%' identified by 'Canal@123456';
给用户授权,授权 *.*表示所有库;
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'Canal@123456';
如果mysql的版本为8.0.3以上版本,由于自MySQL 8.0.3开始,身份验证插件默认使用caching_sha2_password,此时Canal 1.1.4 会启动报错:caching_sha2_password Auth failed,故还需在mysql中修改canal用户对应的身份验证插件为mysql_native_password,其中canal为用户名,Canal@123456为密码。
select host,user,plugin from mysql.user ; ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'Canal@123456';
在mysql配置文件my.cnf或my.ini设置如下信息
# 打开binlog log-bin=mysql-bin # 选择ROW(行)模式 binlog-format=ROW
改了配置文件之后,重启Mysql,使用命令查看是否打开binlog模式;
show variables like 'log_bin';
查看binlog日志文件列表;
show binary logs;
查看正在写入的binlog文件;
show master status;
二、Canal的安装
解压canal.deployer-1.1.4.tar.gz,canal-1.1.4的下载地址
修改canal.deployer-1.1.4/conf/example/instance.properties文件;
2.1 修改canal.instance.master.address的值为要连接的数据库的ip地址# position info #数据库的ip地址 canal.instance.master.address=127.0.0.1:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid=
2.2 设置连接数据库的用户名和密码,这个用户名和密码是在Mysql中授权的用于伪装成Mysql slave的用户名和密码
# username/password#用户名canal.instance.dbUsername=canal#密码canal.instance.dbPassword=Canal@123456canal.instance.connectionCharset = UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false
2.3 配置监控规则
.*\…*表示监控所有数据库的所有表,也可以是具体的表名,用,隔开,比如test.t_people,test.user
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
修改canal.deployer-1.1.4/conf/canal.properties配置文件信息
#去掉下面一行的注释 #canal.instance.parser.parallelThreadSize = 16 ## disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256
启动canal
在canal.deployer-1.1.4/bin中运行startup文件。其中win10系统运行startup.bat,linux系统运行startup.sh
在startup.bat中还需删除如下的配置文件内容,否则会启动就报错-Dlogback.configurationFile="%logback_configurationFile%"
启动canal
三、Java客户端
- 引入Maven依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>
2、canal客户端代码如下:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CannalClient implements InitializingBean {private final static int BATCH_SIZE = 1000;@Overridepublic void afterPropertiesSet() throws Exception {// 创建链接,这里的127.0.0.1是canal的ip地址CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");try {//打开连接connector.connect();//订阅数据库表,全部表//connector.subscribe(".*\\..*");
//我这里订阅了test数据库的t_people表和record_opertime表connector.subscribe("test.t_people,test.record_opertime");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);//获取批量IDlong batchId = message.getId();//获取批量的数量int size = message.getEntries().size();//如果没有数据if (batchId == -1 || size == 0) {try {//线程休眠2秒Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}} else {//如果有数据,处理数据printEntry(message.getEntries());}//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}/*** 打印canal server解析binlog获得的实体类信息*/private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChage;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChage.getEventType();//打印Header信息System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));//判断是否是DDL语句if (rowChage.getIsDdl()) {System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());}//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());//如果是更新的语句} else {//变更前的数据System.out.println("------->; before");printColumn(rowData.getBeforeColumnsList());//变更后的数据System.out.println("------->; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}
}
- 修改mysql数据库的数据
修改表的数据之后,就能在控制台看到如下信息:
Canal1.1.4的安装与使用相关推荐
- canal1.1.4的下载及安装
canal1.1.4的安装 一.找到 MySQL 配置文件的位置 ➢ Linux: /etc/my.cnf 如果/etc 目录下没有,可以通过 locate my.cnf 查找位置 ➢ Windows ...
- java怎么安装manven_Canal——Canal-Adapter源码在IDEA部署运行
一.下载源码 我这里用的是canal-1.1.4版本 源码结构 client-adapter项目就是本次要部署运行的 源码导入到IDEA中的结构如下: 二.安装配置 找到manven模块中有root的 ...
- mysql数据实时同步:Canal安装部署、kafka安装、zk安装、mysql安装、Canal Server+Canal Client HA,Canal+mysql+Kafka,相关验证(学习笔记)
目录 Canal安装部署 1.1. 服务器准备 1.2. 设置主机名并配置hosts 1.3. 免密设置 1.4. 设置ntp时间 1.5. 关闭防火墙 1.6. 关闭selinux 1.7. 安装J ...
- macOS、Linux CentOS 、Docker安装部署canal-server(canal-deployer)服务
1.环境准备 canal-server(canal-deployer)依赖jdk,需要先安装部署好jdk1.8 mysql开启binlog 2.安装部署 2.1.Docker方式安装部署 这里以1.1 ...
- canel-1.1.5 canal.deployer安装
简介 canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 工作原理 canal 模拟 MySQL slave 的交互协议,伪装 ...
- 怎样安装win服务器系统,win服务器系统安装教程
win服务器系统安装教程 内容精选 换一换 SAP B1快速部署方案如图1所示.说明如下:VPC网络:为了保证网络的安全,SAP B1系统中所有节点在一个VPC网络内,且所有节点应属于同一个AZ(Av ...
- 使用canal1.1.5让MySQL5.7同步到ES7.x
哎!也算是记录自己的踩坑日记吧 注意: MySQL和ES7请自己自行安装好,请大家一定确定自己MySQL的版本. 使用的系统:Centos,Ubuntu.windows请移步 # 这里的版本不能低于5 ...
- Canal adapter1.1.5安装部署配置(3)
使用前必须先安装Canal server,Canal最新1.1.5版安装部署(1) 安装 版本根据情况自行调整,最新版本参考:Releases · alibaba/canal · GitHub #进入 ...
- Mysql+Canal1.1.5+Es实现数据同步
注:关于MySql和ES的安装过程,本篇文章不做详细描述 1.配置mysql 部分(binlog日志) 1)找到mysql中的my.ini文件(如下图) 2)修改my.ini中的配置,如下图 port ...
最新文章
- Linux下vsftpd服务器
- leetcode算法题--字符串转换整数 (atoi)
- java支持接口吗_java – 支持专用接口方法
- wxWidgets:WxBase 事件循环
- 巧妙地在Windows搭建node服务器
- 记一次网络访问故障排查
- 【Java】15分钟快速体验阿里Java诊断工具Arthas
- 抢先下载:Windows XP SP3英文预览版
- kettle使用命令行来运行ktr和kjb
- 只用2秒,轻松获取一线所有城市公交数据!
- 适配7.0手机拍照、相册、裁剪图片
- js去除字符串的首尾的逗号
- win7系统蓝屏故障以及常见的解决方案
- rabbitMq系列2:AMPQ协议与RabbitMQ执行过程
- VLOOKUP函数使用及注意事项
- 手机充电总要充到100%吗?充电时先插手机还是充电器
- 透析阿里3亿元投资的如涵:孵化张大奕,吸金但苦逼
- 这又是一则招聘贴——招聘区块链系统开发的同学
- 哪有那么多BAT的逆袭?
- 要怎么在计算机里清除桌面内存,怎么清理运行内存占用_怎么清理电脑运行内存-win7之家...