「Flink」使用Managed Keyed State实现计数窗口功能
先上代码:
public class WordCountKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 初始化测试单词数据流DataStreamSource<String> lineDS = env.addSource(new RichSourceFunction<String>() {private boolean isCanaled = false;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while(!isCanaled) {ctx.collect("hadoop flink spark");Thread.sleep(1000);}}@Overridepublic void cancel() {isCanaled = true;}});// 切割单词,并转换为元组SingleOutputStreamOperator<Tuple2<String, Integer>> wordTupleDS = lineDS.flatMap((String line, Collector<Tuple2<String, Integer>> ctx) -> {Arrays.stream(line.split(" ")).forEach(word -> ctx.collect(Tuple2.of(word, 1)));}).returns(Types.TUPLE(Types.STRING, Types.INT));// 按照单词进行分组KeyedStream<Tuple2<String, Integer>, Integer> keyedWordTupleDS = wordTupleDS.keyBy(t -> t.f0);// 对单词进行计数keyedWordTupleDS.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {private transient ValueState<Tuple2<Integer, Integer>> countSumValueState;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化ValueStateValueStateDescriptor<Tuple2<Integer, Integer>> countSumValueStateDesc = new ValueStateDescriptor("countSumValueState",TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}));countSumValueState = getRuntimeContext().getState(countSumValueStateDesc);}@Overridepublic void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {if(countSumValueState.value() == null) {countSumValueState.update(Tuple2.of(0, 0));}Integer count = countSumValueState.value().f0;count++;Integer valueSum = countSumValueState.value().f1;valueSum += value.f1;countSumValueState.update(Tuple2.of(count, valueSum));// 每当达到3次,发送到下游if(count > 3) {out.collect(Tuple2.of(value.f0, valueSum));// 清除计数countSumValueState.update(Tuple2.of(0, valueSum));}}}).print();env.execute("KeyedState State");} }
代码说明:
1、构建测试数据源,每秒钟发送一次文本,为了测试方便,这里就发一个包含三个单词的文本行
2、对句子按照空格切分,并将单词转换为元组,每个单词初始出现的次数为1
3、按照单词进行分组
4、自定义FlatMap
初始化ValueState,注意:ValueState只能在KeyedStream中使用,而且每一个ValueState都对一个一个key。每当一个并发处理ValueState,都会从上下文获取到Key的取值,所以每个处理逻辑拿到的ValueStated都是对应指定key的ValueState,这个部分是由Flink自动完成的。
注意:
带默认初始值的ValueStateDescriptor已经过期了,官方推荐让我们手动在处理时检查是否为空
instead and manually manage the default value by checking whether the contents of the state is null.
”
/** * Creates a new {@code ValueStateDescriptor} with the given name, default value, and the specific * serializer. * * @deprecated Use {@link #ValueStateDescriptor(String, TypeSerializer)} instead and manually * manage the default value by checking whether the contents of the state is {@code null}. * * @param name The (unique) name for the state. * @param typeSerializer The type serializer of the values in the state. * @param defaultValue The default value that will be set when requesting state without setting * a value before. */@Deprecatedpublic ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer, T defaultValue) { super(name, typeSerializer, defaultValue);}
5、逻辑实现
在flatMap逻辑中判断ValueState是否已经初始化,如果没有手动给一个初始值。并进行累加后更新。每当count > 3发送计算结果到下游,并清空计数。
「Flink」使用Managed Keyed State实现计数窗口功能相关推荐
- 一开机就是coloros恢复模式_「系统」一加氢OS 11升级 这俩功能终于等到 | realme新UI十月见...
8月10日,一加正式发布了全新的氢OS 11,但功能性变化不多,主要还是集中在视觉上, 全新天气App,智能图库,文档矫正.智能剪辑.便签新增语音输入AI转换成文字,全新暗色模式动画.ORM内存管理技 ...
- 「题解」:[组合数学]:Perm 排列计数
题干: Description称一个1,2,-,N的排列P1,P2-,Pn是Magic的,当且仅当2<=i<=N时,Pi>Pi/2. 计算1,2,-N的排列中有多少是Magic的,答 ...
- 新手必备pr 2021快速入门教程「六」时间轴面板的认识及使用功能
PR2021快速入门教程,学完之后,制作抖音视频,vlog,电影混剪,日常记录等不在话下!零基础,欢迎入坑! 本节内容 上节内容我们介绍了PR素材的预览及源面板素材监视窗口的操作认识,最后还给大家讲解 ...
- 1.15.Flink state(状态)管理与恢复、什么是state、Keyed State、Operator State、状态容错(生成快照,恢复快照),checkPoint简介,重启策略等
1.15.Flink state(状态)管理与恢复 1.15.1.什么是state 1.15.2.状态(State) 1.15.3.Keyed State 1.15.4.Operator State ...
- 创建可微物理引擎Nimble,开源SOTA人体骨骼模型,斯坦福腿疾博士生用AI「助跑」人生...
来源:机器之心本文约2000字,建议阅读5分钟 身残志坚,斯坦福大学的这位人工智能 + 机器人博士生想用技术克服身体缺陷. 有一位研究者,他身患残疾,出生时便患有退行性神经疾病,该疾病一直攻击着他的外 ...
- 两个iphone怎么大量传照片_「唯物」传个视频到 iPhone,有了新方法
出个难题:老婆用 iPhone 刚录的视频,想传到老公 Android 手机上,或者反方向老公传给老婆,有什么简便方法? 以下两个,可能是普通人最容易想到的: 有网的话,通过云存储中转:iPhone ...
- ChatGPT还在2G冲浪?新模型「youChat」:我已能够解说2022世界杯
视学算法报道 编辑:蛋酱.小舟 youChat 能成为搜索引擎变革的先行者吗? ChatGPT 自推出以来就被寄予厚望,一些人认为它会取代搜索引擎,成为「改变游戏规则的人」. 真的会有这一天吗?至少, ...
- 杰理ac18芯片_杰理科技推出:「梧桐」系列双模音视频 AIoT 芯片
近日,杰理科技正式推出「梧桐」WiFi/BT 双模音视频 AIoT 芯片,包括针对高端智能音箱的芯片 AC7901.全功能芯片 AC7902.WiFi 蓝牙控制器 AC7903 系列芯片.作为旗下高端 ...
- 从 iOS 14 到 Android 12,桌面小组件是怎么「文艺复兴」的
本文转载自 极客公园 时尚界一直以来有一个著名的理论:在某一时代流行的时尚元素,在经过一段时间的沉寂之后,会被人们再次拿出来利用. 这便是「弗莱定律」,它解释了为什么在长期的历史中,为什么很多曾经时尚 ...
最新文章
- Apache HTTP Server Version 2.2 文档中文版
- 《棋牌游戏服务器》玩法服务器架构
- 【VMCloud云平台】SCOM配置(四)-监控应用可用性
- Win10下安装wireshark不能正常使用,cmd管理员身份调用net start npf命令显示无法启动该服务
- Python3.6 deep learning first step
- python中property函数_Python中的property()函数
- The VMRC console has disconnected solution
- FAT文件系统与文件恢复
- 学计算机应该买多大尺寸笔记本电脑,笔记本电脑买几寸的比较合适?
- iText API操作doc文档
- Python 分析中国城市夜间灯光数据
- 小程序免官费注册和突破实名只能绑定5个小程序的限制
- word embedding(详细讲解word embedding)
- 查询2021年天柱二中高考成绩喜报,凯里一中2013年高考喜报教学内容(11页)-原创力文档...
- C语言结构体中的位段
- 基于等效电路模型(RC)的锂离子电池参数在线辨识
- 数据分析从业者需具备的核心能力都有哪些?
- 安装VS2003出现问题——Microsoft FrontPage 2000 Web 扩展客户端安装不成功 及其解决方法
- 自适应网页ios失效
- 女士学计算机适合从事什么工作,女生学计算机专业好吗 适合的岗位有哪些