背景

疑问1.一个窗口会不会变化?

我们都知道flink有窗口和watermark的概念,当watermark大于窗口的endTime,将触发窗口中数据的计算,watermark是一个不断递增的时间戳,是不断变化的,如果我们假设一个窗口的开始时间和结束时间也是不断变化的,那么watermark就不好触发窗口计算。所以根据我们的假设,内心也是认为一个特定的窗口的开始和结束时间肯定是固定的。

疑问2.窗口是怎么初始化的?

如果一个特定的窗口是不会变化的,比如滚动窗口,我们在代码中只需要传入窗口的size,就可以完成窗口的构建,那么窗口的开始时间和结束时间是怎么获取的。

例如我们定义一个窗口:window(TumblingProcessingTimeWindows.of(Time.seconds(10))),当我们当前事件的处理时间是2021-12-20 10:17:14,那么这个事件是属于下面窗口中的哪一个?[10:17:10, 10:17:20),[10:17:11, 10:17:21),[10:17:12, 10:17:22),[10:17:14, 10:17:24),…(共十种可能)

疑问3.如果窗口是固定的,那么第一个窗口开始时间是哪个?

我们知道如果我们设置了窗口大小,那么这些窗口都是固定不变的,也就是说这些窗口都是真实存在的,不管你用不用它,那么第一个窗口是哪一个呢?
答案就是所有类型的窗口,第一个窗口的开始时间都是1970-01-01 08:00:00

ok,我们来着上面的疑问向下走。。。

下面列出窗口的示例代码

source.map(new MyMapFunction()).setParallelism(8).keyBy(s -> s.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction()).setParallelism(8).print();

通过上面TumblingProcessingTimeWindows.of(Time.seconds(10))创建是一个窗口大小是10s的窗口。

public static TumblingProcessingTimeWindows of(Time size) {return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0, WindowStagger.ALIGNED);
}

也就完成了窗口中三个变量的初始化:

private TumblingProcessingTimeWindows(long size, long offset, WindowStagger windowStagger) {if (Math.abs(offset) >= size) {throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");}this.size = size;this.globalOffset = offset;this.windowStagger = windowStagger;
}

可以看到窗口中的size=10000(10秒), globalOffset=0, windowStagger=WindowStagger.ALIGNED(所有窗口都是从0开始)

现在看下TumblingProcessingTimeWindows分配窗口的功能:assignWindows

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {final long now = context.getCurrentProcessingTime();if (staggerOffset == null) {staggerOffset =windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);}long start =TimeWindow.getWindowStartWithOffset(now, (globalOffset + staggerOffset) % size, size);return Collections.singletonList(new TimeWindow(start, start + size));
}

有三个参数element用户发送数据流数据,timestamp该数据流的时间戳,context窗口分配向下文信息,该方法在WindowOperator被调用。该类是处理事件的窗口类,所以该类不是不会使用timestamp该数据流的时间戳。由于windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);返回0,所以staggerOffset就是0。

下面就是重点了,就是获取一个窗口的开始时间,获取开始时间后+窗口大小就是结束时间。

通过以上可知TimeWindow.getWindowStartWithOffset(now, (globalOffset + staggerOffset) % size, size);,第二个参数就是0,now是当前时间戳,size是窗口大小。

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {return timestamp - (timestamp - offset + windowSize) % windowSize;
}

上面的公式可以简化为timestamp - timestamp % windowSize;

也即是当前时间戳-时间戳在窗口多余的时间,肯定是窗口开始时间。

下面具体例子:2021-12-20 10:17:14 通过 2021-12-20 10:17:14 - 4秒,所以该数据落在[10:17:10-10:17:20)的窗口,同样在10:17:10-10:17:19产生的数据都会落在该窗口中。

所以实际上是不存在这些窗口的[10:17:11, 10:17:21),[10:17:12, 10:17:22),[10:17:14, 10:17:24)…

总结:窗口在定义时候,可以说窗口固化了窗口.
因为我们看到通过上面算法,每个时间都会落到特定的窗口。
后面会验证第一个窗口的开始时间是1970-01-01 08:00:00

再举一个例子:窗口大小改为8秒。

下面是一分钟内,各个处理时间对一个的窗口

