kafka sink mysql,kafka之七 sinkTask详解
使用kafka connector 功能实现一个数据从kafka到MySQL的sinkTask
一:实现JdbcSinkConnector类
public class JdbcSinkConnector extends SinkConnector{
private String url;
private String driven;
private String userName;
private String passwd;
public void start(Map props) {
this.url = PropertyVerify.getOrElse(props, Constant_Global.URL, "jdbc:mysql://localhost/test", "'URL' is null");
this.driven = PropertyVerify.getOrElse(props, Constant_Global.DRIVEN, "com.mysql.jdbc.Driver", "'DRIVEN' is null");
this.userName = PropertyVerify.getOrElse(props, Constant_Global.USERNAME, "root", "'USERNAME' is null");
this.passwd = PropertyVerify.getOrElse(props, Constant_Global.PASSED, "root", "'PASSED' is null");
}
public Class extends Task> taskClass() {
return JdbcSinkTask.class;
}
public List> taskConfigs(int maxTasks) {
ArrayList> configs = new ArrayList<>();
for(int i=0;i
Map conf = new HashMap();
conf.put("url", url);
conf.put("driven", driven);
conf.put("userName", userName);
conf.put("passwd", passwd);
configs.add(conf);
}
return configs;
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void stop() {
// TODO Auto-generated method stub
}
}二:实现JdbcSinkConnector类
public class JdbcSinkTask extends SinkTask{
private static final Logger LOG = LoggerFactory.getLogger(JdbcSinkTask.class);
//private Connection conn = null;
public String shcema;
private JdbcDbWriter writer;
@Override
public String version() {
return new JdbcSinkConnector().version();
}
@Override
public void flush(Map map) {
LOG.info("================flush Map start................===========================================================");
}
@Override
public void put(Collection sinkRecords) {
if(sinkRecords.isEmpty()){
return;
}
try {
writer.write(sinkRecords,shcema,email);
} catch (SQLException | IOException e) {
try {
EmailUtil.init(Constant_Global.STMP, Constant_Global.EMAILUSER, Constant_Global.EMAILPASSWD, Constant_Global.EMAILTITAL, Constant_Global.EMAILADREE, email);
EmailUtil.send(" kafka sink 数据写入有问题 ");
} catch (MessagingException e1) {
e1.printStackTrace();
}
throw new JDBCConntorException("数据写入有问题");
}
}
@Override
public void start(Map pro) {
try {
DbPool.init(pro);
writer =new JdbcDbWriter();
} catch (PropertyVetoException e1) {
e1.printStackTrace();
LOG.info("数据库配置异常=====");
}
}
@Override
public void stop() {
}
}三 :打包运行
3.1 单机版运行,配置文件在kafka/config目录下
a: cp connect-file-sink.properties connect-jdbc-sink.properties
b: vim connect-jdbc-sink.properties 配置如下
# kafka connector properties
name=canal-sink-connector #定义task名称
connector.class=com.trcloud.hamal.sink.jdbc.JdbcSinkConnector #定义自己打包中的类
tasks.max=1 #task个数
topics=words-out1 #消费的topic
url=jdbc:mysql://172.30.50.213/test #数据库参数
driven=com.mysql.jdbc.Driver #数据库驱动
userName=root #数据库用户名
passwd=123456 #数据库密码 c: 启动命令
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-jdbc-sink.properties
3.2 单机版用于测试,生产环境建议使用分布式
a: 配置文件 vim jdbc-sink-distributed.properties
bootstrap.servers=node1:6667,node2:6667,node3:6667
group.id=test-consumer-group
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.flush.interval.ms=10000
config.storage.topic=configs-topic
status.storage.topic=connect-status 3.2 启动命令 用rest接口启动
curl -X POST /connectors HTTP/1.1
Host: kafka.test.nd1
Content-Type: application/json
Accept: application/json
{
"name": "local-dw-sink",
"config": {
"connector.class":"com.trcloud.hamal.sink.jdbc.JdbcSinkConnector",
"tasks.max":"1",
"topics":"sql-log" ,
"url":"jdbc:mysql://node4:3306/DW",
"driven":"com.mysql.jdbc.Driver",
"userName":"root",
"passwd":"1234"
}
}
kafka sink mysql,kafka之七 sinkTask详解相关推荐
- MySQL之SQL优化详解(二)
目录 MySQL之SQL优化详解(二) 1. SQL的执行顺序 1.1 手写顺序 1.2 机读顺序 2. 七种join 3. 索引 3.1 索引初探 3.2 索引分类 3.3 建与不建 4. 性能分析 ...
- laravel 调试mysql_Laravel - MySQL数据库的使用详解3(Query Builder用法2:新增、修改、删除)...
五.新增.修改.删除操作 在前文中我介绍了如何使用Query Builder(查询构造器)进行数据查询,下面接着介绍如何使用它进行数据的增.删.改操作.同样假设我们有如下用户表(user): 1,新增 ...
- MySQL高级之explain详解
MySQL高级之explain详解 文章目录 MySQL高级之explain详解 一.expalin命令详解 1.使用方式 2.结果显示 3.主要的字段信息 4.作用 二.id字段 三.select_ ...
- NodeJS+Express+mySQL服务端开发详解
NodeJS+Express+mySQL服务端开发详解 随着NodeJS的发展,现在已经被很多人熟知,NodeJS已经成为了前端开发人员必备的技能.本文不会对NodeJS过多介绍 如果你感兴趣可以访问 ...
- mysql查询优化explain命令详解
转载自 mysql查询优化explain命令详解 mysql查询优化的方法有很多种,explain是工作当中用的比较多的一种检查方式.explain翻译即解释,就是看mysql语句的查询解释计划,从解 ...
- mysql多表查询详解_MySQL多表查询详解上
时光在不经意间,总是过得出奇的快.小暑已过,进入中暑,太阳更加热烈的绽放着ta的光芒,...在外面被太阳照顾的人们啊,你们都是勤劳与可爱的人啊.在房子里已各种姿势看我这篇这章的你,既然点了进来,那就由 ...
- mysql二进制方式_MySQL数据库之MySql二进制连接方式详解
本文主要向大家介绍了MySQL数据库之MySql二进制连接方式详解 ,通过具体的内容向大家展现,希望对大家学习MySQL数据库有所帮助. 使用mysql二进制方式连接 您可以使用MySQL二进制方式进 ...
- pdo mysql limit_PHP mysql中limit用法详解(代码示例)
在MySQL中,LIMIT子句与SELECT语句一起使用,以限制结果集中的行数.LIMIT子句接受一个或两个offset和count的参数.这两个参数的值都可以是零或正整数. offset:用于指定要 ...
- rpm安装mysql5.6_利用rpm安装mysql 5.6版本详解
前言 其实之前使用yum安装MySQL确实很方便,但是默认安装的myql5.0版本的,不支持utf8mb4(utf8mb4扩展到一个字符最多能有4节,所以能支持更多的字符集,比如支持emoji表情)编 ...
最新文章
- java起源_Java的来源
- 乐峰VS聚美,明星也要吃咸盐
- boost的multi_index的使用
- Java swing中的keyListener使用事例
- gnu grub version 2.0.2设置启动顺序_如何修复grub异常
- 剑指offer面试题53 - II. 0~n-1中缺失的数字(二分查找)
- 男人要走过几条路才称得上男子汉?
- 如何在验证集加噪声_图像去噪:如何去其糟粕,取其精华?
- Ubuntu 全部命令
- ssh-key生成密钥及SSH无密码登录的配置(转载)
- Tomcat 8 解决“At least one JAR was scanned for TLDs yet contained no TLDs”问题
- 请不要再说NIO和多路复用IO是同一个东西了(内含BIO、NIO、多路复用、Netty、AIO案例测试代码)
- 平安银行软件测试难么,中国平安银行关于软件测试笔试试题(一)
- saspython知乎_SAS入门书籍有哪些值得推荐?
- 零基础快速自学SQL,2天足矣
- BP神经网络算法基本原理,BP神经网络计算过程
- java8中的date和joda time中的日期相互转换
- 99行Swift完成Markdown在线编辑服务器
- Windows取证一
- 【Linux服务器架设】搭建存储服务器-NFS
热门文章
- 给你一碗孟婆汤,你会忘记什么?
- 5-条件(如果是这样该怎么办?)
- 利用URL重写跟踪Session(多学一招)
- java1.8输出语句_[【小白学Java——干货】1.初学Java,认识语法、变量与输出语句...
- 支付页面设计灵感|最美剁手的正确姿势!
- 直观简洁的促销海报模板,明明白白搞事情!
- C4D电商促销素材模板,让你的工作效率更加高效​!
- 电商美工手里没有C4D素材模板,不能体现有多厉害
- 移动端引导页UI设计临摹模板,ui设计师进阶必备
- mac地址修改_【电脑知识】在纯DOS下更改BIOS中网卡MAC地址教程