使用kafka connector 功能实现一个数据从kafka到MySQL的sinkTask

一:实现JdbcSinkConnector类

public class JdbcSinkConnector extends SinkConnector{

private String url;

private String driven;

private String userName;

private String passwd;

public void start(Map props) {

this.url = PropertyVerify.getOrElse(props, Constant_Global.URL, "jdbc:mysql://localhost/test", "'URL' is null");

this.driven = PropertyVerify.getOrElse(props, Constant_Global.DRIVEN, "com.mysql.jdbc.Driver", "'DRIVEN' is null");

this.userName = PropertyVerify.getOrElse(props, Constant_Global.USERNAME, "root", "'USERNAME' is null");

this.passwd = PropertyVerify.getOrElse(props, Constant_Global.PASSED, "root", "'PASSED' is null");

}

public Class extends Task> taskClass() {

return JdbcSinkTask.class;

}

public List> taskConfigs(int maxTasks) {

ArrayList> configs = new ArrayList<>();

for(int i=0;i

Map conf = new HashMap();

conf.put("url", url);

conf.put("driven", driven);

conf.put("userName", userName);

conf.put("passwd", passwd);

configs.add(conf);

}

return configs;

}

@Override

public String version() {

return AppInfoParser.getVersion();

}

@Override

public void stop() {

// TODO Auto-generated method stub

}

}二:实现JdbcSinkConnector类

public class JdbcSinkTask extends SinkTask{

private static final Logger LOG = LoggerFactory.getLogger(JdbcSinkTask.class);

//private Connection conn = null;

public String shcema;

private JdbcDbWriter writer;

@Override

public String version() {

return new JdbcSinkConnector().version();

}

@Override

public void flush(Map map) {

LOG.info("================flush Map start................===========================================================");

}

@Override

public void put(Collection sinkRecords) {

if(sinkRecords.isEmpty()){

return;

}

try {

writer.write(sinkRecords,shcema,email);

} catch (SQLException | IOException e) {

try {

EmailUtil.init(Constant_Global.STMP, Constant_Global.EMAILUSER, Constant_Global.EMAILPASSWD, Constant_Global.EMAILTITAL, Constant_Global.EMAILADREE, email);

EmailUtil.send(" kafka sink 数据写入有问题 ");

} catch (MessagingException e1) {

e1.printStackTrace();

}

throw new JDBCConntorException("数据写入有问题");

}

}

@Override

public void start(Map pro) {

try {

DbPool.init(pro);

writer =new JdbcDbWriter();

} catch (PropertyVetoException e1) {

e1.printStackTrace();

LOG.info("数据库配置异常=====");

}

}

@Override

public void stop() {

}

}三 :打包运行

3.1 单机版运行,配置文件在kafka/config目录下

a: cp   connect-file-sink.properties    connect-jdbc-sink.properties

b: vim connect-jdbc-sink.properties  配置如下

# kafka connector properties

name=canal-sink-connector #定义task名称

connector.class=com.trcloud.hamal.sink.jdbc.JdbcSinkConnector #定义自己打包中的类

tasks.max=1 #task个数

topics=words-out1 #消费的topic

url=jdbc:mysql://172.30.50.213/test #数据库参数

driven=com.mysql.jdbc.Driver #数据库驱动

userName=root #数据库用户名

passwd=123456 #数据库密码        c: 启动命令

./connect-standalone.sh   ../config/connect-standalone.properties  ../config/connect-jdbc-sink.properties

3.2 单机版用于测试,生产环境建议使用分布式

a: 配置文件 vim  jdbc-sink-distributed.properties

bootstrap.servers=node1:6667,node2:6667,node3:6667

group.id=test-consumer-group

key.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false

value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter

internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false

internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets

offset.flush.interval.ms=10000

config.storage.topic=configs-topic

status.storage.topic=connect-status     3.2 启动命令 用rest接口启动

curl -X POST /connectors HTTP/1.1

Host: kafka.test.nd1

Content-Type: application/json

Accept: application/json

{

"name": "local-dw-sink",

"config": {

"connector.class":"com.trcloud.hamal.sink.jdbc.JdbcSinkConnector",

"tasks.max":"1",

"topics":"sql-log" ,

"url":"jdbc:mysql://node4:3306/DW",

"driven":"com.mysql.jdbc.Driver",

"userName":"root",

"passwd":"1234"

}

}

