1、概述

1.1、目标

实时数据同步主要实现从源数据库到目标数据库的实时数据同步。源数据主要支持mysql数据库,目标数据包括mysql数据库和hbase数据库。

下面是实时数据同步的数据流转图,mysql的增量订阅数据经过canal和kafka,数据最终实时流入hbase或mysql。

1.2、整体设计架构

实时数据同步基于数据库变更订阅中心,实现从源数据到目标数据的实时数据同步应用。

整体设计架构如下图所示:

1.3、概要设计

实时数据同步分两部分:生产端(productor)和消费端(consumer)

1.3.1生产端(productor

集成canal的consumer和kafka的productor。主要完成如下任务

1、监听canal producer 发送过来的

2、将数据进行格式化,

3、调用kafka producer,发送数据。

1.3.2消费端(consumer

主要集成kafka consumer和HBase,主要完成如下任务

1、监听producer发送过来的数据。

2、解析数据

3、数据写入到HBase

1.4、技术组件

1.4.1 canal

1.4.1.1 canal简介

canal 是阿里巴巴mysql数据库binlog的增量订阅&消费组件。

1.4.1.2 canal工作原理

2.1.2、可靠性设计

canal设计规则。两台server。采用主从模式,有zookeeper管理。依据需求,合理配置instance个数。

每个instance最多只允许一个client接收数据。

2.1.3 数据处理规则

Producer不在对数据进行任何形式的过滤。数据是否发送,发送那些数据,需要在canal instance的配置文件中配置。Producer只负责将接收到canal producer发送过来的数据进行解析和格式化。这样虽然会增加占用带宽和存储资源。但可以保证数据在不同的应用中使用。

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,mysql master发送dump协议
  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  • canal解析binary log对象(原始为byte流)
  • 1.4.1.3 canal工作流程

    数据按照instance为单位进行划分,每个server上可以部署多个instance。但是同一个instance在整个集群中仅在一台上处于activity状态。其余均处于standby状态。也就是说instanceA两台server中均有部署,但是在这个集群中,仅有一个server上的instanceA处于activity状态。

    为了保证数据的有序性,每个instance只能被一个client接收。而且数据称队列方式消费消费,有且仅能被消费一次。

  • 1.4.2 Kafka

    1.4.2.1 kafka介绍

    kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。broker依赖zookeeper管理集群和存储一些meta信息

    14.2.2工作流程

  • kafka 在server端将数据分topic进行管理,每个topic按照需求,可以分多个partition。
  • Kafka消息传输同时集成了队列和广播传输两种模式,
  • 针对consumer端进行分group管理,每组会有多个consumer。
  • 每个topic的消息可以被多个group同时消费,每个group的多个consumer正常情况下只能消费一次消息。
  • 2、详细设计

    2.1、生产设计(producer

    由于canal服务端集成生产者,故程序直接调用canal consumer api 。收集canal producer发送的数据。

    在producer对数据的处理比较少,设计时希望数据原汁原味的把数据发送出去。Consumer端可以根据不同的场景需求,对数据进行处理。

    2.1.1、类关系图

  • 类ClusterCanalClient
    1. 主类,程序的启动入口,继承AbstractCanalClient
    2. 接收启动参数。并组织调用其他类完成消费端工作。
  • AbstractCanalClient
  • 调用consumerConfig,设置consumer参数。启动consumer。
  • 周期性获取消费数据,调用数据解析和格式化程序,处理数据。
  • 调用kafka接口,将处理后的数据发送。
  • 按照要求,解析并格式化数据。
  •  ItheimaProductorConfig
  • 读取并初始化配置信息
  • 每台consumer client可以接收两个instance。
  • 每两个instance配置3台client。确保系统稳定性
  • 处理规则

2、发送时的数据格式

{

table:database.tableName, 

binlog_id:””,

event_type:”insert/update/delete”

binlog_id:””,

exe_time:””,

cur_time:””,

cols:[{col:columnName,Val:value,type:columnType},{…},...]}

例如: 同步jeehe_goods_info表中的数据如下,将得到如下消息

{table:yzbmp.jehhe_goods_info, binlog”:”23455234234”,

“event_type”:”INSERT/UPDATE/DELETE”,

“exe_time”:””,

cur_time”:””,

 “cols[

{col:order_id,val:20014587,type:double”,”update”:”true/flase”,},

{col:“user_id,val:123456,type:varchar}

]}

其中

binlog_id 记录获取的binlog ID,用于核对数据,

event_type 当前数据操作类型。插入/修改/删除

exe_time:binlog生成时间

cur_time:canal获取binlog时间

table标签值为当前行所在的表名(数据库.表名)

cols:将列作为数组传输。

col:列名称

val:当前列的值

type:当前列的数据类型,为当前数据库规定的类型,比如mysql的varchar。

* 1、log日志中记录下当前批次,数据获取获取时间和当前处理时间,用于统计数据延迟和数据处理情况。

2、记录下数据binlog信息,并在consumer端同样记录,用于核实数据丢失情况。

目前,consumer在解析数据时,首先查找table标签,发现table标签后,再做进一步解析,如果没有发现table标签,丢弃该条消息。

2.2、消费端设计consumer

此消费端主要是从kafka中获取数据,将该数据存入到hbase中

2.2.1、关系图

  • 类 ClusterKafkaClient
  1. 消费端启动类。调用kafka consumer启动程序
  • 类KafkaConsumerController
  1. 消费端启动类,启动时负责初始化数据。
  2. 调用kafkaconsumer消费端,周期性(暂定30秒)接收producer发送的数据。
  3. 调用格式化,格式化数据。
  4. 调用Hbase控制类,实例化数据
  • 类 HBaseController
  1. 调用HBaseConfig,
  2. 获取rowMeta数据,以row为单位,持久化数据。
  3. 鉴于线上采用HBase v1.0 版本,目前,程序主要使用V1.0 版本的API。
  • 类YZHBaseTransferConfig

该类主要用于记录mysql数据同步至hbase时的对应关系:

  1. 创建对象时,连接一次数据库,并初始化数据。
  2. 依据数据库对应关系表,将数据实例化为两个对象,分别为SynColumn,SyTable。
  3. 同步时主要分为全表同步和部分同步。全表同步时,所有列都会同步至hbase中,部分同步时,只同步指定列
  • 类YZHBaseTransfer
  1. 该类主要负责格式化数据。将kafka接收的消息按照同步对应关系要求。进行格式化,将数据存入rowMeta实例中。
  •   SynColumn SynTable

数据库中数据同步至hbase时的字段对应关系,分别与yzdc_sync_table_mapping和yzdc_sync_column_mapping表相对应。

  • 类 ColumnMetaRowMeta

存入hbase数据库的数据对象

2.2.2、可靠性设计 

在kafka中,由于每台partition需要对应一台 consumer client。目前环境做如下配置:

2台broker(server).

topic的partition设置为3。这样就可以设置3台 consumer client

2.2.3、数据准确性保证

由于kafka消息传播再多个partition之间是无序的。

Hbase写入时必须设置合适的key,在出现故障时可以将数据冗余覆盖

kafka的offset修改为手动提交,保证HBase写入后再关提交offset。

2.3、数据库设计

数据库主要表结构设计

2.3.1数据对应关系表 yzdc_sync_table_mapping

字段名称

字段类型

注释

tb_id

int

主键,自增

orig_tb

VARCHAR

原始表名称

dest_tb

VARCHAR

目标表名称

key_col

VARCHAR

作为rowkey的列

default_family

VARCHAR

默认family

syn_type

ENUM

数据同步类型,分为all和part。all表示全表同步,part表示仅同步表的一部分

is_delete

TINYINT

是否删除

update_time

long

记录当前数据修改时间

2.3.2数据对应关系表 yzdc_sync_column_mapping

字段名称

字段类型

注释

col_id

int

主键、自增

orig_clo

VARCHAR

原始列名称

dest_qualifier

VARCHAR

对应的目标qualifier

dest_family

VARCHAR

对应的目标family

tb_id

int

对应的表主键

update_time

long

修改时间

is_delete

boolean

是否删除

附录一、kafka数据无序性解决方案

为了解决数据负载均衡,通常情况下会为kafka的topic设置多个partition。便于多consumer接收数据,这样便会引起数据时序性问题。

例如: 首先修改A 为 A1,修改结果发送至partition 1.

再次修改A1为 A2,修改结果发送至partition 2.

而客户端再接收数据时,针对不同的partition并没有时序性,很有可能会先接收partition 2 的数据,将结果存为A2,然后又接收到partition 1 数据,将结果再次修改为A1,这样的结果和实际结果不符。

一文带你玩转实时数据同步方案相关推荐

  1. 基于数据库数据增量同步_基于 Flink SQL CDC 的实时数据同步方案

    简介:Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的 ...

  2. cdc工具 postgresql_基于 Flink SQL CDC 的实时数据同步方案

    作者:伍翀 (云邪) 整理:陈政羽(Flink 社区志愿者) Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink P ...

  3. 独家直播双十一全网动态?前黑客“劳改”带你玩转大数据

    独家直播双十一全网动态?前黑客"劳改"带你玩转大数据 发表于2015-11-24 10:26| 4044次阅读| 来源CSDN| 7 条评论| 作者蒲婧 CTO俱乐部CTOCTO讲 ...

  4. pyecharts对于经纬度_一文带你掌握Pyecharts地理数据可视化的方法

    本文主要介绍了Pyecharts地理数据可视化,分享给大家,具体如下: 一.Pyecharts简介和安装 1. 简介 Echarts 是一个由百度开源的数据可视化,凭借着良好的交互性,精巧的图表设计, ...

  5. ubuntun系统mysql数据库同步_Canal 实现 Mysql数据库实时数据同步

    简介 1.1 canal介绍 ​ Canal是一个基于MySQL二进制日志的高性能数据同步系统.Canal广泛用于阿里巴巴集团(包括https://www.taobao.com),以提供可靠的低延迟增 ...

  6. Canal 实现 Mysql数据库实时数据同步

    简介 1.1 canal介绍 Canal是一个基于MySQL二进制日志的高性能数据同步系统.Canal广泛用于阿里巴巴集团(包括https://www.taobao.com),以提供可靠的低延迟增量数 ...

  7. 基于debezium实时数据同步(Oracle为主)

    基于debezium实时数据同步 全部需要下载的内容链接 1.下载zookeeper-3.4.10 2.下载kafka_2.13-2.8.0 3.下载Kafka Connector:建议使用1.6以上 ...

  8. Oracle到MySQL实时数据同步CloudCanal实战

    简述 CloudCanal 2.1.0.x 版本开始支持 Oracle 作为源端的数据迁移同步能力,目前邀请测试中. 本文通过 Oracle 到 MySQL 的数据迁移同步案例简要介绍这个源端的能力. ...

  9. sersync 实现实时数据同步

    sersync 介绍 sersync类似于inotify,同样用于监控,但它克服了inotify的缺点. inotify最大的不足是会产生重复事件,或者同一个目录下多个文件的操作会产生多个事件,例如, ...

最新文章

  1. 区块链共识机制及其迭代
  2. 苹果自动驾驶部门「裁员」又「重组」,AI负责人接掌「泰坦」
  3. Manacher算法 , 实例 详解 . NYOJ 最长回文
  4. 让几个div靠外面容器底部对齐
  5. 路由器 radius认证获取ip_玩转网络工程师·认证篇
  6. 基于Xml 的IOC 容器-开始启动
  7. 深度学习与TensorFlow: VGG论文笔记
  8. 【李宏毅2020 ML/DL】P14 Tips for training DNN | 激活函数、Maxout、正则、剪枝 Dropout
  9. Atitit 自然语言与人工语言的语法构建ast的异同点 目录 1. 语言节点gaishu。。 2 1.1. 节点、函数数量大约200个 2 1.2. 关键词节点 是 有 的 3 1.3. 标识符
  10. OCR文字识别,PDF格式转换
  11. SpringBoot 微信H5支付
  12. java retainall_java 取交集方法retainAll
  13. kong翻译_最全的中国姓氏英文说法,你知道自己的姓氏怎么翻译吗?
  14. 解决多次点击出现蓝色背景
  15. 程序员慵懒的周末:不想出门还想吃肯德基香辣鸡翅?
  16. 加一度分享:快手PK抖音,谁更有优势
  17. 西北农林科技大学考研计算机大纲,2021年西北农林科技大学考研真题大纲参考书目...
  18. vscode 下载慢解决方法
  19. 有利润表模板的BI软件有哪些?
  20. iOS wifi 路由器 连接

热门文章

  1. html div转换表格,Dreamweaver页面布局:表格与Div的相互转换
  2. Win10系统通过VMware安装Centos7,部署KVM安装WIN7虚拟机
  3. 【基础网络】TCP与UDP 的区别
  4. pic单片机c语言读eeprom,PIC单片机的EEPROM读写实例
  5. 以optee的sign_encrypt.py为例讲解argparse命令解析模块
  6. html div 显示动画效果,css3-动画(animation)效果的实现
  7. OpenCV 地形粗糙度计算(基于DEM,C++版本)
  8. 项管(八)——十大管理过程的输入、输出、工具
  9. Centos 6.8安装open***.三种认证方式
  10. 那是什么进程 —— jusched.exe是什么? 它为何运行?