1. 下载canal deployer压缩包

首先下载canal压缩包。
canal.deployer-1.1.5-SNAPSHOT.tar.gz

2. 将canal.deployer 复制到固定目录并解压

mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal

3.修改配置参数

3.1 修改instance配置 conf/example/instance.properties

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=test_project.test_topic
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

3.2 修改canal配置文件 conf/canal.properties

# ...
canal.serverMode = kafka
# ...
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN

其中必须配置参数为canal.serverMode、kafka.bootstrap.servers、kafka.security.protocol、kafka.sasl.mechanism,其他参数用户可根据实际情况自主进行调优,kafka.bootstrap.servers需要选择topic所在region的endpoint,endpoint可前往DataHub兼容kafka协议查看。

3.3 修改jass配置文件 conf/kafka_client_producer_jaas.conf

4. 启动和关闭

启动之前需要保证DataHub有相应的Topic,
创建的Topic要求可以参考
docker 安装kafka,创建topic

4.1 启动

cd /usr/local/canal/
sh bin/startup.sh

4.2 查看日志

查看 logs/canal/canal.logvi logs/canal/canal.log

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

###查看instance的日志:vi logs/example/example.log

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

查看meta的日志,vi logs/example/meta.log

数据库的每次增删改操作,都会在meta.log中生成一条记录,查看该日志可以确认canal是否有采集到数据

tail -f example/meta.log
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/127.0.0.1:3306]
2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/127.0.0.1:3306]

canal启动完成后,剩下的就是java客户端监听kafka接口消息就可以了
pom.xml

 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.3.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.1.2.RELEASE</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency>

application.yml

server:port: 8083
spring:application:name: canalkafka:#Kafka服务器地址bootstrap-servers: 192.168.115.128:9092producer:#设置数据value的序列化处理类value-serializer: org.apache.kafka.common.serialization.StringSerializer
#Canal配置
canal:server: 192.168.115.128:11111
#  server: 172.17.0.1:11111destination: example#日志配置
logging:pattern:console: "%msg%n"level:root: info
@Component
@Slf4j
public class Receiver {@KafkaListener(topics = "example",id = "069OiR2vRpinUzoSca78yg")public void listen(ConsumerRecord<?, ?> record) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {String message = (String) kafkaMessage.get();log.info("message: {}", message);}}
}

canal -kafka快速实践相关推荐

  1. 2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明

    目录 Kafka快速回顾 消息队列: 发布/订阅模式: Kafka 重要概念: 常用命令 整合说明 两种方式 两个版本API 在实际项目中,无论使用Storm还是SparkStreaming与Flin ...

  2. 消息中间件 --- Kafka快速入门

    消息中间件 --- Kafka 快速入门 消息中间件:https://blog.51cto.com/u_9291927/category33 GitHub: GitHub - scorpiostudi ...

  3. 大数据技术之 Kafka (第 2 章 Kafka快速入门)

    第 2 章 Kafka 快速入门 下载安装kafka集群 1.需要jdk 2.需要zookeeper,这个东西在最新版的Kafka中内置. 3.下载Kafka安装包 (下载官网地址:Apache Ka ...

  4. canal+Kafka实现mysql与redis数据同步

    前言 上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用.在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化.如果这时候数据库数据发生变更操作,就不 ...

  5. 快速实践大规模轻量级图片分类模型:飞桨识图 PP-ShiTu

    快速实践大规模图片分类模型:飞桨识图 PP-ShiTu 飞桨识图PP-ShiTu是轻量级图像识别系统,集成了目标检测.特征学习.图像检索等模块,广泛适用于各类图像识别任务.CPU上0.2s即可完成在1 ...

  6. Docker安装部署MySQL+Canal+Kafka+Camus+HIVE数据实时同步

    因为公司业务需求要将mysql的数据实时同步到hive中,在网上找到一套可用的方案,即MySQL+Canal+Kafka+Camus+HIVE的数据流通方式,因为是首次搭建,所以暂时使用伪分布式的搭建 ...

  7. 大数据技术之Kafka(一)Kafka概述、Kafka快速入门、Kafka架构深入

    文章目录 1 Kafka 概述 1.1 定义 1.2 消息队列 1.2.1 传统消息队列的应用场景 1.2.2 消息队列的两种模式 1.3 Kafka 基础架构 2 Kafka 快速入门 2.1 安装 ...

  8. Kafka 快速入门(安装)

    kafka学习目录:kafka目录 二.Kafka 快速入门 2.1.windows版安装 2.1.1.Quick Start 本次安装学习在Windows操作系统进行.(Linux版本的差别不大,运 ...

  9. Python搭建个人博客(pelican)-快速实践~

    Python搭建个人博客(pelican)-快速实践~ 文章目录 Python搭建个人博客(pelican)-快速实践~ 一.安装 二.创建项目 三.创作一篇文章 四.生成网站 五.预览网站 强烈建议 ...

最新文章

  1. 青少年蓝桥杯_2020_steam考试_初级组_第三题
  2. buu rsarsa
  3. Doxygen自动文档生成工具在Eclipse中的集成及使用举例
  4. 内存对齐指令详解(posix_memalign)
  5. php实现直播答题系统,直播答题解决方案
  6. 数的划分(信息学奥赛一本通-T1304)
  7. resnet结构_来聊聊ResNet及其变种
  8. centos 安装Jitsi架设_Docker学习6:docker安装centos7
  9. K8s与Docker
  10. 压电式加速计matlab,低频压电加速度传感器的噪声特性及信号处理方法研究
  11. libpcap中主要函数使用介绍
  12. 一起看懂Redis两种持久化方式的原理
  13. 用计算机怎么弹两只老虎,七键两只老虎曲谱_64键的电子琴怎么弹两只老虎1234567按哪个键...
  14. 聊一聊我在移动平台混合开发的经验
  15. 在Win7上安装TexLive及设置XeLaTeX的整个过程
  16. Python画美国盾牌
  17. RTCP Inactivity导致掉话
  18. 前端必看的 HTML + CSS技巧
  19. 游戏开发技术——游戏引擎
  20. 再读新疆系列(一)——穿越准葛尔盆地

热门文章

  1. win11本地用户和组在哪里添加?
  2. Linux网络编程基础1(网络应用程序设计模式,分层模型,协议格式)
  3. 【渝粤题库】陕西师范大学152110 行政秘书与公文写作
  4. 程序员必懂的15大定律和7大原则
  5. CentOS 5.2 储存区域网路(SAN)-安装与设定iSCSI Target 篇
  6. 越疆魔术师DEBOT(magician)机械臂ROS、MoveIt!和Gazebo功能包与ROS-I教程(melodic)
  7. msf017-010复现及木马利用
  8. 想实时了解键盘功能键(Caps Lock/Num Lock/Scroll Lock)状态?用它就行了——Wireless Keyboard Indicator
  9. [iOS Xcode8报错]dyld: Library not loaded: /System/Library/Frameworks/UserNotifications.framework/UserN
  10. 用openssl进行SSL编程