Canal+Camus快速采集MySQL Binlog到数据仓库
点击上方蓝色字体,选择“设为星标”
回复”资源“获取更多资源
大数据技术与架构
点击右侧关注,大数据开发领域最强公众号!
大数据真好玩
点击右侧关注,大数据真好玩!
数据仓库的同步方法
我们的数据仓库长久以来一直使用天级别的离线同步方法:采用Sqoop或DataX按天定时获取各个MySQL表的全量或增量数据,然后载入到Hive里对应的各个表中。这种方法门槛低,容易操作,在数仓建设阶段能够快速启动。但是随着时间的推移,它暴露出了一些缺点:
从MySQL获取数据只能靠select,如果一次select数据量过大,会造成慢查询,甚至影响线上业务;
随着业务量的增长和新业务的加入,数据量会相应增加,离线同步一次的耗时会越来越长;
增量同步方式无法检测到MySQL中被delete掉的记录,如果没有时间戳字段的话,也较难检测到被update的记录。
所以,我们最近致力于按照变动数据获取(Change Data Capture,CDC)的方式改造我们的数仓,分三步走:
首先订阅MySQL库的Binlog,将其存储到临时表中;
然后对需要入库的表一次性制作快照,并将存量数据导入Hive;
最后基于存量数据和Binlog记录的变动进行合并,还原出与业务库相同的数据。
本文要说的就是第一步的实现方案。我们采用阿里的开源组件Canal来接入MySQL Binlog,并投递到Kafka;采用LinkedIn的开源组件Camus获取Kafka中的Binlog,并落地到Hive。限于篇幅,我们不会从源码级别探索Canal和Camus的内部机制,参考官方文档或者自行读源码都非常简单。
Canal的配置
采用最新的1.1.3版本: https://github.com/alibaba/canal/releases/tag/canal-1.1.3 Kafka版本则是1.0.1。
canal.properties配置
只列出关键的配置项如下,其中有些是和Kafka的配置对应的。完整的配置及其含义还请参见Canal项目的GitHub Wiki。
# 如果要做高可用的话,把ZooKeeper配置好
canal.zkServers = 10.10.99.130:2181,10.10.99.132:2181,10.10.99.133:2181,10.10.99.124:2181,10.10.99.125:2181
# Binlog格式,MySQL的binlog-format也应该为ROW
canal.instance.binlog.format = ROW
# 是否过滤掉DCL、DML、DDL语句
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
# 允许自动检测Canal监听实例的变更,60秒一次
canal.auto.scan = true
canal.auto.scan.interval = 60
# 默认值tcp,改为投递到Kafka
canal.serverMode = kafka
# Kafka bootstrap.servers,可以不用写上全部的brokers
canal.mq.servers = 10.10.99.132:9092,10.10.99.133:9092,10.10.99.134:9092,10.10.99.135:9092
# 投递失败的重试次数,默认0,改成2
canal.mq.retries = 2
# Kafka batch.size,即producer一个微批次的大小,默认16K,这里加倍
canal.mq.batchSize = 32768
# Kafka max.request.size,即一个请求的最大大小,默认1M,这里也加倍
canal.mq.maxRequestSize = 2097152
# Kafka linger.ms,即sender线程在检查微批次是否就绪时的超时,默认0ms,改为150ms
# 满足batch.size和linger.ms其中之一,就会发送消息
canal.mq.lingerMs = 150
# Kafka buffer.memory,缓存大小,默认32M
canal.mq.bufferMemory = 33554432
# 获取Binlog数据的批次大小,默认50
canal.mq.canalBatchSize = 50
# 获取Binlog数据的超时时间,默认200ms
canal.mq.canalGetTimeout = 200
# 是否将Binlog转为JSON格式。如果为false,就是原生Protobuf格式
canal.mq.flatMessage = true
# 压缩类型,官方文档没有描述
canal.mq.compressionType = none
# Kafka acks,默认all,表示分区leader会等所有follower同步完才给producer发送ack
# 0表示不等待ack,1表示leader写入完毕之后直接ack
canal.mq.acks = all
# Kafka消息投递是否使用事务
# 主要针对flatMessage的异步发送和动态多topic消息投递进行事务控制来保持和Canal Binlog位置的一致性
canal.mq.transaction = false
instance.properties配置
我们最终采用多topic单partition的方式把Binlog存入Kafka,也就是每张表对应一个topic,每个topic只有一个partition。这样可以保证表级别Binlog的有序性,并且实测热点表对应topic的压力也不大。
# 需要接入Binlog的表名,支持正则,但我们手动指定每张表
canal.instance.filter.regex=mall\\.address,mall\\.base_category,mall\\.orders,mall\\.order_product,mall\\.product,mall\\.mall_category,mall\\.mall_comment
# 不需要接入Binlog表的黑名单
canal.instance.filter.black.regex=# 单topic模式下的表名
# canal.mq.topic=example
# 多topic模式下的topic名与表名的对应关系,同样支持正则
canal.mq.dynamicTopic=bl_mall_address:mall\\.address,bl_mall_base_category:mall\\.base_category,bl_mall_orders:mall\\.orders,bl_mall_order_product:mall\\.order_product,bl_mall_product:mall\\.product,bl_mall_mall_category:mall\\.mall_category,bl_mall_mall_comment:mall\\.mall_comment
# 单partition模式下的分区号
canal.mq.partition=0
# 多partition模式下的分区hash规则,需要按主键组来
# canal.mq.partitionsNum=3
# canal.mq.partitionHash=test.table:id^name,.*\\..*
通过kafka-topics工具观察自动生成的topic:
Camus的配置
Camus在国内并没有Canal那么有名,但十分好用。它是LinkedIn开源的,基于Hadoop MapReduce的Kafka到HDFS数据管道。它支持Kafka topic的自动发现与offset管理,基于Avro或JSON的数据schema,以及按时间分区的功能。另外它也提供了数据读取和写入的自定义逻辑入口,比较灵活。
Camus在很久之前就作为一个子项目合并到了同为LinkedIn开源的数据交换组件Gobblin中,不再单独维护。本文使用的是Confluent维护的镜像版本,仍然在更新,传送门:https://github.com/confluentinc/camus。另外采用的Hadoop版本是CDH自带的2.6.0。
将Camus源码clone到本地之后,执行mvn clean package编译并打包,就可以准备使用了。
camus.properties配置
这个properties文件的名字可以随便起,每一个properties就代表了一个Camus job(本质上是MR job)的定义。仍然只列出关键的配置项如下。
# Kafka brokers
kafka.brokers=10.10.99.132:9092,10.10.99.133:9092,10.10.99.134:9092,10.10.99.135:9092
# job名称
camus.job.name=binlog-fetch
# Kafka数据落地到HDFS的位置。Camus会按照topic名自动创建子目录
etl.destination.path=/user/hive/warehouse/binlog.db
# HDFS上用来保存当前Camus job执行信息的位置,如offset、错误日志等
etl.execution.base.path=/camus/exec
# HDFS上保存Camus job执行历史的位置
etl.execution.history.path=/camus/exec/history
# 即core-site.xml中的fs.defaultFS参数
fs.default.name=hdfs://mycluster
# Kafka消息解码器,默认有JsonStringMessageDecoder和KafkaAvroMessageDecoder
# Canal的Binlog是JSON格式的。当然我们也可以自定义解码器
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
# 落地到HDFS时的写入器,默认支持Avro、SequenceFile和字符串
# 这里我们采用一个自定义的WriterProvider,代码在后面
# etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.CanalBinlogRecordWriterProvider
# JSON消息中的时间戳字段,用来做分区的
# 注意这里采用Binlog的业务时间,而不是日志时间
camus.message.timestamp.field=es
# 时间戳字段的格式
camus.message.timestamp.format=unix_milliseconds
# 时间分区的类型和格式,默认支持小时、天,也可以自定义时间
etl.partitioner.class=com.linkedin.camus.etl.kafka.partitioner.TimeBasedPartitioner
etl.destination.path.topic.sub.dirformat='pt_hour'=YYYYMMddHH
# 拉取过程中MR job的mapper数
mapred.map.tasks=20
# 按照时间戳字段,一次性拉取多少个小时的数据过后就停止,-1为不限制
kafka.max.pull.hrs=-1
# 时间戳早于多少天的数据会被抛弃而不入库
kafka.max.historical.days=3
# 每个mapper的最长执行分钟数,-1为不限制
kafka.max.pull.minutes.per.task=-1
# Kafka topic白名单和黑名单
kafka.blacklist.topics=__consumer_offsets,binlog_dym_test,binlog_mall_test,test010802,test_kylin_streaming2,user_persona4scheduler,HbaseRequestsPerSecond
kafka.whitelist.topics=
# 设定输出数据的压缩方式,支持deflate、gzip和snappy
mapred.output.compress=false
# etl.output.codec=gzip
# etl.deflate.level=6
# 设定时区,以及一个时间分区的单位
etl.default.timezone=Asia/Shanghai
etl.output.file.time.partition.mins=60
自定义Binlog落地方式
我们想要在数据输出时就符合各表的定义,而不是之后再去费力解析JSON。这可以通过实现Camus提供的RecordWriterProvider接口来自定义。不多说,直接上代码:
public class CanalBinlogRecordWriterProvider implements RecordWriterProvider {static class CanalBinlogRecordWriter extends RecordWriter<IEtlKey, CamusWrapper> {private DataOutputStream outputStream;private String fieldDelimiter;private String rowDelimiter;public CanalBinlogRecordWriter(DataOutputStream outputStream, String fieldDelimiter, String rowDelimiter) {this.outputStream = outputStream;this.fieldDelimiter = fieldDelimiter;this.rowDelimiter = rowDelimiter;}@Overridepublic void write(IEtlKey key, CamusWrapper value) throws IOException, InterruptedException {if (value == null) {return;}String recordStr = (String) value.getRecord();JSONObject record = JSON.parseObject(recordStr, Feature.OrderedField);if (record.getString("isDdl").equals("true")) {return;}JSONArray data = record.getJSONArray("data");for (int i = 0; i < data.size(); i++) {JSONObject obj = data.getJSONObject(i);if (obj != null) {StringBuilder fieldsBuilder = new StringBuilder();fieldsBuilder.append(record.getLong("id"));fieldsBuilder.append(fieldDelimiter);fieldsBuilder.append(record.getLong("es"));fieldsBuilder.append(fieldDelimiter);fieldsBuilder.append(record.getLong("ts"));fieldsBuilder.append(fieldDelimiter);fieldsBuilder.append(record.getString("type"));for (Entry<String, Object> entry : obj.entrySet()) {fieldsBuilder.append(fieldDelimiter);fieldsBuilder.append(entry.getValue());}fieldsBuilder.append(rowDelimiter);outputStream.write(fieldsBuilder.toString().getBytes());}}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {outputStream.close();}}@Overridepublic String getFilenameExtension() {return "";}@Overridepublic RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext context,String fileName,CamusWrapper data,FileOutputCommitter committer) throws IOException, InterruptedException {Configuration conf = context.getConfiguration();String rowDelimiter = conf.get("etl.output.record.delimiter", "\n");Path path = new Path(committer.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context, fileName, getFilenameExtension()));FileSystem fs = path.getFileSystem(conf);FSDataOutputStream outputStream = fs.create(path, false);return new CanalBinlogRecordWriter(outputStream, "\t", rowDelimiter);}
}
这样,我们就只留下了需要关心的数据,并且格式化为制表符分隔、换行符结尾的文本格式,可以直接符合数仓中对Hive表的定义规范了。
Camus job的执行和调度
可以通过hadoop jar命令来执行Camus job,不过项目内直接提供了camus-run工具,写法就很简单了:
bin/camus-run -P conf/binlog-fetch-camus.properties
通过Crontab或者Azkaban调度它都行,不再赘述。目前我们是半小时调度一次,运行良好。查看生成的目录结构。内部分区的格式是“pt_hour=YYYYMMddHH”:
在上述/camus/exec目录下也可以看到Kafka offset的存储。
版权声明:
本文为大数据技术与架构整理,原作者独家授权。未经原作者允许转载追究侵权责任。
编辑|冷眼丶
微信公众号|import_bigdata
欢迎点赞+收藏+转发朋友圈素质三连
文章不错?点个【在看】吧! ????
Canal+Camus快速采集MySQL Binlog到数据仓库相关推荐
- springboot整合canal,监听MySQL binlog日志,实现增量同步
有两个数据库,并不是主从关系,但是需要同步某张表,可以通过binlog日志,进行同步,前提是这两个数据库的要同步的表,表名和字段名需要一致. 当前项目连接的数据库(需要同步的数据库):base_pro ...
- centos7时间同步_基于 Canal 和 Kafka 实现 MySQL 的 Binlog 近实时同步
点击蓝色"架构文摘"关注我哟 加个"星标",每天上午 09:25,干货推送! 作者:Throwable 掘金:https://juejin.im/post ...
- 使用canal 监听mysql binlog获取增量数据
配置mysql sudo vi /etc/my.cnf [mysqld] log-bin=/var/lib/mysql/mysql-bin #开启日志监控 binlog-format=ROW #监控模 ...
- canal解析mysql日志异常_利用Canal解析mysql binlog日志
一.安装包下载(canal.deployer-x.x.x.tar.gz 官方建议使用1.0.22版本) 二.解压文件 tar -zxvf canal.deployer-1.0.22.tar.gz - ...
- Canal Mysql binlog 同步至 Hbase ES
文章目录 一.Canal介绍 工作原理 canal 工作原理 二.下载 三.安装使用 Mysql准备 canal 安装 解压缩 canal-deployer 配置修改 启动 查看server日志 查看 ...
- mysql binlog查看工具_数据同步工具otter(一)谈谈binlog和canal
之前因为懒,没有针对otter做更多的解释和说明,在使用过程中,也发现了一些问题,此次补上一个完整的文档,方便大家使用. Otter是基于cannal开源的,canal又是基于mysql binlog ...
- Canal同步mysql binlog至pulsar
Canal 一.简介 canal [kə'næl],主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 基于日志增量订阅和消费的业务包括 数据库镜像 数据库实时备份 索引构建和实时 ...
- elasticsearch 数据类型_基于 MySQL Binlog 的 Elasticsearch 数据同步实践
来源;马蜂窝 一.背景 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存 ...
- 基于 MySQL Binlog 的 Elasticsearch 数据同步实践
一.为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存储业务数 ...
最新文章
- Linux System Programming --Chapter Nine
- Fuxi ServiceModeJob 多租户(Quota Group) 功能介绍
- C++ Primer 5th笔记(chap 18 大型程序工具)noexcept
- 【C语言进阶深度学习记录】二十五 指针与数组的本质分析二
- Android Training精要(六)如何防止Bitmap对象出现OOM
- jquery粘贴按钮_实现前端点击按钮自动复制剪贴板功能
- 爱普森针式打印机怎么连接计算机,爱普生针式打印机怎么安装 爱普生针式打印机安装步骤详解【图文】...
- 解决sockscap64测试代理可以,实际应用无法联网的问题
- 通达信手机版分时图指标大全_通达信精选指标——挣开眼就买卖版指标详解
- 微信小程序点击事件传递自定义参数的方法和跨页面传递数据
- 【0day安全-软件漏洞分析技术】笔记
- ERROR Error: [copy-webpack-plugin] patterns must be an array
- LeetCode 5855. 找出数组中的第 K 大整数(自定义排序函数)
- 近端梯度法(proximal gradient)
- 秋招详细攻略——从准备到面试
- WHQL认证最新申请流程
- LeetCode——706,设计哈希映射
- Android 9.0 去除锁屏界面及SystemUI无sim卡拨打紧急电话控件显示功能实现
- VR和AR将如何发展下去?哪个更有前景?
- 拥有奇瑞QQ冰淇淋·桃欢喜才懂的幸福