Window

  • 一、简介
  • 二、代码实现
  • 三、测试

一、简介

大家知道,Flink用水位线和窗口机制配合来处理乱序事件,保证窗口计算数据的正确性,当水位线超过窗口结束时间的时候,就会触发窗口计算

  • 水位线是动态生成的,根据进入窗口的最大事件时间-允许延迟时间

那么窗口的开始时间和结束时间是怎么计算的呢?这里不讨论计数窗口,因为数量统计很容易知道,只针对时间窗口的计算

  • 滚动时间窗口:按照固定的时间长度对数据进行分组,窗口之间没有重叠,例如,5秒的滚动窗口。开始时间为当前窗口大小的整数倍,结束时间为开始时间加上窗口大小,比如
  • 滑动时间窗口:按照固定的时间长度对数据进行分组,窗口之间有重叠,例如,5秒的滑动窗口,每2秒钟滑动一次。开始时间为当前窗口大小的整数倍加上窗口滑动步长的整数倍,结束时间为开始时间加上窗口大小。
  • 会话窗口:按照数据的时间间隔进行分组,当两个数据之间的时间间隔大于指定的间隔时间时,就认为前一个窗口结束,后一个窗口开始。开始时间为第一个数据的时间戳,结束时间为最后一个数据的时间戳加上间隔时间

看完上面可能还有些疑惑,没关系,下面会有具体示例

二、代码实现

自定义一个Mywindowfunction继承WindowFunction,可以在流上调用这个function从而打印

class MyWindowFunction extends WindowFunction[User, String, Tuple, TimeWindow] {override def apply(key: Tuple, window: TimeWindow, input: Iterable[User], out: Collector[String]): Unit = {val startTime = window.getStartval endTime = window.getEndval result = s"Window start time: $startTime, end time: $endTime"out.collect(result)}
}

三、测试

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorcase class User(id:Int,name:String,age:Int,timestamp:Long)
object WaterMarkTest {def main(args: Array[String]): Unit = {val streamEnv = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = TableEnvironment.getTableEnvironment(streamEnv)//指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1, "nie", 22, 1511658001000L),User(2, "xiao", 19, 1511658005000L)).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[User](Time.seconds(2)) {override def extractTimestamp(t: User): Long = t.timestamp})stream.keyBy(0).timeWindow(Time.seconds(2)).apply(new MyWindowFunction).print()streamEnv.execute("test")}
}class MyWindowFunction extends WindowFunction[User, String, Tuple, TimeWindow] {override def apply(key: Tuple, window: TimeWindow, input: Iterable[User], out: Collector[String]): Unit = {val startTime = window.getStartval endTime = window.getEndval result = s"Window start time: $startTime, end time: $endTime"out.collect(result)}
}

比如上面的两条数据时间戳分别为1511658001000L、1511658005000L,转换成日期格式为2017-11-26 9:0:1,2017-11-26 9:0:5,设置为滚动窗口,窗口间隔为2s,上面说到滚动窗口窗口开始时间为当前窗口大小的整数倍,结束时间为开始时间加上窗口大小,那么可以推断窗口为2017-11-26 9:0:0(整数倍) ~ 2017-11-26 9:0:2、2017-11-26 9:0:4 ~ 2017-11-26 9:0:6,运行下代码试下:

确实是一样的,窗口开始时间为事件时间往下推,直到找到窗口大小的整数倍,窗口结束时间就是开始时间加上时间间隔
这时候我们改成滑动窗口,窗口间隔时间为5s,滑动距离为2s,这时候修改一下上述代码

    stream.keyBy(0).timeWindow(Time.seconds(5),Time.seconds(3)).apply(new MyWindowFunction).print()

再运行一下:

上面说到滑动窗口开始时间为当前窗口大小的整数倍加上窗口滑动步长的整数倍,可以理解为窗口大小为5s,滑动距离为2s,事件事件分别为1511658001000L、1511658005000L,相对于1511658001000L,5s的整数倍可以是1511657995000,1511657990000,这时候加上滑动距离的整数倍,可以是1511657992000、1511657994000、1511657998000、1511657997000,为什么窗口最开始为1511657998000,肯定是综合考量的结果

Flink学习:Flink如何打印窗口的开始时间和结束时间相关推荐

  1. js通过开始时间和结束时间计算出中间的所有日期,并且转换为层级结构数组对象,用于甘特图头部日期数据

