前言

前面文章Flink中的时间语义 和WaterMark有详细介绍过Flink WaterMark。WaterMark的出现是用来解决乱序时间的处理也就是处理迟到元素的。

WaterMark可以用来平衡计算的完整性和延迟两方面。除非我们选择一种非常保守的水位线策略(最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则我们总需要处理迟到的元素。

迟到的元素是指当这个元素来到时,这个元素所对应的窗口已经计算完毕了(也就是说水位线已经没过窗口结束时间了)。这说明迟到这个特性只针对事件时间。


处理迟到的元素的策略

DataStream API提供了三种策略来处理迟到元素:

  • 直接抛弃迟到的元素
  • 将迟到的元素发送到另一条流中去
  • 可以更新窗口已经计算完的结果,并发出计算结果。

使用process function抛弃迟到元素

抛弃迟到的元素是event time window operator的默认行为。也就是说一个迟到的元素不会创建一个新的窗口。

process function可以通过比较迟到元素的时间戳和当前水位线的大小来很轻易的过滤掉迟到元素。

使用侧输出(side output)重定向迟到元素

迟到的元素也可以使用侧输出(side output)特性被重定向到另外的一条流中去。迟到元素所组成的侧输出流可以继续处理或者sink到持久化设施中去。

例子

    val readings = env.socketTextStream("localhost", 9999, '\n').map(line => {val arr = line.split(" ")(arr(0), arr(1).toLong * 1000)}).assignAscendingTimestamps(_._2)val countPer10Secs = readings.keyBy(_._1).timeWindow(Time.seconds(10)).sideOutputLateData(new OutputTag[(String, Long)]("late-readings")).process(new CountFunction())val lateStream = countPer10Secs.getSideOutput(new OutputTag[(String, Long)]("late-readings"))lateStream.print()

实现CountFunction:

    class CountFunction extends ProcessWindowFunction[(String, Long),String, String, TimeWindow] {override def process(key: String,context: Context,elements: Iterable[(String, Long)],out: Collector[String]): Unit = {out.collect("窗口共有" + elements.size + "条数据")}}

下面这个例子展示了ProcessFunction如何过滤掉迟到的元素然后将迟到的元素发送到侧输出流中去。

    val readings: DataStream[SensorReading] = ???val filteredReadings: DataStream[SensorReading] = readings.process(new LateReadingsFilter)// retrieve late readingsval lateReadings: DataStream[SensorReading] = filteredReadings.getSideOutput(new OutputTag[SensorReading]("late-readings"))/** A ProcessFunction that filters out late sensor readings and * re-directs them to a side output */class LateReadingsFilter extends ProcessFunction[SensorReading, SensorReading] {val lateReadingsOut = new OutputTag[SensorReading]("late-readings")override def processElement(r: SensorReading,ctx: ProcessFunction[SensorReading, SensorReading]#Context,out: Collector[SensorReading]): Unit = {// compare record timestamp with current watermarkif (r.timestamp < ctx.timerService().currentWatermark()) {// this is a late reading => redirect it to the side outputctx.output(lateReadingsOut, r)} else {out.collect(r)}}}

使用allowed lateness迟到元素更新窗口计算结果

由于存在迟到的元素,所以已经计算出的窗口结果是不准确和不完全的。我们可以使用迟到元素更新已经计算完的窗口结果。

如果我们要求一个operator支持重新计算和更新已经发出的结果,就需要在第一次发出结果以后也要保存之前所有的状态。但显然我们不能一直保存所有的状态,肯定会在某一个时间点将状态清空,而一旦状态被清空,结果就再也不能重新计算或者更新了。而迟到的元素只能被抛弃或者发送到侧输出流。

window operator API提供了方法来明确声明我们要等待迟到元素。当使用event-time window,我们可以指定一个时间段叫做allowed lateness。window operator如果设置了allowed lateness,这个window operator在水位线没过窗口结束时间时也将不会删除窗口和窗口中的状态。窗口会在一段时间内(allowed lateness设置的)保留所有的元素。

当迟到元素在allowed lateness时间内到达时,这个迟到元素会被实时处理并发送到触发器(trigger)。当水位线没过了窗口结束时间+allowed lateness时间时,窗口会被删除,并且所有后来的迟到的元素都会被丢弃。

Allowed lateness可以使用allowedLateness()方法来指定,如下所示:

    val readings: DataStream[SensorReading] = ...val countPer10Secs: DataStream[(String, Long, Int, String)] = readings.keyBy(_.id).timeWindow(Time.seconds(10))// process late readings for 5 additional seconds.allowedLateness(Time.seconds(5))// count readings and update results if late readings arrive.process(new UpdatingWindowCountFunction)/** A counting WindowProcessFunction that distinguishes between * first results and updates. */class UpdatingWindowCountFunctionextends ProcessWindowFunction[SensorReading,(String, Long, Int, String), String, TimeWindow] {override def process(id: String,ctx: Context,elements: Iterable[SensorReading],out: Collector[(String, Long, Int, String)]): Unit = {// count the number of readingsval cnt = elements.count(_ => true)// state to check if this is// the first evaluation of the window or notval isUpdate = ctx.windowState.getState(new ValueStateDescriptor[Boolean]("isUpdate",Types.of[Boolean]))if (!isUpdate.value()) {// first evaluation, emit first resultout.collect((id, ctx.window.getEnd, cnt, "first"))isUpdate.update(true)} else {// not the first evaluation, emit an updateout.collect((id, ctx.window.getEnd, cnt, "update"))}}}

总结

对迟到元素处理,要根据具体业务权衡利弊。
对于不是很重要的数据,并且追求实效性可以直接抛弃。
对于数据实效性可以一定容忍,可以使用WaterMark去延迟处理数据。
对于数据很重要,并且要求很实时计算,可以加入Allowed lateness 不关闭窗口延迟更新。但是注意这会消耗大量的资源。
甚至可以结合WaterMark再加上Allowed lateness来处理延迟数据。

Flink 迟到元素的处理相关推荐

  1. Flink迟到数据处理

    一.迟到的数据如何处理? Event Time语义下我们使用Watermark来判断数据是否迟到.一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个窗口,但由于延迟,窗口已经触发计算.目前Fl ...

  2. Flink迟到数据输出到测输出流

    一.迟到的数据如何处理? Event Time语义下我们使用Watermark来判断数据是否迟到.一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个窗口,但由于延迟,窗口已经触发计算.目前Fl ...

  3. flink 三种时间机制_Flink时间系列:Event Time下如何处理迟到数据

    Event Time语义下我们使用Watermark来判断数据是否迟到.一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个窗口,但由于延迟,窗口已经触发计算.目前Flink有三种处理迟到数据的 ...

  4. Flink 对于迟到数据的处理

    WaterMark 和 Window 机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理. Event Time语义下我们使用Watermark来判断数 ...

  5. Flink对迟到数据的处理的三种方式

    ** Flink对迟到数据的处理 ** 水位线可以用来平衡计算的完整性和延迟两方面.除非我们选择一种非常保守的水位线策略(最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则我们 ...

  6. 深度解读!新一代大数据引擎Flink厉害在哪?(附实现原理细节)

    导语 | 大数据计算分为离线计算和实时计算,其中离线计算就是我们通常说的批计算,代表技术是Hadoop MapReduce.Hive等:实时计算也被称作流计算,代表技术是Storm.Spark Str ...

  7. 追源索骥:透过源码看懂Flink核心框架的执行流程

    https://www.cnblogs.com/bethunebtj/p/9168274.html 追源索骥:透过源码看懂Flink核心框架的执行流程 前言 1.从 Hello,World WordC ...

  8. flink 不设置水印_区分理解Flink水印延迟与窗口允许延迟的概念

    link 在开窗处理事件时间(Event Time) 数据时,可设置水印延迟以及设置窗口允许延迟(allowedLateness)以保证数据的完整性.这两者因都是设置延迟时间所以刚接触时容易混淆.本文 ...

  9. flink source 同步_Flink面试题

    1.面试题一:应用架构 问题:公司怎么提交的实时任务,有多少 Job Manager? 解答: 1. 我们使用 yarn session 模式提交任务.每次提交都会创建一个新的 Flink 集群,为每 ...

最新文章

  1. python连接mongo数据库
  2. Api demo源码学习(8)--App/Activity/QuickContactsDemo --获取系统联系人信息
  3. maven私有库配置
  4. Pycharm 项目运行的多种技巧
  5. Scala的List集合和Set集合
  6. 负载均衡 > 用户指南 > 健康检查 > 健康检查概述
  7. 思考题2(人车关系)
  8. Android8.0 开机启动脚本,Android开机启动shell脚本(Android 8.0测试OK)
  9. 算法: 唯一路径62. Unique Paths
  10. 算法:Linked List Cycle(环形链表)
  11. tcp多进程文件传输服务器,TCP/IP网络编程 Chap10. 多进程服务器端
  12. 强制停用华为桌面,换第三方桌面
  13. hdu 吉哥系列故事——完美队形 (最长公共子序列)
  14. java构造扑克牌算法_java扑克牌算法
  15. flask中'bool' object has no attribute '__call__'问题
  16. mysql如何使用多核cpu_利用多核 CPU 实现并行计算
  17. trove 镜像制作
  18. 2021-10-21 markdown模板
  19. 自制p站小姐姐图片返回api.
  20. linux下emmc自动格式化和自动挂载

热门文章

  1. 全军出击机器人进房间_全军出击,“机器人总动员”来北京啦!
  2. 95后阿里P7晒出工资单:狠补了这个,真香....
  3. Redboot安装历程
  4. 在一个数组中找到几个数之和为某个数字
  5. 手机用户界面和多媒体版面有价值问题整理[j2medev.com][0406更新]
  6. 经济管理 第1章 经济与管理概述
  7. 齿轮箱故障诊断技术(一)
  8. XlsxWriter模块常用方法说明
  9. 值得长期持有的10只成长股
  10. GridView文本自动换行