Flink官网提供了JdbcSink的功能,如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(...).addSink(JdbcSink.sink("insert into books (id, title, author, price, qty) values (?,?,?,?,?)",(ps, t) -> {ps.setInt(1, t.id);ps.setString(2, t.title);ps.setString(3, t.author);ps.setDouble(4, t.price);ps.setInt(5, t.qty);},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(getDbMetadata().getUrl()).withDriverName(getDbMetadata().getDriverClass()).build()));
env.execute();

但是,在下才疏学浅,中间那个…看的在下一脸懵逼。
自己从网上查找资料发现只有创建一个RichSinkFunction的方法,在下就是想用官网的那个方法。于是经过在gitee寻找依赖,在代码中寻找关键字JdbcSink,终于找到了,Apache Flink的整个项目的代码。起初我还不知道,发现这个demo项目怎么这么大,还有几个依赖下载不下来。
在Flink的测试类中找到了JdbcSink相关的Java测试代码。
然后经过在下的不懈努力,终于写成了一个scala的测试类。如下:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder
import org.apache.flink.connector.jdbc.{JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}import java.sql.{PreparedStatement, Types}case class TestEntry(id: Int, title: String, author: String, price: Double, qty: Int)object JDBCSinkTestJob {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.fromElements(TestEntry(1001, "Java public for dummies", "Tan Ah Teck", 11.11, 11),TestEntry(1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22),TestEntry(1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33),TestEntry(1004, "A Cup of Java", "Kumar", 44.44, 44),TestEntry(1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55),TestEntry(1006, "A Teaspoon of Java 1.4", "Kevin Jones", 66.66, 66),TestEntry(1007, "A Teaspoon of Java 1.5", "Kevin Jones", 77.77, 77),TestEntry(1008, "A Teaspoon of Java 1.6", "Kevin Jones", 88.88, 88),TestEntry(1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99),TestEntry(1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), 99, 1010)).addSink(JdbcSink.sink(String.format("insert into %s (id, title, author, price, qty) values (?,?,?,?,?)", "books"),new JdbcStatementBuilder[TestEntry] {override def accept(ps: PreparedStatement, t: TestEntry): Unit = {ps.setInt(1, t.id);ps.setString(2, t.title);ps.setString(3, t.author);if (t.price == null) {ps.setNull(4, Types.DOUBLE);} else {ps.setDouble(4, t.price);}ps.setInt(5, t.qty);}},new JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop10:3306/test?useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("123456").withDriverName("com.mysql.jdbc.Driver").build()))env.execute()}}

这个测试类,把测试代码的变量给提取了出来。
写代码的过程还需要导入MySQL需要的依赖,自行搜索。
测试结果

Perfect,在下非常的高兴,决定打两把游戏奖赏一下自己。

参考资料:
gitee中的Flink项目代码

Flink使用JdbcSink下沉数据到数据库MySQL相关推荐

  1. 大数据之数据库mysql优化实战(一)

    2019独角兽企业重金招聘Python工程师标准>>> :facepunch: 大数据之数据库mysql优化实战(一) 首先你要有数据,不然怎么测试,几百条就算了,还没跑就完了. 本 ...

  2. mysql查出倒序第一条数据_[数据库]mysql 记录根据日期字段倒序输出

    [数据库]mysql 记录根据日期字段倒序输出 0 2016-07-21 11:00:17 我们知道倒序输出是很简单的 select * from table order by id desc 直接这 ...

  3. python网页数据存入数据库_python网络爬虫抓取动态网页并将数据存入数据库MySQL...

    简述 以下的代码是使用python实现的网络爬虫,抓取动态网页 http://hb.qq.com/baoliao/ .此网页中的最新.精华下面的内容是由JavaScript动态生成的.审查网页元素与网 ...

  4. python网站数据写入mysql_python网络爬虫抓取动态网页并将数据存入数据库MySQL

    简述 以下的代码是使用python实现的网络爬虫,抓取动态网页 http://hb.qq.com/baoliao/ .此网页中的最新.精华下面的内容是由JavaScript动态生成的.审查网页元素与网 ...

  5. Java 批量插入数据到数据库(MySQL)中

    实现Java批量插入数据库数据: package Proxy;import java.io.BufferedReader; import java.io.File; import java.io.Fi ...

  6. excel表数据导入数据库mysql中,并解决导入时间格式问题

    1.准备好Excel表数据 id category_id category_pid title art_desc content imageurl tags   create_time   3 1 E ...

  7. mysql 导出中间 数据_MYSQL数据库之间的数据导出与导入

    源数据库地址: 172.16.1.7 目标数据库地址: 172.16.1.51 步骤: (1) 进入172.16.1.7服务器,登录mysql数据库 mysqldump -uusername -ppa ...

  8. nodejs操作sqlserver数据_pyspark操作MySQL、SQLServer数据库进行数据处理操作

    欢迎访问本人的CSDN博客[Together_CZ],我是沂水寒城. https://yishuihancheng.blog.csdn.net 在大数据处理领域里面,Hadoop和spark可以说是最 ...

  9. python爬取新闻并归数据库_Python爬取数据并写入MySQL数据库操作示例

    Python爬取数据并写入MySQL数据库的实例 首先我们来爬取 http://html-color-codes.info/color-names/ 的一些数据. 按 F12 或 ctrl+u 审查元 ...

最新文章

  1. java在线作业系统_在线作业系统论文
  2. win10设置默认输入法_为什么说win10越来越好用了?(技巧篇)
  3. eclipse的tomcat运行mave web项目
  4. P8U8 最中听的话,不要做每件事都考虑太多。
  5. vue的matcher_一张思维导图辅助你深入了解 Vue | Vue-Router | Vuex 源码架构
  6. python用cx_Oracle连接oracle编码问题解决办法
  7. SAP License:SAP PM的应用从实施开始
  8. 金牌访谈栏目《架构师说》重磅上线!
  9. UVA11152 Safe Salutations【计算几何】
  10. Echarts数据可视化toolbox工具框,开发全解+完美注释
  11. 【色空win7动漫美女诱惑主题】
  12. 用matlab对图像进行二维傅里叶变换
  13. 音视频中的码率控制(CBR、VBR、CVBR、FIXQP)
  14. 计算机语言中double是什么意思,C语言中double是什么意思?_后端开发
  15. 你所热爱的,就是你的生活
  16. 滚滚长江东逝水历史的天空
  17. 查看电脑是否能插内存条
  18. 计算机辅助建筑设计英文全称是,“CAAD”是“Computer Aided Architectural Design”的缩写,意思是“计算机辅助建筑设计”...
  19. 百度地图框选标注坐标返回标注信息
  20. UITableView上下滚动卡顿(获取网络数据,下载图片之后)

热门文章

  1. IDEA编译代码报错,找不到符号:找不到符号包
  2. hdu1256-画8
  3. 计算机快速格式化u盘启动,小编教你如何解决u盘打不开提示格式化
  4. C语言中的指针,指针存在的意义
  5. twitter和新浪微博比较
  6. java 公历 农历_Java给定公历日期计算相应农历/阴历日期
  7. Codepage的定义和历史
  8. H12-821题库详解
  9. jdk下载与安装教程win10_jdk下载与安装教程win7
  10. 带通滤波器和带阻滤波器详细解析:(定义,区别,工作原理,经典电路图,应用)