FlinkMysqlSourceFlinkMysqlSink
/**
* 自定义Mysql Source
*/
public class CustomerMysqlSourceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获得自定义Source对象
DataStreamSource<UserInfo> mysqlSource = env.addSource(new MyMysqlSource());
mysqlSource.print();
env.execute("CustomerMySQLSourceDemo");
}
/**
自定义Mysql Source实现类
*/
public static class MyMysqlSource extends RichSourceFunction<UserInfo> {
private Connection connection = null; // 定义数据库连接对象
private PreparedStatement ps = null; // 定义PreparedStatement对象
/*
使用open方法, 这个方法在实例化类的时候会执行一次, 比较适合用来做数据库连接
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 加载数据库驱动
Class.forName("com.mysql.jdbc.Driver");
// 创建数据库连接
String url = "jdbc:mysql://node01:3306/flinkdemo?useUnicode=true&characterEncoding=utf-8&useSSL=false";
this.connection = DriverManager.getConnection(url, "root", "123456");
// 准备PreparedStatement对象
this.ps = connection.prepareStatement("SELECT id, username, password, name FROM user");
}
/*
使用close方法, 这个方法在销毁实例的时候会执行一次, 比较适合用来关闭连接
*/
@Override
public void close() throws Exception {
super.close();
// 关闭资源
if (this.ps != null) this.ps.close();
if (this.connection != null) this.connection.close();
}
@Override
public void run(SourceContext<UserInfo> ctx) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
int id = resultSet.getInt("id");
String username = resultSet.getString("username");
String password = resultSet.getString("password");
String name = resultSet.getString("name");
ctx.collect(new UserInfo(id, username, password, name));
}
}
@Override
public void cancel() {
System.out.println("任务被取消......");
}
}
/**
数据定义类, POJO
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class UserInfo {
int id;
String username;
String password;
String name;
}
}
/**
* 从指定的socket读取数据,对单词进行计算,最后将结果写入到MySQL
*/
public class JDBCSinkDemo {
public static void main(String[] args) throws Exception {
//创建Flink流计算执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
//创建DataStream
//Source
DataStreamSource<String> lines = env.socketTextStream("node01", 9999);
//调用Transformation
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
//new Tuple2<String, Integer>(word, 1)
collector.collect(Tuple2.of(word, 1));
}
}
});
//分组
KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tp) throws Exception {
return tp.f0;
}
});
//聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);
summed.addSink(JdbcSink.sink(
"INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?",
(ps, t) -> {
ps.setString(1, t.f0);
ps.setInt(2, t.f1);
ps.setInt(3, t.f1);
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://node03:3306/test?characterEncoding=utf-8")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build()));
//启动执行
env.execute("JDBCSinkDemo");
}
}
FlinkMysqlSourceFlinkMysqlSink相关推荐
最新文章
- python学精通要多久-学Python编程难吗 从入门到精通学习Python要多久
- ITK:创建高斯导数内核
- 每日一题——王道考研2.2.4.1
- 多继承、经典类与新式类、新式类的C3算法详解
- 企业级 SpringBoot 教程 (十四)在springboot中用redis实现消息队列
- 安装PHP ImageMagick笔记
- 马科维茨投资组合理论总结
- BZOJ 1260涂色 paint
- 微信字体调大后页面错乱问题
- Unity 第三方SDK框架接入 (Android Studio)
- 基尔霍夫电流定律KCL,基尔霍夫电压定律KVL
- xp透明膜p系列_一种XPP型超厚医用吸塑包装膜的制作方法
- Linux中文件搜索,查找,读取
- linux qt make文件或目录,rpm,linux_为Qt程序制作rpm包的spec文件里的路径问题,rpm,linux,qt,rpmbuild,spec - phpStudy...
- 供应化学试剂Boc-NH-PEG-NH2,Boc-NH-PEG-amine,叔丁氧羰基PEG氨基
- Velodyne Lidar公布自动驾驶技术世界安全峰会的议程
- 计算机系统xp和w7,告诉你十年老电脑装xp还是win7
- lte接口流程图_LTE信令流程图(端到端平台)[技术学习]
- eas 税率修改_EAS委外加工业务操作手册
- 中国能不能写出操作系统?
热门文章
- [USACO3.2]香甜的黄油 Sweet Butter
- 树言树语 输入法之争霸
- 大量教程+工具+源码下载地址汇总
- 【C++】RC4加密算法
- Linux命令之在终端里观看「星球大战」
- 队列的链式存储结构(链队)
- android驱动学习
- 轮播swiper中加视频video滑动不顺畅报错 property on ‘HTMLMediaElement‘: The provided double value is non-finite.
- ansible 建 kubernetes 证书签名请求_基于Kubernetes的云平台存储容器化实践
- pip install execjs报错