先上代码:

public class WordCountKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 初始化测试单词数据流DataStreamSource<String> lineDS = env.addSource(new RichSourceFunction<String>() {private boolean isCanaled = false;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while(!isCanaled) {ctx.collect("hadoop flink spark");Thread.sleep(1000);}}@Overridepublic void cancel() {isCanaled = true;}});// 切割单词,并转换为元组SingleOutputStreamOperator<Tuple2<String, Integer>> wordTupleDS = lineDS.flatMap((String line, Collector<Tuple2<String, Integer>> ctx) -> {Arrays.stream(line.split(" ")).forEach(word -> ctx.collect(Tuple2.of(word, 1)));}).returns(Types.TUPLE(Types.STRING, Types.INT));// 按照单词进行分组KeyedStream<Tuple2<String, Integer>, Integer> keyedWordTupleDS = wordTupleDS.keyBy(t -> t.f0);// 对单词进行计数keyedWordTupleDS.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {private transient ValueState<Tuple2<Integer, Integer>> countSumValueState;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化ValueStateValueStateDescriptor<Tuple2<Integer, Integer>> countSumValueStateDesc = new ValueStateDescriptor("countSumValueState",TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}));countSumValueState = getRuntimeContext().getState(countSumValueStateDesc);}@Overridepublic void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {if(countSumValueState.value() == null) {countSumValueState.update(Tuple2.of(0, 0));}Integer count = countSumValueState.value().f0;count++;Integer valueSum = countSumValueState.value().f1;valueSum += value.f1;countSumValueState.update(Tuple2.of(count, valueSum));// 每当达到3次,发送到下游if(count > 3) {out.collect(Tuple2.of(value.f0, valueSum));// 清除计数countSumValueState.update(Tuple2.of(0, valueSum));}}}).print();env.execute("KeyedState State");}
}

代码说明:

1、构建测试数据源,每秒钟发送一次文本,为了测试方便,这里就发一个包含三个单词的文本行

2、对句子按照空格切分,并将单词转换为元组,每个单词初始出现的次数为1

3、按照单词进行分组

4、自定义FlatMap

初始化ValueState,注意:ValueState只能在KeyedStream中使用,而且每一个ValueState都对一个一个key。每当一个并发处理ValueState,都会从上下文获取到Key的取值,所以每个处理逻辑拿到的ValueStated都是对应指定key的ValueState,这个部分是由Flink自动完成的。

注意:

带默认初始值的ValueStateDescriptor已经过期了,官方推荐让我们手动在处理时检查是否为空

instead and manually manage the default value by checking whether the contents of the state is null.

/** * Creates a new {@code ValueStateDescriptor} with the given name, default value, and the specific * serializer. * * @deprecated Use {@link #ValueStateDescriptor(String, TypeSerializer)} instead and manually * manage the default value by checking whether the contents of the state is {@code null}. * * @param name The (unique) name for the state. * @param typeSerializer The type serializer of the values in the state. * @param defaultValue The default value that will be set when requesting state without setting *                     a value before. */@Deprecatedpublic ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer, T defaultValue) {   super(name, typeSerializer, defaultValue);}

5、逻辑实现

在flatMap逻辑中判断ValueState是否已经初始化,如果没有手动给一个初始值。并进行累加后更新。每当count > 3发送计算结果到下游,并清空计数。

