/**

* 自定义Mysql Source

*/

public class CustomerMysqlSourceDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 获得自定义Source对象

DataStreamSource<UserInfo> mysqlSource = env.addSource(new MyMysqlSource());

mysqlSource.print();

env.execute("CustomerMySQLSourceDemo");

}

/**

自定义Mysql Source实现类

*/

public static class MyMysqlSource extends RichSourceFunction<UserInfo> {

private Connection connection = null;       // 定义数据库连接对象

private PreparedStatement ps = null;        // 定义PreparedStatement对象

/*

使用open方法, 这个方法在实例化类的时候会执行一次, 比较适合用来做数据库连接

*/

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

// 加载数据库驱动

Class.forName("com.mysql.jdbc.Driver");

// 创建数据库连接

String url = "jdbc:mysql://node01:3306/flinkdemo?useUnicode=true&characterEncoding=utf-8&useSSL=false";

this.connection = DriverManager.getConnection(url, "root", "123456");

// 准备PreparedStatement对象

this.ps = connection.prepareStatement("SELECT id, username, password, name FROM user");

}

/*

使用close方法, 这个方法在销毁实例的时候会执行一次, 比较适合用来关闭连接

*/

@Override

public void close() throws Exception {

super.close();

// 关闭资源

if (this.ps != null) this.ps.close();

if (this.connection != null) this.connection.close();

}

@Override

public void run(SourceContext<UserInfo> ctx) throws Exception {

ResultSet resultSet = ps.executeQuery();

while (resultSet.next()) {

int id = resultSet.getInt("id");

String username = resultSet.getString("username");

String password = resultSet.getString("password");

String name = resultSet.getString("name");

ctx.collect(new UserInfo(id, username, password, name));

}

}

@Override

public void cancel() {

System.out.println("任务被取消......");

}

}

/**

数据定义类, POJO

*/

@Data

@AllArgsConstructor

@NoArgsConstructor

public static class UserInfo {

int id;

String username;

String password;

String name;

}

}

/**

* 从指定的socket读取数据,对单词进行计算,最后将结果写入到MySQL

*/

public class JDBCSinkDemo {

public static void main(String[] args) throws Exception {

//创建Flink流计算执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(5000);

//创建DataStream

//Source

DataStreamSource<String> lines = env.socketTextStream("node01", 9999);

//调用Transformation

SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

@Override

public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {

String[] words = line.split(" ");

for (String word : words) {

//new Tuple2<String, Integer>(word, 1)

collector.collect(Tuple2.of(word, 1));

}

}

});

//分组

KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {

@Override

public String getKey(Tuple2<String, Integer> tp) throws Exception {

return tp.f0;

}

});

//聚合

SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

summed.addSink(JdbcSink.sink(

"INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?",

(ps, t) -> {

ps.setString(1, t.f0);

ps.setInt(2, t.f1);

ps.setInt(3, t.f1);

},

new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.withUrl("jdbc:mysql://node03:3306/test?characterEncoding=utf-8")

.withDriverName("com.mysql.jdbc.Driver")

.withUsername("root")

.withPassword("123456")

.build()));

//启动执行

env.execute("JDBCSinkDemo");

}

}

FlinkMysqlSourceFlinkMysqlSink相关推荐

最新文章

  1. python学精通要多久-学Python编程难吗 从入门到精通学习Python要多久
  2. ITK:创建高斯导数内核
  3. 每日一题——王道考研2.2.4.1
  4. 多继承、经典类与新式类、新式类的C3算法详解
  5. 企业级 SpringBoot 教程 (十四)在springboot中用redis实现消息队列
  6. 安装PHP ImageMagick笔记
  7. 马科维茨投资组合理论总结
  8. BZOJ 1260涂色 paint
  9. 微信字体调大后页面错乱问题
  10. Unity 第三方SDK框架接入 (Android Studio)
  11. 基尔霍夫电流定律KCL,基尔霍夫电压定律KVL
  12. xp透明膜p系列_一种XPP型超厚医用吸塑包装膜的制作方法
  13. Linux中文件搜索,查找,读取
  14. linux qt make文件或目录,rpm,linux_为Qt程序制作rpm包的spec文件里的路径问题,rpm,linux,qt,rpmbuild,spec - phpStudy...
  15. 供应化学试剂Boc-NH-PEG-NH2,Boc-NH-PEG-amine,叔丁氧羰基PEG氨基
  16. Velodyne Lidar公布自动驾驶技术世界安全峰会的议程
  17. 计算机系统xp和w7,告诉你十年老电脑装xp还是win7
  18. lte接口流程图_LTE信令流程图(端到端平台)[技术学习]
  19. eas 税率修改_EAS委外加工业务操作手册
  20. 中国能不能写出操作系统?

热门文章

  1. [USACO3.2]香甜的黄油 Sweet Butter
  2. 树言树语 输入法之争霸
  3. 大量教程+工具+源码下载地址汇总
  4. 【C++】RC4加密算法
  5. Linux命令之在终端里观看「星球大战」
  6. 队列的链式存储结构(链队)
  7. android驱动学习
  8. 轮播swiper中加视频video滑动不顺畅报错 property on ‘HTMLMediaElement‘: The provided double value is non-finite.
  9. ansible 建 kubernetes 证书签名请求_基于Kubernetes的云平台存储容器化实践
  10. pip install execjs报错