id 处理时间 窗口对应的开始时间
0 2021-12-20 16:57:13 2021-12-20 16:57:12
1 2021-12-20 16:57:14 2021-12-20 16:57:12
2 2021-12-20 16:57:15 2021-12-20 16:57:12
3 2021-12-20 16:57:16 2021-12-20 16:57:12
4 2021-12-20 16:57:17 2021-12-20 16:57:12
5 2021-12-20 16:57:18 2021-12-20 16:57:12
6 2021-12-20 16:57:19 2021-12-20 16:57:12
7 2021-12-20 16:57:20 2021-12-20 16:57:20
8 2021-12-20 16:57:21 2021-12-20 16:57:20
9 2021-12-20 16:57:22 2021-12-20 16:57:20
10 2021-12-20 16:57:23 2021-12-20 16:57:20
11 2021-12-20 16:57:24 2021-12-20 16:57:20
12 2021-12-20 16:57:25 2021-12-20 16:57:20
13 2021-12-20 16:57:26 2021-12-20 16:57:20
14 2021-12-20 16:57:27 2021-12-20 16:57:20
15 2021-12-20 16:57:28 2021-12-20 16:57:28
16 2021-12-20 16:57:29 2021-12-20 16:57:28
17 2021-12-20 16:57:30 2021-12-20 16:57:28
18 2021-12-20 16:57:31 2021-12-20 16:57:28
19 2021-12-20 16:57:32 2021-12-20 16:57:28
20 2021-12-20 16:57:33 2021-12-20 16:57:28
21 2021-12-20 16:57:34 2021-12-20 16:57:28
22 2021-12-20 16:57:35 2021-12-20 16:57:28
23 2021-12-20 16:57:36 2021-12-20 16:57:36
24 2021-12-20 16:57:37 2021-12-20 16:57:36
25 2021-12-20 16:57:38 2021-12-20 16:57:36
26 2021-12-20 16:57:39 2021-12-20 16:57:36
27 2021-12-20 16:57:40 2021-12-20 16:57:36
28 2021-12-20 16:57:41 2021-12-20 16:57:36
29 2021-12-20 16:57:42 2021-12-20 16:57:36
30 2021-12-20 16:57:43 2021-12-20 16:57:36
31 2021-12-20 16:57:44 2021-12-20 16:57:44
32 2021-12-20 16:57:45 2021-12-20 16:57:44
33 2021-12-20 16:57:46 2021-12-20 16:57:44
34 2021-12-20 16:57:47 2021-12-20 16:57:44
35 2021-12-20 16:57:48 2021-12-20 16:57:44
36 2021-12-20 16:57:49 2021-12-20 16:57:44
37 2021-12-20 16:57:50 2021-12-20 16:57:44
38 2021-12-20 16:57:51 2021-12-20 16:57:44
39 2021-12-20 16:57:52 2021-12-20 16:57:52
40 2021-12-20 16:57:53 2021-12-20 16:57:52
41 2021-12-20 16:57:54 2021-12-20 16:57:52
42 2021-12-20 16:57:55 2021-12-20 16:57:52
43 2021-12-20 16:57:56 2021-12-20 16:57:52
44 2021-12-20 16:57:57 2021-12-20 16:57:52
45 2021-12-20 16:57:58 2021-12-20 16:57:52
46 2021-12-20 16:57:59 2021-12-20 16:57:52
47 2021-12-20 16:58:00 2021-12-20 16:58:00
48 2021-12-20 16:58:01 2021-12-20 16:58:00
49 2021-12-20 16:58:02 2021-12-20 16:58:00
50 2021-12-20 16:58:03 2021-12-20 16:58:00
51 2021-12-20 16:58:04 2021-12-20 16:58:00
52 2021-12-20 16:58:05 2021-12-20 16:58:00
53 2021-12-20 16:58:06 2021-12-20 16:58:00
54 2021-12-20 16:58:07 2021-12-20 16:58:00
55 2021-12-20 16:58:08 2021-12-20 16:58:08
56 2021-12-20 16:58:09 2021-12-20 16:58:08
57 2021-12-20 16:58:10 2021-12-20 16:58:08
58 2021-12-20 16:58:11 2021-12-20 16:58:08
59 2021-12-20 16:58:12 2021-12-20 16:58:08

用上面的时间验证第一个窗口的开始时间。例如上面第一个时间是2021-12-20 16:57:13那么也就是1639990633000,所以(1639990633000-0)/8000=204998829 (取整),然后204998829 * 8000 = 1639990632000 也就是2021-12-20 16:57:12就是这个窗口的开始时间。
ok 上面0就是1970-01-01 08:00:00

我们WindowOperator就是封窗用户WindowFunction处理功能的类,下面贴下代码

@Override
public void processElement(StreamRecord<IN> element) throws Exception {final Collection<W> elementWindows =windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);...
}

对于事件时间也是一样处理,只是不是使用当前时间计算所属的窗口,而是使用事件时间

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {if (timestamp > Long.MIN_VALUE) {if (staggerOffset == null) {staggerOffset =windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);}// Long.MIN_VALUE is currently assigned when no timestamp is presentlong start =TimeWindow.getWindowStartWithOffset(timestamp, (globalOffset + staggerOffset) % size, size);return Collections.singletonList(new TimeWindow(start, start + size));} else {throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). "+ "Is the time characteristic set to 'ProcessingTime', or did you forget to call "+ "'DataStream.assignTimestampsAndWatermarks(...)'?");}
}

