confluent mysql_confluent中的connect
背景
(1) 接近实时的数据计算都基于kafka(spark【doing】、flink【todo】)且spark提供sql相关操作接口
(2) 不同类型数据库提供查询接口多、异;(之前尝试直接通过接口直接再不同存储间读写,不走kafka,痛苦)
(3) 知识背景:主线(mysql:SQL、索引、CAP),多条辅线(各集群参数调优、锁机制等)
接触confluent,偶然意识:将数据先入kafka,通过sql进行处理,再写目标数据库,各种不同连接器,使得kafka很灵活,一定程度简化未知的庞大的知识脉络;
暂定的知识脉络
kafka-connect
Connectors -->Tasks --> Workers -- >Converters (抽象到具体)Connector:负责任务调度进行数据由源到目标的转换,Connector大体有两类:Source(其他db数据->kafka);Sink(数据从kafka到其他db)两类Task:一个Connector涉及多个Task协调进行工作Worker:具体的任务执行Converter:数据格式转化,confluent默认提供如下四种格式的数据转化:AvroConverter、JsonConverter、StringConverter、ByteArrayConverterConnect操作
实际操作涉及:配置文件修改+topic创建+启动connect
配置文件修改worker.properties(以connect-standalone.properties为例)bootstrap.servers=XXXX
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path = /usr/share/confluent-hub-components
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
# 如下两个配置用于RestAPI进行链接器信息访问
rest.host.name=XXXX
rest.port=XXX
配置文件修改:source.properties(以mysql为例)name=XXXXconnector.class=io.confluent.connect.jdbc.JdbcSourceConnectortasks.max=2 #connector.class 不同数据库source、sink的类不同connection.url=jdbc:mysql://IP:端口/库名?user=用户名&password=密码mode=incrementingincrementing.column.name=id # 自增主键列名topic.prefix=test-mysql-jdbc- # kafka中topic前缀,拼接表名,构成真实topic配置文件修改:sink.properties(以es为例)name=XXXXconnector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnectortasks.max=1topics=test-elasticsearch-sink # kafka中的topic名字,对应ES的index名字key.ignore=trueconnection.url=http://IP:port # ES访问的IP、端口type.name=kafka-connect备注:如ES中的index不存在,自动会创建,无需定义mapping;topic创建:将配置文件中涉及到的topic提前进行创建启动connect:将涉及到的配置拼接到命令行后,下面展示standalone模式下的启动:connect-standalone worker.properties source.properties sink.properties
KSQL
hive, spark, flink都有sql支持入口,kafka也有sql,所以一定要掌握好sql
confluent mysql_confluent中的connect相关推荐
- ORA-01436: 用户数据中的CONNECT BY 循环
起始地 目的地 距离(公里) A B 1000 A C 1100 A ...
- 4.QT4中的connect的实现
信号槽通过connect进行连接,connect的源码在qobject.cpp中 QT4中的connect的声明如下 static QMetaObject::Connection connect(co ...
- ITK:在二进制映像中标记Connect组件
ITK:在二进制映像中标记Connect组件 内容提要 C++实现代码 内容提要 在二进制映像中标记连接的组件. C++实现代码 #include "itkLiThresholdImageF ...
- 数据库异常---ORA-01436: 用户数据中的 CONNECT BY loop in user data 循环
数据库 ORA-01436: 用户数据中的 CONNECT BY loop in user data 循环 技术qq交流群:JavaDream:251572072 教程下载,在线交流:创梦IT社 ...
- QT下信号与槽不在同一个线程中如何connect
QT下信号与槽不在同一个线程中如何connect
- 5.QT5中的connect的实现
在QT4中,解析信号槽是通过将信号槽的名字转化为字符串,然后通过connect解析该字符串,得到信号函数的相对序号和,然后创建信号connectionlist,但是,所有的检查都在运行时,通过解析字符 ...
- react-redux中的connect
将redux中的state与react中组件中的属性或者方法作绑定. 1.createConnect 作为高阶函数,创建Connect. function createConnect({connect ...
- UDP socket编程中使用connect
转自:http://hi.baidu.com/rwen2012/item/545a39ba741307d085dd7957 标准的udp客户端开了套接口后,一般使用sendto和recvfrom函数来 ...
- 在oracle中通过connect by prior来实现递归查询!
connect by 是结构化查询中用到的,其基本语法是: select ... from tablename start by cond1 connect by cond2 where cond3; ...
最新文章
- AD环境部署文件服务器2012,Windows_server_2012部署AD域及辅域环境.doc
- 关于jquery.AutoComplete插件的一些使用心得(编码问题,效率问题)
- PHP金额计算高精度函数
- SpringMVC自动配置
- python连接数据库oracle_python连接oracle数据库
- 统计二进制中1的个数(四种方案)
- leetcode字节跳动探索
- 容器技术Docker K8s 1 云原生技术概述
- 海信信号机后台服务器配置,海信自适应交通信号控制系统解决方案
- 建筑专业规范大全 2020版_房屋建筑工程现行规范标准目录汇编(2020版)—防水工程...
- 类库、框架、模块、组件等概念介绍
- app瘦身值图片压缩
- 《熊出没原始时代》总导演丁亮:爱与勇气 穿越古今
- 代码审计jizhiCMS 后台getshell
- oracle怎么加上双引号,Oracle中的双引号的作用
- Python小项目(学生成绩管理系统)7.排序、显示部分
- mysql删除视图sql语句_怎么样删除视图中的全部数据 用SQL语言编写。
- Delphi的多线程开发实例
- 练习:JSP相关试题
- 分享32套精美的免费 PSD 网页界面设计素材