一、背景
由于业务统计数据,需要根据业务日志对业务情况进行简单分析处理统计,为了更好的查询统计则选择将业务日志汇总有用的数据筛选入库,由于数据非实时性,选择将日志内容导出,并使用flink批处理进行筛选、过滤、入库,后续通过sql语句查询统计,当然flink也可以进行统计,但是非重复性工作统计多变每次得改和跑程序读比较麻烦,故选择入库。
二、准备工作

  1. 新建一个maven项目
  2. 加入flink相关依赖包,选择启动类地址
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.14.0</flink.version><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><hutool-all.version>5.3.8</hutool-all.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- This dependency is provided, because it should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Add connector dependencies here. They must be in the default scope (compile). --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool-all.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-dbcp2</artifactId><version>2.1.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.21</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_${scala.binary.version}</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><configuration><createDependencyReducedPom>false</createDependencyReducedPom></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><!-- 不要拷贝 META-INF 目录下的签名,否则会引起 SecurityExceptions 。 --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"><resource>META-INF/spring.factories</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- 启动类地址 --><mainClass>com.test.DoveBootStrap</mainClass></transformer></transformers></configuration></execution></executions><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.2.2.RELEASE</version></dependency></dependencies></plugin>

三、具体实现

  • 利用flink自带的JDBCOutputFormat进行入库
public class DoveBootStrap {public static void main(String[] args) throws Exception {TimeInterval timer = DateUtil.timer();JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat().setDrivername("com.mysql.cj.jdbc.Driver").setDBUrl("jdbc:mysql://localhost:3306/sms?user=root&password=123456&serverTimezone=UTC").setQuery("insert into sea_dove1 (id,send_time,phone,msg,business,source) values (?,?,?,?,?,?)")//设置为每有1000条数据就提交一次,这里可以不要也行.setBatchInterval(1000).finish();//初始化批处理执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//文件地址String filePath = "D:\\log\\seadove\\10yue.json";//读取文件内容DataSource<String> dataSource = env.readTextFile(filePath);//进行了数据的筛选FlatMapOperator<String, SeaDove> mapOperator = dataSource.flatMap(new SeaDoveFlatMapFunction());//筛选内容赋值MapOperator<SeaDove, Row> map = mapOperator.map(new MapFunction<SeaDove, Row>() {@Overridepublic Row  map(SeaDove value) throws Exception {Row row = new Row(6);row.setField(0, SnowFlakeFactory.getSnowFlakeFromCache().nextId());row.setField(1, value.getSend_time());row.setField(2, value.getPhone());row.setField(3, value.getMsg());row.setField(4, value.getBusiness());row.setField(5, value.getSource());return row;}});//输出map.output(jdbcOutput);env.execute();System.out.println("JDBCOutputFormat 耗时:"+timer.interval());}
}
  • 文件数据百万入库27万约3分钟

flink批处理读取文件入库mysql相关推荐

  1. Flink 分别读取kafka和mysql作为source