flink事件属于窗口的计算方法相关推荐

  1. 【Flink】各种窗口的使用(处理时间窗口、事件时间窗口、窗口聚合窗口)

    文章目录 一 Flink 中的 Window 1 Window (1)Window概述 (2) Window类型 a 滚动窗口(Tumbling Windows) b 滑动窗口(Sliding Win ...

  2. 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析

    目录 事件时间窗口分析 时间概念 ​​​​​​​event-time ​​​​​​​延迟数据处理 ​​​​​​​延迟数据 ​​​​​​​Watermarking 水位 ​​​​​​​官方案例演示 事件 ...

  3. 东八区转为0时区_踩坑记 | Flink 天级别窗口中存在的时区问题

    ❝ 本系列每篇文章都是从一些实际的 case 出发,分析一些生产环境中经常会遇到的问题,抛砖引玉,以帮助小伙伴们解决一些实际问题.本文介绍 Flink 时间以及时区问题,分析了在天级别的窗口时会遇到的 ...

  4. 大数据——Flink Window(窗口)机制

    Flink窗口机制 Window(窗口) Tumbling Window(翻滚窗口) Sliding Window(滑动窗口) Sliding Window(滑动窗口)设置Watermark时间 Wi ...

  5. 【Flink】Flink中的窗口API、窗口函数以及迟到数据处理问题

    目录 一.窗口 1.窗口的概念 2.窗口的分类 (1)按照驱动类型分类--时间窗口和计数窗口 (2)按照窗口分配数据的规则分类 3.窗口 API (1)按键分区窗口(Keyed Windows) (2 ...

  6. 【Flink系列】窗口系列简介

    一.窗口概念 窗口:将无限数据切割成有限的"数据块"进行处理,窗口是处理无界流的核心. 窗口更像一个"桶",将流切割成有限大小的多个存储桶,每个数据都会分发到对 ...

  7. Flink应如何处理窗口中迟到的数据?

    一.如何处理迟到的数据 三个步骤: .1 设置水位线延迟时间 因为水位线的延迟主要是用来对付分布式网络传输导致的数据乱序,而网络传输的乱序程度一般并不会很大,大多集中在几毫秒至几百毫秒.所以实际应用中 ...

  8. Flink中window 窗口和时间以及watermark水印

    我们都知道,Flink的核心是流式处理,但同时也支持批处理,Flink底层是一个流式引擎,在这个上面实现了流处理和批处理,而窗口则是批处理的实现. 在Flink中window从大的分类上主要有三种:T ...

  9. Flink学习:Flink如何打印窗口的开始时间和结束时间

    Window 一.简介 二.代码实现 三.测试 一.简介 大家知道,Flink用水位线和窗口机制配合来处理乱序事件,保证窗口计算数据的正确性,当水位线超过窗口结束时间的时候,就会触发窗口计算 水位线是 ...

最新文章

  1. fast rcnn,faster rcnn使用cudann加速问题
  2. 分布式系统中的一致性协议之两阶段提交协议(2PC)
  3. c语言case的值能动态修改吗,java中的switch case语句中,case所对应的数目是不确定的,能否动态改变case...
  4. String使用注意一
  5. 查看文件详细信息linux,linux命令stat,查看文件详细信息
  6. 详解凸优化、贝叶斯、MCMC、GCN
  7. iPhone 13 mini背部新外观曝光:双摄对角线排布
  8. idea2020.2中@test是怎么测试的_[翻译]Angular Schematics: 单元测试
  9. URL different URI
  10. java基本类型有缓冲区类型的有_Java基础(三十四)String、StringBuffer类和数据缓冲区Buffer类...
  11. 淘宝信用等级|淘宝买家信用等级|淘宝卖家信用等级(图片介绍更清晰)
  12. python爬虫英文单词_非常适合新手的一个Python爬虫项目:打造一个英文词汇量测试脚本...
  13. 详解Guitar Pro 7小节的组织定义
  14. sqlserver2008 R2数据库-不允许表修改保存,阻止保存要求重新创建表的更改
  15. dcc-garch matlab,dcc-garch原理简介和模型实现
  16. arma3自定义服务器,《绝地求生》自定义服务器要收费?类似《武装突袭3》
  17. 有光就可以上网?中国工程院院士表示,可见光通信要比5G快10倍!
  18. CSDN换了新的logo
  19. nuxt.js开发环境使用mockjs模拟数据
  20. 阿里云华为云腾讯云被攻击的危害有多大

热门文章

  1. git branch diverged
  2. 测试Bitmap和Marshal.Copy
  3. _pvp_killed_loot
  4. 在cmd下用cd命令进入不了D盘的问题
  5. mqtt服务器搭建php,MQTT 服务端
  6. pycharm zip函数_Zipline+Pycharm 安装与配置
  7. VB6读取INI文件
  8. 海思3518E开发笔记1.5——flash分区及uboot、kernel、rootfs烧写并部署
  9. JavaScript 生成流程图
  10. 模拟简单FTP服务搭建--本地用户访问