flink 空闲窗口

flink多并行时,如果有窗口中没数据,那么有数据的窗口即使watermark到达了触发边界,barren没对齐,窗口也不会触发计算。这样的空窗口即空闲窗口。可通过设置空闲时间(withIdleness)来使有数据的窗口进行触发。

parallellism:2

windowSize: 10s

forBoundOutofOrderness: 5s

withIdleness:10s

窗口数据触发范围为[n*size, (n+1)*size+5)

窗口数据有效范围为[n*size, (n+1)*size)

输入:

a 1
a 10
a 15    -- 到达第一个窗口触发边界,因有两个task,另一个task中并无数据,所以不会正常触发
a 20    第二个窗口task2 数据
b 1     -- 过期被丢弃
b 19    第二个窗口task1 数据
b 25    第二个窗口task1 数据,且到达触发边界,窗口触发

输出:

2> (a,1)
=======================  ^^^ window_num: 0 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 9999
======================================================
只有一个窗口中有数据,到达空闲时间10s后才会触发1> (b,19)
2> (a,10)
=======================  ^^^ window_num: 0 ^^^ ==================
======================= range: [10000___20000 )
======================= key: b
2> (side-a,10)
======================= watermark: 19999
======================================================
2> (a,15)
=======================  ^^^ window_num: 1 ^^^ ==================
======================= range: [10000___20000 )
======================= key: a
======================= watermark: 19999
======================================================
两个窗口中都有数据,当有一个窗口数到达触发时间戳数据两个窗口都会触发

demo:

    //定义侧流标签private static OutputTag<Tuple2<String, Integer>> outputTag = new OutputTag<Tuple2<String, Integer>>("dayle_data2"){};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Tuple2<String, Integer>> dataStreams = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {Random random = new Random();int tp2 = 0;int speed = 0;int f1 = -1;@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {while (true) {TimeUnit.SECONDS.sleep(1);tp2 = Math.abs(random.nextInt() % 7);f1 = Math.abs(++speed + tp2);ctx.collect(Tuple2.of("a", f1));System.out.println("source generator :\t" + f1);}}@Overridepublic void cancel() {}});//必须为匿名内部类OutputTag<String> erroOutputTag = new OutputTag<String>("formatExceptionData"){} ;SingleOutputStreamOperator<Tuple2<String, Integer>> socketTextStream = env.socketTextStream("localhost", 10086)//异常数据侧流输出.process(new FlatMapProcess(erroOutputTag));SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2Stream = ReduceWindowPrint(socketTextStream, 5);//获取测流socketTextStream.getSideOutput(erroOutputTag).print();tuple2Stream.getSideOutput(outputTag).print();//正常处理的主流tuple2Stream.print();env.execute("event time process ");}private static SingleOutputStreamOperator<Tuple2<String, Integer>> ReduceWindowPrint(SingleOutputStreamOperator<Tuple2<String, Integer>> dataStreams, int dayleTime) {return dataStreams.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(dayleTime)).withTimestampAssigner((e, t) -> e.f1*1000)//多并行下如果有窗口数据为空,那么窗口需要barre对齐,不会触发。// 空窗口将导致下游算子都无法进行计算,// 设置idleness时间那么如果存在空窗口,当别的窗口有数据并且到达设置的时间(下面为10s)// 窗口就会触发.withIdleness(Duration.ofSeconds(10))).keyBy(e -> e.f0).window(TumblingEventTimeWindows.of(Time.seconds(10)))//允许上个窗口数据迟到时间
//                .allowedLateness(Time.seconds(5))/* .sideOutputLateData(new OutputTag<Tuple2<String, Integer>>("sideOutput_dayle_data1", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})))*/.process(new ProcessWindowFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String, TimeWindow>() {int i;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);i = 0;}@Overridepublic synchronized void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {Iterator<Tuple2<String, Integer>> iterator = elements.iterator();while (iterator.hasNext()) {Tuple2<String, Integer> next = iterator.next();out.collect(next);if (next.f1%2 == 0) {//收集数据到侧流context.<Tuple2<String, Integer>>output(outputTag,Tuple2.of("side-" + next.f0, next.f1));}}System.out.println("=======================  ^^^ window_num:\t" + i++ + " ^^^ ==================");System.out.println("======================= range: [" + context.window().getStart() + "___" + context.window().getEnd() + " )");System.out.println("======================= key: " + s );System.out.println("======================= watermark: " + context.currentWatermark());System.out.println("======================================================");}}); }
}

