Flink时间窗口运用

上一篇介绍了Flink定时读取外部数据Flink 定时加载外部文件数据并广播

这一篇将介绍Flink定时输出到外部存储介质,有两种办法实现,一种是同上一篇一样,在RichXXXFunction中实现SinkFunction的方法,在其中open()方法中引入java的定时任务。

本文介绍另一种实现,基于Flink window窗口机制,将结果定时sink到外部文件。

需求:

经过flink清洗后的数据,要求每天sink一次数据到某文件中(该文件内容是json格式,需要进行追加,不属于flink范畴不细讲)。

实现:

1、数据清洗:

 DataStream<JSONObject> userSink = env.addSource(ssConsumer).map(new MapFunction<String, JSONObject>() {
//此处清洗数据,取实时数据中的某些字段public JSONObject map(String value) {JSONObject jsonObject = new JSONObject();JSONObject out = new JSONObject();try {jsonObject = JSON.parseObject(value);out.put(String.valueOf(jsonObject.get("subject_id")), jsonObject.get("subject_name"));} catch (Exception e) {JSONObject json = new JSONObject();jsonObject = json;logger.error("This value parse has error:", e);}return out;}}).filter(new FilterFunction<JSONObject>() {@Override
//此处过滤非Json格式数据public boolean filter(JSONObject value) throws Exception {if ("error".equals(value.getOrDefault("error", ""))) {return false;} else {return true;}}});

2、自定义sink,流数据输出到txt文件:

/*** @Author luran* @create 2020/4/13 18:36* @Desc*//*** 继承RichSinkFunction<String>类,其中List<JSONObject>为source端传到sink的数据类型,这个视Source端数据类型而定。*/
public class MyRishSinkFileWriter extends RichSinkFunction<List<JSONObject>> implements SinkFunction<List<JSONObject>> {private static String path;/*** open方法在sink第一次启动时调用,一般用于sink的初始化操作*/@Overridepublic void open(Configuration parameters) {try {super.open(parameters);path = PropertyReaderUtil.getStrValue("user.txt.path");File file = new File(path);if (!file.exists()) {file.createNewFile();}} catch (Exception e) {log.error("获取user.txt路径失败:", e);}}/*** invoke方法是sink数据处理逻辑的方法,source端传来的数据都在invoke方法中进行处理* 其中invoke方法中第一个参数类型与RichSinkFunctionList<JSONObject>中的泛型对应。第二个参数* 为一些上下文信息*/@Overridepublic void invoke(List<JSONObject> v, Context context) {try {JSONObject json = null;String s = JsonFileReaderUtil.readJsonData(path);if (s.length() == 0) {json = new JSONObject();} else {json = JSONObject.parseObject(s);}for (JSONObject d : v) {json.putAll(d);}String value = json.toJSONString();File file = new File(path);FileWriter fileWriter = new FileWriter(file);fileWriter.write(value);fileWriter.flush();fileWriter.close();} catch (IOException e) {log.error("写入user文件异常:", e);}}/*** close方法在sink结束时调用,一般用于资源的回收操作*/@Overridepublic void close() throws Exception {super.close();}
}

3、时间窗口的启用:

//由于用了时间窗口,输出肯定是List<>     DataStream<List<JSONObject>> dataBaseStream = userSink.windowAll(TumblingProcessingTimeWindows.of(Time.days(1))).process(new ProcessAllWindowFunction<JSONObject, List<JSONObject>, TimeWindow>() {@Overridepublic void process(Context context, Iterable<JSONObject> iterable, Collector<List<JSONObject>> collector) throws Exception {logger.info("进入时间窗口:"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));List<JSONObject> arrayList = new ArrayList<JSONObject>();iterable.forEach(single -> {arrayList.add(single);});if (arrayList.size() > 0) {collector.collect(arrayList);}}});dataBaseStream.print("userStream");dataBaseStream.addSink(new MyRishSinkFileWriter());

测试:

方便测试,先将时间改为每30秒执行,Time.seconds(30),循环发送kafka数据:

第1个时间窗口到达:Iterable中集合了这30秒接收的所有实时数据,统一处理

第2个时间窗口达到:

总结:

Flink是实时处理,window机制可以认为是flink的批处理实现,因为需要等待水位线对齐触发timer。一般还会基于时间窗口做一些统计,如Flink按统计Kafka中每小时的数量并输出到MySQL

Flink实战—基于时间窗口定时输出sink相关推荐

  1. Flink从入门到真香(12、Flink一大利器-时间窗口)

    flink中支持多种窗口,包括:时间窗口,session窗口,统计窗口等等,能想到的基本都可以实现 时间窗口(Time Windows) 最简单常用的窗口形式是基于时间的窗口,flink支持三种种时间 ...

  2. 使用Spark Streaming SQL基于时间窗口进行数据统计

    1.背景介绍 流式计算一个很常见的场景是基于事件时间进行处理,常用于检测.监控.根据时间进行统计等系统中.比如埋点日志中每条日志记录了埋点处操作的时间,或者业务系统中记录了用户操作时间,用于统计各种操 ...

  3. 【Flink系列2】时间窗口

    引出 对于流处理系统来说,流入的消息是无限的,所以对于聚合或是连接等操作,流处理系统需要对流入的消息进行分段,然后基于每一段数据进行聚合或是连接等操作.消息的分段即称为窗口,流处理系统支持的窗口有很多 ...

  4. 基于时间的访问控制列表

    版权声明:原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法律责任.http://zpp2009.blog.51cto.com/730423/256204 ...

  5. 2021年大数据Flink(十九):案例一 基于时间的滚动和滑动窗口

    目录 案例一 基于时间的滚动和滑动窗口 需求 代码实现 案例一 基于时间的滚动和滑动窗口 需求 nc -lk 9999 有如下数据表示: 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4 ...

  6. flink定时读取mysql数据_flink时间系统系列之实例讲解:如何做定时输出

    flink时间系统系列篇幅目录: 六.实例讲解:如何做定时输出 今天为大家带来flink时间系统系列最后一篇实战篇,同样也是查漏补缺篇:如何做定时输出,首先说一下定时输出的需求背景,在flink流处理 ...

  7. 万字详述 Flink SQL 4 种时间窗口语义!(收藏)

    DML:窗口聚合 大家好我是老羊,由于窗口涉及到的知识内容比较多,所以博主先为大家说明介绍下面内容时的思路,大家跟着思路走.思路如下: ⭐ 先介绍 Flink SQL 支持的 4 种时间窗口 ⭐ 分别 ...

  8. 最新 Flink 1.13 时间和窗口(时间语义、Watermark、Window 窗口、Trigger)快速入门、详细教程

    时间和窗口 文章目录 时间和窗口 一.Flink 的三种时间语义 二.水位线(Watermark) 1. Flink 中的 Watermark 机制 2. 如何生成水位线 3. 水位线的传递 三.窗口 ...

  9. 第三课 大数据技术之Fink1.13的实战学习-时间和窗口

    第三课 大数据技术之Fink1.13的实战学习-时间和窗口 文章目录 第三课 大数据技术之Fink1.13的实战学习-时间和窗口 第一节 时间定义 1.1 Flink中的时间语义 1.2 两种时间语义 ...

最新文章

  1. 7-1 打印沙漏 (20 分)
  2. spark 序列化错误 集群提交时_Spark统一内存管理机制
  3. antd 日期时间选择_【UI设计】日期选择器的常见样式总结
  4. 刚装完系统的简单优化
  5. 软件工程个人作业01 100以内四则运算自动答题系统(含整数和真分数)
  6. Tests of the Equality of Two Means
  7. 2021年浙江省职业院校技能大赛“新华三杯”大数据技术与应用比赛 比赛经验
  8. 安卓模拟器刷小米系统_小米5x原生rom系统刷机-小米5X MIUI10刷机包下载V9.7.21最新版-西西软件下载...
  9. 射频电路学习之滤波电路
  10. 基于SSM的学生社团管理系统
  11. QNAP+Transmission
  12. 打包java项目_Java项目常见打包方式
  13. 双代号网络图快速计算时差法
  14. 今晚7:30 | 推荐系统中的异构关系学习——香港大学计算机学院助理教授黄超
  15. 基于python的招聘信息可视化系统
  16. 记一次在vue项目上使用七牛文件上传的坑
  17. android图片:多选相册的实现
  18. 百度地图 android 自身地点,Android使用百度地图SDK获得当前设备位置所在的省、市(系列1)...
  19. 光猫-新版水星路由器配置(WiFi连接不上后)
  20. MySQL——SQLyog如何导出、导入数据库

热门文章

  1. 13星座性格购买iphone6s的搞笑反映
  2. 小程序开发学习(3)---.wxss详解篇
  3. 从0开始搭建Web自动化测试框架
  4. vc显示已主机服务器出现断点,记一次中断点 已到达中断点的异常
  5. 特斯拉股价周三开盘上涨近7% 因Q2交付量创纪录缓解了需求担忧
  6. 湖北省教育考试院湖北省人事考试网报名入口报名时间软考报名
  7. 基于视角变化的视频关键帧提取方法(附代码地址)
  8. 晋城一中oj 议员秘密
  9. ChatGPT:chatGPT本地部署、运行和接口调用
  10. 职业梦想是计算机的英语作文,我梦想的职业高中英语作文