flink 空闲窗口-withIdleness
flink 空闲窗口
flink多并行时,如果有窗口中没数据,那么有数据的窗口即使watermark到达了触发边界,barren没对齐,窗口也不会触发计算。这样的空窗口即空闲窗口。可通过设置空闲时间(withIdleness)来使有数据的窗口进行触发。
parallellism:2
windowSize: 10s
forBoundOutofOrderness: 5s
withIdleness:10s
窗口数据触发范围为[n*size, (n+1)*size+5)
窗口数据有效范围为[n*size, (n+1)*size)
输入:
a 1
a 10
a 15 -- 到达第一个窗口触发边界,因有两个task,另一个task中并无数据,所以不会正常触发
a 20 第二个窗口task2 数据
b 1 -- 过期被丢弃
b 19 第二个窗口task1 数据
b 25 第二个窗口task1 数据,且到达触发边界,窗口触发
输出:
2> (a,1)
======================= ^^^ window_num: 0 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 9999
======================================================
只有一个窗口中有数据,到达空闲时间10s后才会触发1> (b,19)
2> (a,10)
======================= ^^^ window_num: 0 ^^^ ==================
======================= range: [10000___20000 )
======================= key: b
2> (side-a,10)
======================= watermark: 19999
======================================================
2> (a,15)
======================= ^^^ window_num: 1 ^^^ ==================
======================= range: [10000___20000 )
======================= key: a
======================= watermark: 19999
======================================================
两个窗口中都有数据,当有一个窗口数到达触发时间戳数据两个窗口都会触发
demo:
//定义侧流标签private static OutputTag<Tuple2<String, Integer>> outputTag = new OutputTag<Tuple2<String, Integer>>("dayle_data2"){};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Tuple2<String, Integer>> dataStreams = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {Random random = new Random();int tp2 = 0;int speed = 0;int f1 = -1;@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {while (true) {TimeUnit.SECONDS.sleep(1);tp2 = Math.abs(random.nextInt() % 7);f1 = Math.abs(++speed + tp2);ctx.collect(Tuple2.of("a", f1));System.out.println("source generator :\t" + f1);}}@Overridepublic void cancel() {}});//必须为匿名内部类OutputTag<String> erroOutputTag = new OutputTag<String>("formatExceptionData"){} ;SingleOutputStreamOperator<Tuple2<String, Integer>> socketTextStream = env.socketTextStream("localhost", 10086)//异常数据侧流输出.process(new FlatMapProcess(erroOutputTag));SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2Stream = ReduceWindowPrint(socketTextStream, 5);//获取测流socketTextStream.getSideOutput(erroOutputTag).print();tuple2Stream.getSideOutput(outputTag).print();//正常处理的主流tuple2Stream.print();env.execute("event time process ");}private static SingleOutputStreamOperator<Tuple2<String, Integer>> ReduceWindowPrint(SingleOutputStreamOperator<Tuple2<String, Integer>> dataStreams, int dayleTime) {return dataStreams.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(dayleTime)).withTimestampAssigner((e, t) -> e.f1*1000)//多并行下如果有窗口数据为空,那么窗口需要barre对齐,不会触发。// 空窗口将导致下游算子都无法进行计算,// 设置idleness时间那么如果存在空窗口,当别的窗口有数据并且到达设置的时间(下面为10s)// 窗口就会触发.withIdleness(Duration.ofSeconds(10))).keyBy(e -> e.f0).window(TumblingEventTimeWindows.of(Time.seconds(10)))//允许上个窗口数据迟到时间
// .allowedLateness(Time.seconds(5))/* .sideOutputLateData(new OutputTag<Tuple2<String, Integer>>("sideOutput_dayle_data1", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})))*/.process(new ProcessWindowFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String, TimeWindow>() {int i;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);i = 0;}@Overridepublic synchronized void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {Iterator<Tuple2<String, Integer>> iterator = elements.iterator();while (iterator.hasNext()) {Tuple2<String, Integer> next = iterator.next();out.collect(next);if (next.f1%2 == 0) {//收集数据到侧流context.<Tuple2<String, Integer>>output(outputTag,Tuple2.of("side-" + next.f0, next.f1));}}System.out.println("======================= ^^^ window_num:\t" + i++ + " ^^^ ==================");System.out.println("======================= range: [" + context.window().getStart() + "___" + context.window().getEnd() + " )");System.out.println("======================= key: " + s );System.out.println("======================= watermark: " + context.currentWatermark());System.out.println("======================================================");}}); }
}
flink 空闲窗口-withIdleness相关推荐
- flink 自定义 窗口_【Flink 精选】阐述 Watermark 机制,剖析 Watermark 的产生和传递流程...
本文阐述 Flink 的事件时间和 Watermark 机制,剖析 Watermark 产生和传递的流程. 1 Event time 和 Watermark 的关系 1.1 Event time 和 ...
- Flink之窗口 (Window) 下篇
窗口函数(Window Functions) 定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了:至于收集起来到底要做什么,其实还完全没有头绪.所以在窗口分配器之后,必须再接上一个 ...
- Flink专题四:Flink DataStream 窗口介绍及使用
由于工作需要最近学习flink 现记录下Flink介绍和实际使用过程 这是flink系列的第四篇文章 Flink DataStream 窗口介绍及使用 窗口介绍 时间窗口 翻滚窗口(数据以一个时间断为 ...
- Flink之窗口 (Window) 上篇
我们已经了解了 Flink 中事件时间和水位线的概念,那它们有什么具体应用呢?当然是做基于时间的处理计算了.其中最常见的场景,就是窗口聚合计算. 之前我们已经了解了 Flink 中基本的聚合操作.在流 ...
- 【随记】Flink 时间窗口的起始时间
话不多说,直接上手今天的主题,探索一个容易让人忽略和困惑的问题:Flink 时间窗口的起始时间 就以最简单的demo为例: timeWindow(Time.seconds(5)) 上述定义一个步长为5 ...
- flink时间窗口无新的数据进来最后一个窗口不关闭
测试反馈, 配置的flink任务提交上去后, 输入数据源符合条件,到时间窗口的size.最后一个窗口没有闭窗计算,数据并没及时输出告警 经过调试发现,watermark没有向后继续推进,导致无法闭窗, ...
- flink 自定义 窗口_Flink入门实战 (下)
一. 时间语义与 Wartermark 1. Flink 中的时间语义 在 Flink 的流式处理中,会涉及到时间的不同概念,如下图所示: Event Time:是事件创建的时间.它通常由事件中的时间 ...
- flink 自定义 窗口_《从0到1学习Flink》—— Flink Data transformation(转换)
前言 在第一篇介绍 Flink 的文章 <<从0到1学习Flink>-- Apache Flink 介绍> 中就说过 Flink 程序的结构 Flink 应用程序结构就是如上图 ...
- Flink的窗口聚合操作(Time\Count Window)
窗口基本概念:Flink中的窗口是左闭右开的窗口 Flink认为批处理是流处理的一个特例,而窗口window就是从流处理到批处理的一个桥梁,通常来讲窗口就是用来将无线数据流转换为优先数据集,从而在优先 ...
最新文章
- VUE保存页面的数据,VUE页面显示就执行某个函数,VUE页面隐藏就执行某个函数
- [转]ArcGIS.Server.9.3和ArcGIS API for Flex实现Query查询定位中心功能(七)
- [我的1024开源程序]200元仿豆瓣小程序带评论
- binlog日志_mysql日志系统
- 8 年后重登王座,Python 再度成为 TIOBE 年度编程语言
- 挖漏经验:在密码重置请求包中添加X-Forwarded-Host实现受害者账户完全劫持
- python2.7安装报错_python2.7源码安装方式
- 在线文件管理系统 下载地址
- Atitit.软件开发概念(11)--网络子系统--url编码 空格问题URLEncoder java js php
- 安装linux需要最少磁盘分区是多少,320G硬盘只安装Linux分区方案
- 计算机软件工程师考试试题,计算机考试软件工程师试题
- java 中的radix_Java Scanner radix()用法及代码示例
- Service(LoadBalancer)
- PSINS不可交换(圆锥/划桨)误差补偿
- 雷柏V500机械键盘——重复按键故障原因之一
- WebGoat v8.1.0 下载安装(windows)
- tcpdump for udp
- 四、Node.js - 数据库与身份认证
- React使用过程知识点随手记
- D14.系统模块和文件操作
热门文章
- IGBT学习记录(一)
- Nginx配置http升级https
- vue.js 密码加密_Word2007/2016/2019文档加密的方法
- 业务运营支撑系统 BOSS(Business Operation Support System)。
- 平安科技Java开发三面面经(2018年12月)
- 011-Java代理模式
- 计算机专业eng4u,加拿大高中文凭ossd项目之ENG4U英语4U课程内容介绍!
- Qt QVector “isDetached()“
- 毕业设计 - 题目_ 基于单片机的智能小车 - 嵌入式 物联网 本科毕设
- 360度全景标定方法_全景摄像机标定方法综述