flink 空闲窗口-withIdleness相关推荐

  1. flink 自定义 窗口_【Flink 精选】阐述 Watermark 机制,剖析 Watermark 的产生和传递流程...

    本文阐述 Flink 的事件时间和 Watermark 机制,剖析 Watermark 产生和传递的流程. 1 Event time 和 Watermark 的关系 1.1 Event time 和 ...

  2. Flink之窗口 (Window) 下篇

    窗口函数(Window Functions) 定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了:至于收集起来到底要做什么,其实还完全没有头绪.所以在窗口分配器之后,必须再接上一个 ...

  3. Flink专题四:Flink DataStream 窗口介绍及使用

    由于工作需要最近学习flink 现记录下Flink介绍和实际使用过程 这是flink系列的第四篇文章 Flink DataStream 窗口介绍及使用 窗口介绍 时间窗口 翻滚窗口(数据以一个时间断为 ...

  4. Flink之窗口 (Window) 上篇

    我们已经了解了 Flink 中事件时间和水位线的概念,那它们有什么具体应用呢?当然是做基于时间的处理计算了.其中最常见的场景,就是窗口聚合计算. 之前我们已经了解了 Flink 中基本的聚合操作.在流 ...

  5. 【随记】Flink 时间窗口的起始时间

    话不多说,直接上手今天的主题,探索一个容易让人忽略和困惑的问题:Flink 时间窗口的起始时间 就以最简单的demo为例: timeWindow(Time.seconds(5)) 上述定义一个步长为5 ...

  6. flink时间窗口无新的数据进来最后一个窗口不关闭

    测试反馈, 配置的flink任务提交上去后, 输入数据源符合条件,到时间窗口的size.最后一个窗口没有闭窗计算,数据并没及时输出告警 经过调试发现,watermark没有向后继续推进,导致无法闭窗, ...

  7. flink 自定义 窗口_Flink入门实战 (下)

    一. 时间语义与 Wartermark 1. Flink 中的时间语义 在 Flink 的流式处理中,会涉及到时间的不同概念,如下图所示: Event Time:是事件创建的时间.它通常由事件中的时间 ...

  8. flink 自定义 窗口_《从0到1学习Flink》—— Flink Data transformation(转换)

    前言 在第一篇介绍 Flink 的文章 <<从0到1学习Flink>-- Apache Flink 介绍> 中就说过 Flink 程序的结构 Flink 应用程序结构就是如上图 ...

  9. Flink的窗口聚合操作(Time\Count Window)

    窗口基本概念:Flink中的窗口是左闭右开的窗口 Flink认为批处理是流处理的一个特例,而窗口window就是从流处理到批处理的一个桥梁,通常来讲窗口就是用来将无线数据流转换为优先数据集,从而在优先 ...

最新文章

  1. VUE保存页面的数据,VUE页面显示就执行某个函数,VUE页面隐藏就执行某个函数
  2. [转]ArcGIS.Server.9.3和ArcGIS API for Flex实现Query查询定位中心功能(七)
  3. [我的1024开源程序]200元仿豆瓣小程序带评论
  4. binlog日志_mysql日志系统
  5. 8 年后重登王座,Python 再度成为 TIOBE 年度编程语言
  6. 挖漏经验:在密码重置请求包中添加X-Forwarded-Host实现受害者账户完全劫持
  7. python2.7安装报错_python2.7源码安装方式
  8. 在线文件管理系统 下载地址
  9. Atitit.软件开发概念(11)--网络子系统--url编码 空格问题URLEncoder java js php
  10. 安装linux需要最少磁盘分区是多少,320G硬盘只安装Linux分区方案
  11. 计算机软件工程师考试试题,计算机考试软件工程师试题
  12. java 中的radix_Java Scanner radix()用法及代码示例
  13. Service(LoadBalancer)
  14. PSINS不可交换(圆锥/划桨)误差补偿
  15. 雷柏V500机械键盘——重复按键故障原因之一
  16. WebGoat v8.1.0 下载安装(windows)
  17. tcpdump for udp
  18. 四、Node.js - 数据库与身份认证
  19. React使用过程知识点随手记
  20. D14.系统模块和文件操作

热门文章

  1. IGBT学习记录(一)
  2. Nginx配置http升级https
  3. vue.js 密码加密_Word2007/2016/2019文档加密的方法
  4. 业务运营支撑系统  BOSS(Business Operation Support System)。
  5. 平安科技Java开发三面面经(2018年12月)
  6. 011-Java代理模式
  7. 计算机专业eng4u,加拿大高中文凭ossd项目之ENG4U英语4U课程内容介绍!
  8. Qt QVector “isDetached()“
  9. 毕业设计 - 题目_ 基于单片机的智能小车 - 嵌入式 物联网 本科毕设
  10. 360度全景标定方法_全景摄像机标定方法综述