    写在前面: 先看下最终数据结构展示 time('2020-10-01', '2021-01-06') 需要根据一个开始日期和一个结束日期最后返回以下数组对象 [最外层数组里的每个对象代表了某一年的所有 ...

  2. 数据库中包含开始时间、结束时间,并且查询条件也有开始时间、结束时间的查询方法...

    分类: oracle学习 数据库 例:考试表中有两个字段:startDate.endDate,分别代表考试开始时间.结束时间.现在需要查询某一时间段内正在进行的考试,实际只要满足考试的时间段和查询条件 ...

  3. 泛微oa流程表单之开始时间与结束时间限制在本周且不能跨月

    本次是主表中开始时间和结束时间限制在本周且不能跨月. 如果出现跨月会弹出一个"出现跨月"的提示,并且清空当前所选日期 <script type="text/java ...

  4. java取开始时间和结束时间_java获取指定开始时间与结束时间之间的所有日期

    本文实例为大家分享了java获取指定开始时间与结束时间之间的所有日期的具体代码,供大家参考,具体内容如下 import java.text.SimpleDateFormat; import java. ...

  5. easyui日期处理(开始时间和结束时间)

    easyui日期处理(开始时间和结束时间): <tr><td class="bis_pxzs_form_id_table_td">发证日期:</td& ...

  6. java方法,返回两个日期内的所有date集合,根据开始时间、结束时间得到两个时间段内所有的日期...

    在最近的项目中,有这么一个需求,根据开始时间.结束时间得到两个时间段内所有的日期,以下分享我的代码给大家. 1.以下的这个方法适用于jdk1.5以上的版本 /** * 根据开始时间和结束时间返回时间段 ...

  7. oracle 时间小于,jquery easyui 对于开始时间小于结束时间的判断示例

    对于开始时间小于结束时间的判断可以参考,jquery easyui里的ValidateBox进行判断 好吧!直接上代码 查看内容:按时间: 至 var varify;//用于查询验证,验证开始时间是否 ...

  8. js获取下月时间_JS获取上月,本月,下月的开始时间与结束时间

    ``###JS获取上月,本月,下月的开始时间与结束时间(记录) //获取当天的时间 function getToday() { var date = new Date(); return date . ...

  9. 更改hadoop集群yarn的webui中的开始时间和结束时间为本地时间

    yarn集群的webui地址为:http://rm:8088 执行任务后,任务的开始时间和结束时间都是utc时间,查看很不方便. 查找相关资料发现hadoop有补丁包,补丁地址:https://iss ...

最新文章

  1. matlab里面板有什么作用,MATLAB轻松享受GPU的强大功能
  2. python中单行注释_Python中的单行、多行、中文注释方法
  3. 判断一个字符串是否为回文的递归算法
  4. ASP.NET Core Razor 视图组件
  5. CF1270H Number of Components(线段树)
  6. alinq mysql_ALinq让Mysql变得如此简单_MySQL
  7. SQL中常用的的时间跟日期函数
  8. 面试官:你知道怎么求素数吗?
  9. 鱼和熊掌兼得---STM32调试接口SW动态复用为GPIO的方法
  10. 飞思卡尔mc9s08烧录方法_飞思卡尔单片机烧写程序方法(量产).pdf
  11. 宽带波束形成 matlab,关于均匀圆阵MVDR宽带波束形成的程序
  12. @click.native.prevent
  13. [原创]威胁猎人 | 2018年上半年短视频行业黑灰产研究报告
  14. 在SSH会话中如何实现 X11 Forwarding
  15. arduino esp32 读福申甲醛传感器
  16. 笔记本电脑外接显示器 卡_如何向Mac笔记本电脑添加和配置外接显示器
  17. 电脑公司GHOST WIN7 装机旗舰版 2013 09
  18. 第21章、系统设定工具(网络与打印机)与硬件侦测
  19. 利用时间字符串计算时间差
  20. 【踩坑实录】Mission planner+Ardupilot飞控固件配置教程

热门文章

  1. cccc 天梯赛后总结
  2. aircrack-ng在嵌入式系统的使用
  3. 2010年SQLite学习笔记之三
  4. 定制电液伺服阀控制器
  5. 从哪里下载OpenJDK
  6. 双端影视APP搭建详细教程
  7. c语言的翻译叫什么_C语言如何把它翻译成中文
  8. xmanager xstart连接linux桌面,如何使用xstart运行CDE,KDE以及Gnome?
  9. 【Python学习 】Python的模块或py文件导入
  10. RGB格式详解(三)---RGB像素格式