「Flink」使用Managed Keyed State实现计数窗口功能相关推荐

  1. 一开机就是coloros恢复模式_「系统」一加氢OS 11升级 这俩功能终于等到 | realme新UI十月见...

    8月10日,一加正式发布了全新的氢OS 11,但功能性变化不多,主要还是集中在视觉上, 全新天气App,智能图库,文档矫正.智能剪辑.便签新增语音输入AI转换成文字,全新暗色模式动画.ORM内存管理技 ...

  2. 「题解」:[组合数学]:Perm 排列计数

    题干: Description称一个1,2,-,N的排列P1,P2-,Pn是Magic的,当且仅当2<=i<=N时,Pi>Pi/2. 计算1,2,-N的排列中有多少是Magic的,答 ...

  3. 新手必备pr 2021快速入门教程「六」时间轴面板的认识及使用功能

    PR2021快速入门教程,学完之后,制作抖音视频,vlog,电影混剪,日常记录等不在话下!零基础,欢迎入坑! 本节内容 上节内容我们介绍了PR素材的预览及源面板素材监视窗口的操作认识,最后还给大家讲解 ...

  4. 1.15.Flink state(状态)管理与恢复、什么是state、Keyed State、Operator State、状态容错(生成快照,恢复快照),checkPoint简介,重启策略等

    1.15.Flink state(状态)管理与恢复 1.15.1.什么是state 1.15.2.状态(State) 1.15.3.Keyed State 1.15.4.Operator State ...

  5. 创建可微物理引擎Nimble,开源SOTA人体骨骼模型,斯坦福腿疾博士生用AI「助跑」人生...

    来源:机器之心本文约2000字,建议阅读5分钟 身残志坚,斯坦福大学的这位人工智能 + 机器人博士生想用技术克服身体缺陷. 有一位研究者,他身患残疾,出生时便患有退行性神经疾病,该疾病一直攻击着他的外 ...

  6. 两个iphone怎么大量传照片_「唯物」传个视频到 iPhone,有了新方法

    出个难题:老婆用 iPhone 刚录的视频,想传到老公 Android 手机上,或者反方向老公传给老婆,有什么简便方法? 以下两个,可能是普通人最容易想到的: 有网的话,通过云存储中转:iPhone ...

  7. ChatGPT还在2G冲浪?新模型「youChat」:我已能够解说2022世界杯

    视学算法报道 编辑:蛋酱.小舟 youChat 能成为搜索引擎变革的先行者吗? ChatGPT 自推出以来就被寄予厚望,一些人认为它会取代搜索引擎,成为「改变游戏规则的人」. 真的会有这一天吗?至少, ...

  8. 杰理ac18芯片_杰理科技推出:「梧桐」系列双模音视频 AIoT 芯片

    近日,杰理科技正式推出「梧桐」WiFi/BT 双模音视频 AIoT 芯片,包括针对高端智能音箱的芯片 AC7901.全功能芯片 AC7902.WiFi 蓝牙控制器 AC7903 系列芯片.作为旗下高端 ...

  9. 从 iOS 14 到 Android 12,桌面小组件是怎么「文艺复兴」的

    本文转载自 极客公园 时尚界一直以来有一个著名的理论:在某一时代流行的时尚元素,在经过一段时间的沉寂之后,会被人们再次拿出来利用. 这便是「弗莱定律」,它解释了为什么在长期的历史中,为什么很多曾经时尚 ...

最新文章

  1. Apache HTTP Server Version 2.2 文档中文版
  2. 《棋牌游戏服务器》玩法服务器架构
  3. 【VMCloud云平台】SCOM配置(四)-监控应用可用性
  4. Win10下安装wireshark不能正常使用,cmd管理员身份调用net start npf命令显示无法启动该服务
  5. Python3.6 deep learning first step
  6. python中property函数_Python中的property()函数
  7. The VMRC console has disconnected solution
  8. FAT文件系统与文件恢复
  9. 学计算机应该买多大尺寸笔记本电脑,笔记本电脑买几寸的比较合适?
  10. iText API操作doc文档
  11. Python 分析中国城市夜间灯光数据
  12. 小程序免官费注册和突破实名只能绑定5个小程序的限制
  13. word embedding(详细讲解word embedding)
  14. 查询2021年天柱二中高考成绩喜报,凯里一中2013年高考喜报教学内容(11页)-原创力文档...
  15. C语言结构体中的位段
  16. 基于等效电路模型(RC)的锂离子电池参数在线辨识
  17. 数据分析从业者需具备的核心能力都有哪些?
  18. 安装VS2003出现问题——Microsoft FrontPage 2000 Web 扩展客户端安装不成功 及其解决方法
  19. 自适应网页ios失效
  20. 女士学计算机适合从事什么工作,女生学计算机专业好吗 适合的岗位有哪些

热门文章

  1. linux系统中scp命令的用法
  2. ckeditor3 配置
  3. 236.二叉树的最近公共祖先
  4. epoll实现socket通信
  5. 【AI视野·今日CV 计算机视觉论文速览 第237期】Thu, 30 Sep 2021
  6. 读《C++ Primer(第三版)》的一些疑问(不断更新)
  7. command对象的ExecuteScalar方法
  8. git-撤销工作与的修改-回退缓存区的修改
  9. [转]双线性插值(Bilinear interpolation)
  10. VR版《五十度黑》尺度大?心疼被套路的观众