    需求 首先从kafka中读取数据,然后从mysql中读取数据,然后将这两个数据进行合并处理. 环境 Flink 1.8.2 实现 public static void main(String[] ar ...

  2. runtime批处理mysql导出_【原】使用批处理BAT文件处理Mysql数据库 | 学步园

    在做项目的时候,考虑到项目中很多模块是公用,数据库也是公用,所以考虑把公用模块的数据库全部用批处理生产,这样或多或少提高了一些效率. 处理方法是: 1:用一个txt保存该项目数据库名称,方便新建工程时 ...

  3. python连接mysql,并读取文件写入mysql

    PyMysql的使用 菜鸟教程:https://www.runoob.com/python3/python3-mysql.html(安装+介绍) # 导入pymysql模块 import pymysq ...

  4. 【数据库2】生成txt/xml文件,ftp,oracle安装/表操作/虚表/日期/序列/索引/视图/链路/同义词/高可用性,mysql/文件入库/清理/表结构设计/交换/收集

    文章目录 1.生成数据:crontab 2.ftp:ftp是tcp/ip协议族中一员,分客户端和服务端 2.1 安装:linux操作系统的用户也是ftp的用户,可以配置专用的ftp用户,专用的ftp用 ...

  5. 定时指定ftp目录下csv文件入库到mysql数据库

    数据源 FTP-->单机程序业务说明 程序启动之后,通过方法每小时下载FTP 132.255.150.217上最新的一个csv表到本地,如果不是只获取最新的表,文件无需改代码,通过方法会获取ft ...

  6. 【网络安全】如何搭建MySQL恶意服务器读取文件?

    前言 注:本文不涉及对MySQL协议报文研究,仅讲解原理,并且做部分演示. 搭建MySQL恶意服务器读取文件这件事,虽然直接利用门槛较高,但是由于在网上看到了一种比较新颖的利用方式(利用社会工程学引诱 ...

  7. 全网最详细SpringBatch批处理读取分区(Paratition)文件讲解

    文章目录 一.分区Step 1.数据分区 2.分区处理 二.实现分区关键接口 1.Partitioner 2.StepExecutionSplitter 3.PartitionHandler 三.基本 ...

  8. mysql 读取文件_关于mysql:逐行读取文件而不将整个文件加载到内存中

    我正在使用50 Gb MySQL导出文件,并对其执行脚本操作列表以转换为SQLite3可加载形式(我从这里得到的线索:脚本将mysql dump sql文件转换为可以导入sqlite3的格式D b ) ...

  9. kettle数据同步从EXCL文件读取数据入库

    从EXCL文件读取数据入库 我们了解一下我们如何将EXCL文件导入到数据库当中. Kettle工具提供了一个选项的功能是把excl文件的数据提取出来. 在使用这个选项的时候,必须EXCL文件格式是标准 ...

最新文章

  1. 谷歌chrome浏览器的源码分析(五)
  2. OpenSSL(加密方式,加密算法,自签证书)
  3. Dashboard集群
  4. 「最有用」的特殊大数据:一文看懂文本信息系统的概念框架及功能
  5. 蓝宝石显卡bios_这操作竟能让显卡性能暴涨?原来不是黑科技,小白都会
  6. LeetCode2. 两数相加
  7. PyTorch学习—14.PyTorch中的学习率调整策略
  8. CSS里 @import用法
  9. (八)冰点还原安装及使用
  10. android辅助功能截屏,Android 截屏的三种方法
  11. i350在linux系统刷MAC,MAC 10.14 安装教程10-基于黑果小兵大神EFI文件的修改过程
  12. Bugku杂项 wp1
  13. 一个屌丝程序猿的人生(二十六)
  14. windows 7 数据执行保护 开启关闭方法
  15. 利用Jimi进行图片缩放操作
  16. 企业信息化:体系比软件更重要
  17. 用计算机升级ipad系统软件,iPad如何升级系统?三种ipad升级系统的方法汇总
  18. [收藏] 最全服务器基础知识科普
  19. web漏洞之文件上传漏洞
  20. MATLAB中内置的BP神经网络函数 help newff翻译【学习笔记】

热门文章

  1. HTTPS_SSL加密(HTTP终)
  2. C++ MFC万能的类向导
  3. 2018年11月16日SQL Server实验内容(触发器实验)
  4. 雨课堂知识点总结(十一)
  5. 用于三维医学图像检测的半监督学习——FocalMix: Semi-Supervised Learning for 3D Medical Image Detection
  6. [解读] You Only Train Once Loss-Conditional Training of Deep Networks
  7. 复旦大学计算机科学技术学院期末,复旦大学计算机科学技术学院多媒体技术基础试题...
  8. 黑盒测试和白盒测试的优缺点
  9. tablepc是什么平板电脑_平板电脑是什么
  10. 2020年11月26日JetbrainsAgent安装参数