整理CDC捕获消息后发送到kafka各类消息格式
一、mysql-ogg 资料
delete
{"table":"TABLE","op_type":"D","op_ts":"TIME","current_ts":"TIME","pos":"00000000010000045759","before":{"c1": 1,"c2": "C2","c3": 0.56,"c4": 0.9526831,"c5": "2021-02-03","c6": 100,"c7": 100.125,"c8": "textcontent","c9": 0.12345,"c10": "2020-02-01","c11": 11,"c12": 12.12,"c13": "2021-02-13","c14": 14.14,"c15": "lastcolumn"}
}
insert
{"table":"TABLE","op_type":"I","op_ts":"TIME","current_ts":"TIME","pos":"00000000010000071669","after":{"c1": 1,"c2": "C2","c3": 0.56,"c4": 0.9526831,"c5": "2021-02-03","c6": 100,"c7": 100.125,"c8": "textcontent","c9": 0.12345,"c10": "2020-02-01","c11": 11,"c12": 12.12,"c13": "2021-02-13","c14": 14.14,"c15": "lastcolumn"}
}
update
{"table":"TABLE","op_type":"U","op_ts":"TIME","current_ts":"TIME","pos":"00000000010000071669","primary_keys":["c1"],"before":{"c1": 1,"c2": "C2","c3": 0.56,"c4": 0.9526831,"c5": "2021-02-03","c6": 100,"c7": 100.125,"c8": "textcontent","c9": 0.12345,"c10": "2020-02-01","c11": 11,"c12": 12.12,"c13": "2021-02-13","c14": 14.14,"c15": "lastcolumn"},"after":{"c1": 1,"c2": "C2","c3": 0.56,"c4": 0.9526831,"c5": "2021-02-03","c6": 100,"c7": 100.125,"c8": "textcontent","c9": 0.12345,"c10": "2020-02-01","c11": 11,"c12": 12.12,"c13": "2021-02-13","c14": 14.14,"c15": "lastcolumn111111111111"}
}
二、mysql-canal 资料
INSERT INTO order
(order_id, amount) VALUES ('10086', 999);
{"data": [{"id": "1","order_id": "10086","amount": "999.0","create_time": "2020-03-02 05:12:49"}],"database": "test","es": 1583143969000,"id": 3,"isDdl": false,"mysqlType": {"id": "BIGINT","order_id": "VARCHAR(64)","amount": "DECIMAL(10,2)","create_time": "DATETIME"},"old": null,"pkNames": ["id"],"sql": "","sqlType": {"id": -5,"order_id": 12,"amount": 3,"create_time": 93},"table": "order","ts": 1583143969460,"type": "INSERT"
}
UPDATE order
SET amount = 10087 WHERE order_id = '10086';
{"data": [{"id": "1","order_id": "10086","amount": "10087.0","create_time": "2020-03-02 05:12:49"}],"database": "test","es": 1583143974000,"id": 4,"isDdl": false,"mysqlType": {"id": "BIGINT","order_id": "VARCHAR(64)","amount": "DECIMAL(10,2)","create_time": "DATETIME"},"old": [{"amount": "999.0"}],"pkNames": ["id"],"sql": "","sqlType": {"id": -5,"order_id": 12,"amount": 3,"create_time": 93},"table": "order","ts": 1583143974870,"type": "UPDATE"
}
DELETE FROM order
WHERE order_id = '10086';
{"data": [{"id": "1","order_id": "10086","amount": "10087.0","create_time": "2020-03-02 05:12:49"}],"database": "test","es": 1583143980000,"id": 5,"isDdl": false,"mysqlType": {"id": "BIGINT","order_id": "VARCHAR(64)","amount": "DECIMAL(10,2)","create_time": "DATETIME"},"old": null,"pkNames": ["id"],"sql": "","sqlType": {"id": -5,"order_id": 12,"amount": 3,"create_time": 93},"table": "order","ts": 1583143981091,"type": "DELETE"
}
三、Oracle-ogg 资料
delete
{"table":"TABLE","op_type":"D","op_ts":"TIME","current_ts":"TIME","pos":"00000000010000045759","before": {"c1": 1,"c2": "C2","c3": 0.56,"c4": 0.9526831,"c5": "2021-02-03","c6": 100,"c7": 100.125,"c8": "textcontent","c9": 0.12345,"c10": "2020-02-01","c11": 11,"c12": 12.12,"c13": "2021-02-13","c14": 14.14,"c15": "lastcolumn"}
}
insert
{"table":"TABLE","op_type":"I","op_ts":"TIME","current_ts":"TIME","pos":"00000000010000071669","after":{"c1": 1,"c2": "C2","c3": 0.56,"c4": 0.9526831,"c5": "2021-02-03","c6": 100,"c7": 100.125,"c8": "textcontent","c9": 0.12345,"c10": "2020-02-01","c11": 11,"c12": 12.12,"c13": "2021-02-13","c14": 14.14,"c15": "lastcolumn"}
}
update
{"table":"TABLE","op_type":"U","op_ts":"TIME","current_ts":"TIME","pos":"00000000010000071669","before":{"c1": 1,"c2": "C2","c3": 0.56,"c4": 0.9526831,"c5": "2021-02-03","c6": 100,"c7": 100.125,"c8": "textcontent","c9": 0.12345,"c10": "2020-02-01","c11": 11,"c12": 12.12,"c13": "2021-02-13","c14": 14.14,"c15": "lastcolumn"},"after":{"c1": 1,"c2": "C2","c3": 0.56,"c4": 0.9526831,"c5": "2021-02-03","c6": 100,"c7": 100.125,"c8": "textcontent","c9": 0.12345,"c10": "2020-02-01","c11": 11,"c12": 12.12,"c13": "2021-02-13","c14": 14.14,"c15": "lastcolumn111111111111"}
}
四、Oracle-Kafka connect资料1 资料2
{"schema": {"type": "struct","fields": [{"type": "int64","optional": false,"field": "SCN"}, {"type": "string","optional": false,"field": "SEG_OWNER"}, {"type": "string","optional": false,"field": "TABLE_NAME"}, {"type": "int64","optional": false,"name": "org.apache.kafka.connect.data.Timestamp","version": 1,"field": "TIMESTAMP"}, {"type": "string","optional": false,"field": "SQL_REDO"}, {"type": "string","optional": false,"field": "OPERATION"}, {"type": "struct","fields": [{"type": "double","optional": false,"field": "ID"}, {"type": "string","optional": true,"field": "NAME"}, {"type": "int64","optional": true,"name": "org.apache.kafka.connect.data.Timestamp","version": 1,"field": "CREATETIME"}],"optional": true,"name": "value","field": "data"}, {"type": "struct","fields": [{"type": "double","optional": false,"field": "ID"}, {"type": "string","optional": true,"field": "NAME"}, {"type": "int64","optional": true,"name": "org.apache.kafka.connect.data.Timestamp","version": 1,"field": "CREATETIME"}],"optional": true,"name": "value","field": "before"}],"optional": false,"name": "zztar.whtest.t1.row"},"payload": {"SCN": 2847668,"SEG_OWNER": "WHTEST","TABLE_NAME": "T1","TIMESTAMP": 1600829206000,"SQL_REDO": "insert into \"WHTEST\".\"T1\"(\"ID\",\"NAME\",\"CREATETIME\") values (557005146,'533888119 ',TIMESTAMP ' 2020-09-23 10:46:46')","OPERATION": "INSERT","data": {"ID": 5.57005146E8,"NAME": "533888119","CREATETIME": 1600829206000},"before": null}
}
{"schema": {"type": "struct","fields": [{"type": "int64","optional": false,"field": "SCN"}, {"type": "string","optional": false,"field": "SEG_OWNER"}, {"type": "string","optional": false,"field": "TABLE_NAME"}, {"type": "int64","optional": false,"name": "org.apache.kafka.connect.data.Timestamp","version": 1,"field": "TIMESTAMP"}, {"type": "string","optional": false,"field": "SQL_REDO"}, {"type": "string","optional": false,"field": "OPERATION"}, {"type": "struct","fields": [{"type": "double","optional": false,"field": "ID"}, {"type": "string","optional": true,"field": "NAME"}, {"type": "int64","optional": true,"name": "org.apache.kafka.connect.data.Timestamp","version": 1,"field": "CREATETIME"}],"optional": true,"name": "value","field": "data"}, {"type": "struct","fields": [{"type": "double","optional": false,"field": "ID"}, {"type": "string","optional": true,"field": "NAME"}, {"type": "int64","optional": true,"name": "org.apache.kafka.connect.data.Timestamp","version": 1,"field": "CREATETIME"}],"optional": true,"name": "value","field": "before"}],"optional": false,"name": "zztar.whtest.t1.row"},"payload": {"SCN": 2847668,"SEG_OWNER": "WHTEST","TABLE_NAME": "T1","TIMESTAMP": 1600829206000,"SQL_REDO": "update \"WHTEST\".\"T1\"(\"ID\",\"NAME\",\"CREATETIME\") values (123,'123',TIMESTAMP '2020-09-23 10:46:46')","OPERATION": "UPADTE","data": {"ID": 123,"NAME": "123","CREATETIME": 1600829206000},"before": {"ID": 5.57005146E8,"NAME": "533888119","CREATETIME": 1600829206000}}
}
五、postgresql 资料
test=# INSERT INTO test_table (id, code) VALUES('id1', 'code1'); INSERT 0 1
{"change": [{"kind": "insert","schema": "mdmv2","table": "test_table","columnnames": ["id", "code"],"columntypes": ["character(10)", "character(10)"],"columnvalues": ["id1", "code1"]}]
}
test=# update test_table set code='code2' where id='id1'; UPDATE 1
{"change": [{"kind": "update","schema": "mdmv2","table": "test_table","columnnames": ["id", "code"],"columntypes": ["character(10)", "character(10)"],"columnvalues": ["id1", "code2"],"oldkeys": {"keynames": ["id"],"keytypes": ["character(10)"],"keyvalues": ["id1"]}}]
}
test=# delete from test_table where id='id1'; DELETE 1
{"change": [{"kind": "delete","schema": "mdmv2","table": "test_table","oldkeys": {"keynames": ["id"],"keytypes": ["character(10)"],"keyvalues": ["id1"]}}]
}
ALTER TABLE test_table REPLICA IDENTITY USING INDEX test_table_pkey;
{"change": [{"kind": "update","schema": "mdmv2","table": "test_table","columnnames": ["id", "code"],"columntypes": ["character(10)", "character(10)"],"columnvalues": ["id1", "code2"],"oldkeys": {"keynames": ["id"],"keytypes": ["character(10)"],"keyvalues": ["id1 "]}}]
}
六、sqlserver 资料
insert into [test_table] (UserName,IsOnline,LastLogin) values('测试', 1, DATEDIFF(s, '19700101',GETDATE()))
{"Id": 5,"UserName": "测试","IsOnline": true,"LastLogin": 1540635666,"_cdc_metadata": {"sys_change_operation": "I","sys_change_creation_version": "13","sys_change_version": "13","databaseName": "tgstat_ddztest","schemaName": "dbo","tableName": "test_online"}
}
update test_online Set UserName='tom' where UserName='测试'
{"Id": 5,"UserName": "tom","IsOnline": true,"LastLogin": 1540635666,"_cdc_metadata": {"sys_change_operation": "U","sys_change_creation_version": "0","sys_change_version": "14","databaseName": "tgstat_ddztest","schemaName": "dbo","tableName": "test_online"}
}
Delete test_online Where UserName='tom'
null
整理CDC捕获消息后发送到kafka各类消息格式相关推荐
- python发送微信消息_python 发送QQ或者微信消息
该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 废话少说,先上代码: # coding = utf-8 import win32gui import win32api import win32con i ...
- 骑士卡:基于Kafka搭建消息中心,上亿消息推送轻松完成
全球购骑士卡是国内领先的会员制特权电商平台,汇聚国内外"吃喝玩乐买"超 300 项会员专属优惠特权.全球购骑士卡基于移动互联生活方式,打通线上.线下消费场景,汇集时下热门.高频的商 ...
- KAFKA分布式消息系统
Kafka[1]是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录.浏览.点击.分享.喜欢)以及系统运行日志(CPU ...
- kafka 消费者消息确认_Kafka整体架构图解
Java识堂,一个高原创,高收藏,有干货的微信公众号,一起成长,一起进步,欢迎关注 1.概述 Apache Kafka最早是由LinkedIn开源出来的分布式消息系统,现在是Apache旗下的一个子项 ...
- Kafka 的消息异常情况~追日
Kafka 的消息异常情况 1.消息丢失情况 消息发送端 Producer (1) acks=0: 表示 Producer 不需要等待任何 broker 确认收到消息的回复, 就可以继续发送下一条消息 ...
- 消息队列MQ 之 Kafka
目录 前言 一.消息队列 MQ 为什么需要消息队列(MQ) 使用消息队列的好处 消息队列的两种模式 二.Kafka 概述 Kafka 简介 Kafka 的特性 三 实验 前言 一.消息队列 MQ MQ ...
- Kafka实现消息生产和消费
文章目录 一.Kafka测试消息生产与消费 二.Java程序进行Kafka收发消息 1.消息生产者 2.消息消费者 一.Kafka测试消息生产与消费 # 首先创建一个主题 [root@192 kafk ...
- 分布式消息队列 NSQ 和 Kafka 对比
谈谈分布式消息队列的一些特性,比较两种比较常用的消息队列--NSQ和Kafka 1 消息队列的作用 解耦,将一个流程加入一层数据接口拆分成两个部分,上游专注通知,下游专注处理 缓冲,应对流量的突然上涨 ...
- 微信公众号消息模板发送
微信公众号消息模板发送 微信公众号消息模板群发功能 开通申请模板消息功能 获取模板消息发送所需参数 AccessToken pojo类 TemplateData pojo类 WxTagsEntity ...
最新文章
- 如何识别 Linux 上的文件分身
- 解决selenium报错--unknown error: DevToolsActivePort file doesn‘t exist
- 运行Python的三种方法
- 计算机联锁仿真软件设计,一种基于LabVIEW的计算机联锁仿真系统的制作方法
- 编程笔试(解析及代码实现):字符串反转(字符串逆序输出)代码实现十多种方法对比(解析思路及其耗费时间)详细攻略
- 「12306奇葩验证码」反例背后的产品观
- 网易云信阙杭宁:通过IM云让开发者共享网易经验
- Excel中将时间格式转化成时间戳格式
- scala一些奇怪的操作符的效果(持续更新中)
- 【Python】Matplotlib绘制极坐标散点图
- python不同数据类型的式子_Python 基础篇:数据类型、数据运算、表达
- htons、inet_addr、
- 《代码的未来》读书笔记:内存管理与GC那点事儿
- ubuntu 搭建 php 环境
- 链接Linux工具(SecureCRT)
- abb机器人伺服电机报闸是什么_ABB机器人伺服电机维修常见4大故障处理
- 北京市三级医院电话预约挂号一览表
- 【GMS认证】关于XTS命令总结
- Java中文加密解密
- php slug,php 中的var