I. Use Cases

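  • 1. Event triggering
    Database triggers can provide some of the event triggering we need, but usually only when combined with scheduled jobs.
  • 2. Data synchronization
    Typical approaches are:
    • Dual writes
    • Change data capture (CDC)
    Change data capture solves both the event-triggering problem and the data-synchronization problem well.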

II. Debezium

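Debezium is an open-source change data capture component from Red Hat. Because it is built on Kafka Connect, it inherits high availability and a ready-to-use REST interface for managing connectors. Debezium's snapshot.mode can be set to initial to load all existing table data into Kafka, or to schema_only to capture only the table structure and then stream incremental changes. Snapshot records and change records have the same format, so they can be processed uniformly.
It currently supports MySQL, MongoDB, PostgreSQL, Oracle, SQL Server, Db2, and other databases.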
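Because snapshot records and streaming records share one envelope, a consumer can apply both through the same code path. The sketch below is illustrative only: it assumes the JSON converter with schemas disabled (covered in the serialization step), a single key column whose name you pass as the hypothetical `key_field`, and an in-memory dict standing in for the target store. Debezium's `op` codes are `r` (snapshot read), `c` (insert), `u` (update) and `d` (delete); a null message value is a tombstone emitted after a delete.

```python
from typing import Any, Dict, Optional


def apply_change_event(replica: Dict[Any, dict], event: Optional[dict], key_field: str) -> None:
    """Apply one Debezium change event (schemas disabled) to an in-memory replica.

    Snapshot reads ("r") and streamed inserts/updates ("c"/"u") all carry the new
    row in "after", so they share one code path. key_field is a hypothetical
    column name used as the replica key.
    """
    if event is None:
        # Tombstone (null value) that follows a delete; nothing left to apply.
        return
    op = event["op"]
    if op in ("r", "c", "u"):
        before, after = event.get("before"), event["after"]
        # If an update changed the key column itself, drop the row stored under the old key.
        if before is not None and before[key_field] != after[key_field]:
            replica.pop(before[key_field], None)
        replica[after[key_field]] = after
    elif op == "d":
        replica.pop(event["before"][key_field], None)
    else:
        raise ValueError(f"unexpected op: {op!r}")
```

The key-change branch matters for events like the update shown in the serialization step, where CUST_TAX_CODE changes from "1" to "11" inside a single `u` event.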

III. Debezium-Oracle

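With the database running in archive log mode, Debezium can parse Oracle's logs through either LogMiner or XStream; LogMiner is the default.
Official site: https://debezium.io/
docker-compose startup reference: https://github.com/debezium/debezium-examples/tree/master/tutorial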

  1. Download Oracle Instant Client
    Link: https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html
  2. docker-compose.yaml
    Customize docker-compose.yaml based on the official docker-compose-oracle.yaml:
version: '3.7'
services:
  connect:
    image: debezium/connect-with-oracle-jdbc:1.6
    build:
      context: debezium-with-oracle-jdbc
      args:
        DEBEZIUM_VERSION: 1.6
    ports:
      - 8083:8083
      - 5005:5005
    environment:
      - BOOTSTRAP_SERVERS=192.168.51.43:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=debezium_connect_configs
      - OFFSET_STORAGE_TOPIC=debezium_connect_offsets
      - STATUS_STORAGE_TOPIC=debezium_connect_statuses
      - LD_LIBRARY_PATH=/instant_client
    volumes:
      - "/opt/dockerfile/debezium/config:/kafka/config"
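  3. In the same directory as docker-compose.yaml, create a folder named debezium-with-oracle-jdbc, and inside it a folder named oracle_instantclient.

  4. Extract the downloaded Oracle Instant Client into debezium-with-oracle-jdbc/oracle_instantclient, so that ojdbc8.jar and xstreams.jar sit directly in that folder (the Dockerfile below copies them from there).

  5. Create a Dockerfile in debezium-with-oracle-jdbc: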

FROM debezium/connect:1.6
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV INSTANT_CLIENT_DIR=/instant_client/

USER root
RUN yum -y install libaio && yum clean all

USER kafka
# Deploy Oracle client and drivers
COPY oracle_instantclient/* $INSTANT_CLIENT_DIR
COPY oracle_instantclient/xstreams.jar /kafka/libs
COPY oracle_instantclient/ojdbc8.jar /kafka/libs

If Debezium cannot create the Kafka topics automatically, check the Kafka configuration or create the topics manually with the following command:

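./kafka-topics.sh --create --topic TOPIC_NAME --replication-factor 1 --partitions 1 --zookeeper ZK_HOST:ZK_PORT --config cleanup.policy=compact

On Kafka 2.2 and later, --bootstrap-server BROKER_HOST:BROKER_PORT can be used instead of --zookeeper, which was removed in Kafka 3.0.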
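The same command has to be run once for each of the three internal topics named in the docker-compose environment. The sketch below only builds and prints the commands; it assumes Kafka 2.2 or later (so `--bootstrap-server` is available) and reuses the broker address from the compose file. Kafka Connect requires the config topic to have exactly one partition, so the sketch pins that topic to 1 regardless of the `partitions` argument.

```python
import shlex
from typing import List

# Internal topic names taken from the docker-compose environment above.
INTERNAL_TOPICS = {
    "CONFIG_STORAGE_TOPIC": "debezium_connect_configs",
    "OFFSET_STORAGE_TOPIC": "debezium_connect_offsets",
    "STATUS_STORAGE_TOPIC": "debezium_connect_statuses",
}


def create_topic_commands(bootstrap: str, replication_factor: int = 1, partitions: int = 1) -> List[str]:
    """Build one kafka-topics.sh --create command per Kafka Connect internal topic."""
    commands = []
    for setting, topic in INTERNAL_TOPICS.items():
        # Kafka Connect requires the config topic to have exactly one partition.
        topic_partitions = 1 if setting == "CONFIG_STORAGE_TOPIC" else partitions
        args = [
            "./kafka-topics.sh", "--create", "--topic", topic,
            "--replication-factor", str(replication_factor),
            "--partitions", str(topic_partitions),
            "--bootstrap-server", bootstrap,
            # The internal topics should be compacted so only the latest value per key is kept.
            "--config", "cleanup.policy=compact",
        ]
        commands.append(shlex.join(args))
    return commands


for command in create_topic_commands("192.168.51.43:9092"):
    print(command)
```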
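  6. Create a dedicated Debezium user
    The Oracle version here is 11g, and the user-creation and grant statements in the official documentation only work on 12c and later. After some trial and error, the following privileges work on 11g (despite the user name, they include a few write privileges such as CREATE TABLE):
create user debezium_readuser identified by 123456;
grant create session to debezium_readuser;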
grant flashback any table to debezium_readuser;
grant select any table to debezium_readuser;
grant select_catalog_role to debezium_readuser;
grant execute_catalog_role to debezium_readuser;
grant select any transaction to debezium_readuser;
grant create table to debezium_readuser;
grant lock any table to debezium_readuser;
grant alter any table to debezium_readuser;
grant create sequence to debezium_readuser;
grant resource to debezium_readuser;
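  7. Serialization
    By default, Debezium serializes record keys and values as JSON documents using the JSON converter that ships with Kafka Connect. The JSON converter also embeds the message schema (table structure, field names and field types) in every record. Most consumers don't need this information, and it makes each record very verbose and hard to parse: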
{"schema": {"type": "struct","fields": [{"type": "struct","fields": [{"type": "string","optional": true,"field": "CUST_TAX_CODE"},{"type": "string","optional": true,"field": "CUST_NAME"},{"type": "struct","fields": [{"type": "int32","optional": false,"field": "scale"},{"type": "bytes","optional": false,"field": "value"}],"optional": true,"name": "io.debezium.data.VariableScaleDecimal","version": 1,"doc": "Variable scaled decimal","field": "BILLING_MACHINE_NO"},{"type": "int64","optional": true,"name": "io.debezium.time.Timestamp","version": 1,"field": "CHARGE_END_DATE"},{"type": "string","optional": true,"field": "TAX_AUTHORITY_NAME"},{"type": "string","optional": true,"field": "DEPARTMENT_NAME"},{"type": "string","optional": true,"field": "ADDR"},{"type": "string","optional": true,"field": "CONTACT"},{"type": "string","optional": true,"field": "MOBILE"},{"type": "string","optional": true,"field": "TEL"},{"type": "struct","fields": [{"type": "int32","optional": false,"field": "scale"},{"type": "bytes","optional": false,"field": "value"}],"optional": true,"name": "io.debezium.data.VariableScaleDecimal","version": 1,"doc": "Variable scaled decimal","field": "IDX"},{"type": "int64","optional": true,"name": "io.debezium.time.Timestamp","version": 1,"field": "INVOICE_DATE"},{"type": "int32","optional": true,"field": "AMMONT"}],"optional": true,"name": "CRM.CRM.TEMP.Value","field": "before"},{"type": "struct","fields": [{"type": "string","optional": true,"field": "CUST_TAX_CODE"},{"type": "string","optional": true,"field": "CUST_NAME"},{"type": "struct","fields": [{"type": "int32","optional": false,"field": "scale"},{"type": "bytes","optional": false,"field": "value"}],"optional": true,"name": "io.debezium.data.VariableScaleDecimal","version": 1,"doc": "Variable scaled decimal","field": "BILLING_MACHINE_NO"},{"type": "int64","optional": true,"name": "io.debezium.time.Timestamp","version": 1,"field": "CHARGE_END_DATE"},{"type": "string","optional": true,"field": 
"TAX_AUTHORITY_NAME"},{"type": "string","optional": true,"field": "DEPARTMENT_NAME"},{"type": "string","optional": true,"field": "ADDR"},{"type": "string","optional": true,"field": "CONTACT"},{"type": "string","optional": true,"field": "MOBILE"},{"type": "string","optional": true,"field": "TEL"},{"type": "struct","fields": [{"type": "int32","optional": false,"field": "scale"},{"type": "bytes","optional": false,"field": "value"}],"optional": true,"name": "io.debezium.data.VariableScaleDecimal","version": 1,"doc": "Variable scaled decimal","field": "IDX"},{"type": "int64","optional": true,"name": "io.debezium.time.Timestamp","version": 1,"field": "INVOICE_DATE"},{"type": "int32","optional": true,"field": "AMMONT"}],"optional": true,"name": "CRM.CRM.TEMP.Value","field": "after"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "version"},{"type": "string","optional": false,"field": "connector"},{"type": "string","optional": false,"field": "name"},{"type": "int64","optional": false,"field": "ts_ms"},{"type": "string","optional": true,"name": "io.debezium.data.Enum","version": 1,"parameters": {"allowed": "true,last,false"},"default": "false","field": "snapshot"},{"type": "string","optional": false,"field": "db"},{"type": "string","optional": true,"field": "sequence"},{"type": "string","optional": false,"field": "schema"},{"type": "string","optional": false,"field": "table"},{"type": "string","optional": true,"field": "txId"},{"type": "string","optional": true,"field": "scn"},{"type": "string","optional": true,"field": "commit_scn"},{"type": "string","optional": true,"field": "lcr_position"}],"optional": false,"name": "io.debezium.connector.oracle.Source","field": "source"},{"type": "string","optional": false,"field": "op"},{"type": "int64","optional": true,"field": "ts_ms"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "id"},{"type": "int64","optional": false,"field": "total_order"},{"type": "int64","optional": 
false,"field": "data_collection_order"}],"optional": true,"field": "transaction"}],"optional": false,"name": "CRM.CRM.TEMP.Envelope"},"payload": {"before": {"CUST_TAX_CODE": "1","CUST_NAME": "1111safdadsa","BILLING_MACHINE_NO": null,"CHARGE_END_DATE": null,"TAX_AUTHORITY_NAME": null,"DEPARTMENT_NAME": null,"ADDR": null,"CONTACT": null,"MOBILE": null,"TEL": null,"IDX": null,"INVOICE_DATE": null,"AMMONT": null},"after": {"CUST_TAX_CODE": "11","CUST_NAME": "1111safdadsa","BILLING_MACHINE_NO": null,"CHARGE_END_DATE": null,"TAX_AUTHORITY_NAME": null,"DEPARTMENT_NAME": null,"ADDR": null,"CONTACT": null,"MOBILE": null,"TEL": null,"IDX": null,"INVOICE_DATE": null,"AMMONT": null},"source": {"version": "1.6.1.Final","connector": "oracle","name": "CRM","ts_ms": 1630709790000,"snapshot": "false","db": "CRM","sequence": null,"schema": "CRM","table": "TEMP","txId": "05000d00b12d0200","scn": "94815695","commit_scn": "94827647","lcr_position": null},"op": "u","ts_ms": 1630681011434,"transaction": null}
}

To drop the schema, set the following in the Kafka Connect worker configuration file connect-distributed.properties used by Debezium:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

The complete connect-distributed.properties file:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.replication.factor=1
#offset.storage.partitions=25
offset.flush.interval.ms=60000
plugin.path=/kafka/connect
task.shutdown.graceful.timeout.ms=10000
offset.flush.timeout.ms=5000
# The internal.* converter settings are deprecated since Kafka 2.0
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

Official serialization documentation: https://debezium.io/documentation/reference/1.6/configuration/avro.html
After the change, an update event looks like this:

{"before": {"CUST_TAX_CODE": "112","CUST_NAME": "1111safdadsa","BILLING_MACHINE_NO": null,"CHARGE_END_DATE": null,"TAX_AUTHORITY_NAME": null,"DEPARTMENT_NAME": null,"ADDR": null,"CONTACT": null,"MOBILE": null,"TEL": null,"IDX": null,"INVOICE_DATE": null,"AMMONT": 1},"after": {"CUST_TAX_CODE": "1123","CUST_NAME": "1111safdadsa","BILLING_MACHINE_NO": null,"CHARGE_END_DATE": null,"TAX_AUTHORITY_NAME": null,"DEPARTMENT_NAME": null,"ADDR": null,"CONTACT": null,"MOBILE": null,"TEL": null,"IDX": null,"INVOICE_DATE": null,"AMMONT": 1},"source": {"version": "1.6.1.Final","connector": "oracle","name": "CRM","ts_ms": 1630710669000,"snapshot": "false","db": "CRM","sequence": null,"schema": "CRM","table": "TEMP","txId": "060006005cf70100","scn": "94909708","commit_scn": "94909733","lcr_position": null},"op": "u","ts_ms": 1630681870103,"transaction": null
}
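Topics written before the change may still hold schema-wrapped messages, so a consumer can accept both shapes during the transition. A minimal sketch, assuming the raw message value (bytes or str) has already been read from Kafka by whatever client you use: it unwraps `{"schema": ..., "payload": ...}` when present and otherwise returns the document unchanged.

```python
import json
from typing import Optional, Union


def extract_payload(message_value: Optional[Union[bytes, str]]) -> Optional[dict]:
    """Return the Debezium change event carried by a JsonConverter message value.

    Handles both value.converter.schemas.enable=true ({"schema": ..., "payload": ...})
    and =false (the event itself). A None value is a tombstone and yields None.
    """
    if message_value is None:
        return None
    doc = json.loads(message_value)
    # With schemas enabled the event is wrapped in exactly two keys: schema and payload.
    if isinstance(doc, dict) and set(doc) == {"schema", "payload"}:
        return doc["payload"]
    return doc
```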

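Kafka topic names follow the format <database.server.name>.<schema>.<table>; for example, changes to table TEMP in schema CRM with database.server.name=CRM go to the topic CRM.CRM.TEMP.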
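Debezium names each table's data topic <database.server.name>.<schema>.<table>, so a consumer subscribed to several tables can recover the source schema and table from the topic name. A sketch, assuming the logical server name may contain dots but schema and table names do not; if that does not hold for your database, an explicit topic-to-table map is safer.

```python
from typing import NamedTuple


class TableId(NamedTuple):
    server: str  # database.server.name
    schema: str
    table: str


def parse_topic(topic: str) -> TableId:
    """Split a Debezium data topic name of the form <server>.<schema>.<table>.

    rsplit keeps any dots in the server name. Names with fewer than three parts
    (such as the schema-change topic, which is just <server>) raise ValueError.
    """
    server, schema, table = topic.rsplit(".", 2)
    return TableId(server, schema, table)
```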
How the debezium-oracle connector maps numeric types:
https://debezium.io/documentation/reference/1.6/connectors/oracle.html#oracle-numeric-types
How to decode the mapped numeric types:
https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
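Columns such as BILLING_MACHINE_NO and IDX in the schema above are emitted as io.debezium.data.VariableScaleDecimal: a struct with an integer `scale` and a `value` holding the unscaled number as big-endian two's-complement bytes, which the JSON converter writes as a Base64 string. A decoding sketch under those assumptions (the function name is hypothetical). Alternatively, the connector option decimal.handling.mode (precise, double or string) changes how these columns are emitted in the first place; see the numeric-types page above.

```python
import base64
from decimal import Decimal
from typing import Optional


def decode_variable_scale_decimal(field: Optional[dict]) -> Optional[Decimal]:
    """Decode an io.debezium.data.VariableScaleDecimal struct as written by the JSON converter.

    `field` looks like {"scale": 2, "value": "MDk="}: `value` is the Base64 text of the
    unscaled integer in big-endian two's-complement form, and `scale` is the number
    of digits after the decimal point.
    """
    if field is None:  # nullable column with no value
        return None
    raw = base64.b64decode(field["value"])
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # value = unscaled * 10^(-scale), computed exactly with Decimal
    return Decimal(unscaled).scaleb(-field["scale"])


print(decode_variable_scale_decimal({"scale": 2, "value": "MDk="}))  # 123.45
```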

REST API

  • Create a connector (configuration options are documented at https://debezium.io/documentation/reference/1.6/connectors/oracle.html)

POST http://192.168.51.72:8083/connectors/
{
  "name": "test",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "CRM",
    "database.hostname": "db_host",
    "database.port": "db_port",
    "database.user": "db_user",
    "database.password": "db_password",
    "database.dbname": "db_name",
    "database.history.kafka.bootstrap.servers": "kafka_ip:kafka_port",
    "database.history.kafka.topic": "schema-changes.inventory",
    "schema.include.list": "CRM",
    "table.include.list": "CRM.TABLE1",
    "snapshot.mode": "schema_only",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false
  }
}
  • List all connectors
GET http://192.168.51.72:8083/connectors/
  • Delete a connector
DELETE http://192.168.51.72:8083/connectors/test
  • Query a connector's status
GET http://192.168.51.72:8083/connectors/test/status
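The same calls can be scripted with only the Python standard library. A sketch, not an official client: the helper names are hypothetical, the base URL is the worker address used above, and `urlopen` raises `urllib.error.HTTPError` on 4xx/5xx responses (for example, creating a connector whose name already exists).

```python
import json
import urllib.request
from typing import Any, Optional

# Kafka Connect REST endpoint used in the examples above; adjust to your worker.
CONNECT_URL = "http://192.168.51.72:8083"


def build_request(method: str, path: str, body: Optional[dict] = None,
                  base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build a JSON request against the Kafka Connect REST API."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    return urllib.request.Request(
        base_url + path,
        data=data,
        method=method,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )


def call(method: str, path: str, body: Optional[dict] = None, base_url: str = CONNECT_URL) -> Any:
    """Send the request and decode the JSON response (DELETE returns an empty body)."""
    with urllib.request.urlopen(build_request(method, path, body, base_url), timeout=10) as resp:
        raw = resp.read()
    return json.loads(raw) if raw else None


def create_connector(name: str, config: dict) -> Any:
    return call("POST", "/connectors", {"name": name, "config": config})


def list_connectors() -> Any:
    return call("GET", "/connectors")


def connector_status(name: str) -> Any:
    return call("GET", f"/connectors/{name}/status")


def delete_connector(name: str) -> None:
    call("DELETE", f"/connectors/{name}")
```

For example, `connector_status("test")` returns the same JSON document as the GET request above, already parsed into a dict.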

Troubleshooting:

The connector task fails with the exception io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'alter table SYSTEM.LOGMNR_ATTRCOL$ exchange partition P106 with table SYS.LOGMNRLT_106_ATTRCOL$ excluding indexes without validation;'\nmismatched input 'with' expecting {'DISABLE', 'ENABLE', ';'}

{"name": "test","connector": {"state": "RUNNING","worker_id": "175.66.31.2:8083"},"tasks": [{"id": 0,"state": "FAILED","worker_id": "175.66.31.2:8083","trace": "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)\n\tat io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:211)\n\tat io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:63)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:159)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:122)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: io.debezium.text.ParsingException: DDL statement couldn't be parsed. 
Please open a Jira issue with the statement 'alter table SYSTEM.LOGMNR_ATTRCOL$ exchange partition P106 with table SYS.LOGMNRLT_106_ATTRCOL$ excluding indexes without validation;'\nmismatched input 'with' expecting {'DISABLE', 'ENABLE', ';'}\n\tat io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43)\n\tat org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)\n\tat org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544)\n\tat org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:327)\n\tat org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:139)\n\tat io.debezium.ddl.parser.oracle.generated.PlSqlParser.alter_table(PlSqlParser.java:64113)\n\tat io.debezium.ddl.parser.oracle.generated.PlSqlParser.unit_statement(PlSqlParser.java:2302)\n\tat io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:68)\n\tat io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:32)\n\tat io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:82)\n\tat io.debezium.connector.oracle.antlr.OracleDdlParser.parse(OracleDdlParser.java:63)\n\tat io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:81)\n\tat io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:297)\n\tat io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.processResult(LogMinerQueryResultProcessor.java:169)\n\tat io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:187)\n\t... 8 more\nCaused by: org.antlr.v4.runtime.InputMismatchException\n\tat org.antlr.v4.runtime.DefaultErrorStrategy.sync(DefaultErrorStrategy.java:270)\n\tat io.debezium.ddl.parser.oracle.generated.PlSqlParser.alter_table(PlSqlParser.java:64052)\n\t... 17 more\n"}],"type": "source"
}

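Fix: delete the Kafka topics configured by Debezium's CONFIG_STORAGE_TOPIC, OFFSET_STORAGE_TOPIC and STATUS_STORAGE_TOPIC settings, then restart. This also discards the stored connector configurations and offsets, so the connector has to be created again through the REST API and starts over according to its snapshot.mode.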
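In the status response above, the connector reports RUNNING while its only task is FAILED, so checking the connector state alone can hide the problem. A small sketch (hypothetical helper name) that scans a parsed status document, such as the result of GET /connectors/test/status, for failed tasks and keeps the first line of each stack trace:

```python
from typing import List, Tuple


def failed_tasks(status: dict) -> List[Tuple[int, str]]:
    """Return (task id, first trace line) for each FAILED task in a Kafka Connect status response."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace") or ""
            # The first line of the trace names the exception, e.g. ConnectException.
            first_line = trace.splitlines()[0] if trace else ""
            failures.append((task["id"], first_line))
    return failures
```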