kafka sink mysql,kafka之七 sinkTask详解相关推荐

  1. MySQL之SQL优化详解(二)

    目录 MySQL之SQL优化详解(二) 1. SQL的执行顺序 1.1 手写顺序 1.2 机读顺序 2. 七种join 3. 索引 3.1 索引初探 3.2 索引分类 3.3 建与不建 4. 性能分析 ...

  2. laravel 调试mysql_Laravel - MySQL数据库的使用详解3(Query Builder用法2:新增、修改、删除)...

    五.新增.修改.删除操作 在前文中我介绍了如何使用Query Builder(查询构造器)进行数据查询,下面接着介绍如何使用它进行数据的增.删.改操作.同样假设我们有如下用户表(user): 1,新增 ...

  3. MySQL高级之explain详解

    MySQL高级之explain详解 文章目录 MySQL高级之explain详解 一.expalin命令详解 1.使用方式 2.结果显示 3.主要的字段信息 4.作用 二.id字段 三.select_ ...

  4. NodeJS+Express+mySQL服务端开发详解

    NodeJS+Express+mySQL服务端开发详解 随着NodeJS的发展,现在已经被很多人熟知,NodeJS已经成为了前端开发人员必备的技能.本文不会对NodeJS过多介绍 如果你感兴趣可以访问 ...

  5. mysql查询优化explain命令详解

    转载自 mysql查询优化explain命令详解 mysql查询优化的方法有很多种,explain是工作当中用的比较多的一种检查方式.explain翻译即解释,就是看mysql语句的查询解释计划,从解 ...

  6. mysql多表查询详解_MySQL多表查询详解上

    时光在不经意间,总是过得出奇的快.小暑已过,进入中暑,太阳更加热烈的绽放着ta的光芒,...在外面被太阳照顾的人们啊,你们都是勤劳与可爱的人啊.在房子里已各种姿势看我这篇这章的你,既然点了进来,那就由 ...

  7. mysql二进制方式_MySQL数据库之MySql二进制连接方式详解

    本文主要向大家介绍了MySQL数据库之MySql二进制连接方式详解 ,通过具体的内容向大家展现,希望对大家学习MySQL数据库有所帮助. 使用mysql二进制方式连接 您可以使用MySQL二进制方式进 ...

  8. pdo mysql limit_PHP mysql中limit用法详解(代码示例)

    在MySQL中,LIMIT子句与SELECT语句一起使用,以限制结果集中的行数.LIMIT子句接受一个或两个offset和count的参数.这两个参数的值都可以是零或正整数. offset:用于指定要 ...

  9. rpm安装mysql5.6_利用rpm安装mysql 5.6版本详解

    前言 其实之前使用yum安装MySQL确实很方便,但是默认安装的myql5.0版本的,不支持utf8mb4(utf8mb4扩展到一个字符最多能有4节,所以能支持更多的字符集,比如支持emoji表情)编 ...

最新文章

  1. java起源_Java的来源
  2. 乐峰VS聚美,明星也要吃咸盐
  3. boost的multi_index的使用
  4. Java swing中的keyListener使用事例
  5. gnu grub version 2.0.2设置启动顺序_如何修复grub异常
  6. 剑指offer面试题53 - II. 0~n-1中缺失的数字(二分查找)
  7. 男人要走过几条路才称得上男子汉?
  8. 如何在验证集加噪声_图像去噪:如何去其糟粕,取其精华?
  9. Ubuntu 全部命令
  10. ssh-key生成密钥及SSH无密码登录的配置(转载)
  11. Tomcat 8 解决“At least one JAR was scanned for TLDs yet contained no TLDs”问题
  12. 请不要再说NIO和多路复用IO是同一个东西了(内含BIO、NIO、多路复用、Netty、AIO案例测试代码)
  13. 平安银行软件测试难么,中国平安银行关于软件测试笔试试题(一)
  14. saspython知乎_SAS入门书籍有哪些值得推荐?
  15. 零基础快速自学SQL,2天足矣
  16. BP神经网络算法基本原理,BP神经网络计算过程
  17. java8中的date和joda time中的日期相互转换
  18. 99行Swift完成Markdown在线编辑服务器
  19. Windows取证一
  20. 【Linux服务器架设】搭建存储服务器-NFS

热门文章

  1. 给你一碗孟婆汤,你会忘记什么?
  2. 5-条件(如果是这样该怎么办?)
  3. 利用URL重写跟踪Session(多学一招)
  4. java1.8输出语句_[【小白学Java——干货】1.初学Java,认识语法、变量与输出语句...
  5. 支付页面设计灵感|最美剁手的正确姿势!
  6. 直观简洁的促销海报模板,明明白白搞事情!
  7. C4D电商促销素材模板,让你的工作效率更加高效​!
  8. 电商美工手里没有C4D素材模板,不能体现有多厉害
  9. 移动端引导页UI设计临摹模板,ui设计师进阶必备
  10. mac地址修改_【电脑知识】在纯DOS下更改BIOS中网卡MAC地址教程