我们知道,Flink是一个默认就有状态的分析引擎,为避免Task在处理过程中挂掉了,而导致内存中的数据丢失,Flink引入了State和CheckPoint机制,其中State就是Flink的一种基于内存的状态机制,Flink提供了两种基本的状态类型。

一、基本状态Keyed State与Operator State

1、Keyed State

Keyed State:顾名思义就是基于KeyedStream上的状态,这个状态是跟特定的Key绑定的。KeyedStream流上的每一个Key,都对应一个State。意味着这些状态仅可在 KeyedStream 上使用,可以通过 stream.keyBy(...) 得到 KeyedStream。Flink针对KeyedState提供了以下可以保存State的数据结构:

(1)ValueState<T>

保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的key,因此算子接收到的每个key都可能对应一个值)。这个值可以通过update(T)进行更新,通过T value()进行检索。

(2)ListState<T>

保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过add(T)或者addAll(List<T>)进行添加元素,通过Iterable<T> get()获得整个列表。还可以通过update(List<T>)覆盖当前的列表。

(3)MapState<UK,UV>

维护了一个映射列表。你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用put(UK,UV)或者putAll(Map<UK,UV>)添加映射。使用get(UK)检索特定key。使用entries(),keys()和values()分别检索映射、键和值的可迭代视图

(4)ReducingState<T>

保存一个单值,表示添加到状态的所有值的聚合。接口与ListState类似,但使用add(T)增加元素,会使用提供ReduceFunction进行聚合。

(5)AggregatingState<IN,OUT>

保留一个单值,表示添加到状态的所有值的聚合。和ReducingState相反的是,聚合类型可能与添加到状态的元素的类型不同。接口与ListState类似,但使用add(IN)添加的元素会用指定的AggregateFunction进行聚合。

(6)FoldingState<T,ACC>

保留一个单值,表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态的元素类型不同。接口与ListState类似,但使用add(T)添加的元素会用指定的FoldFunction折叠成聚合值

2、Operator State

Operator State与Key无关,而是与Operator绑定,整个Operator只对应一个State。比如:Flink中的KafkaConnector就使用了Operator State,它会在每个Connector实例中,保存该实例消费Topic的所有(partition,offset)映射。如下图:

二、状态存在形式managed与raw

Keyed State和Operator State分别有两种存在形式:managed and raw.

1、Managed State

由Flink运行时控制的数据结构表示,比如内部的hashtable或者RocksDB。比如“ValueState”,“ListState”等。Flinkruntime会对这些状态进行编码并写入checkpoint。

2、Raw State

Raw类型的State则保存在算子自己的数据结构中。checkpoint的时候,Flink并不知晓具体的内容,仅仅写入一串字节序列到checkpoint。

所有datastream的function都可以使用managed state,但是raws tate则只能在实现算子的时候使用。由于Flink可以在修改并发时更好的分发状态数据,并且能够更好的管理内存,因此官方建议使用managed state(而不是rawstate)。

如果使用managed state做需要自定义的序列化逻辑,为了后续的兼容性,不要改变Flink的默认序列化方式

三、状态有效期 (TTL)

任何类型的 keyed state 都可以有 有效期 (TTL),如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值。所有状态类型都支持单元素的TTL。 这意味着列表元素和映射元素将独立到期。

在使用状态 TTL 前,需要先构建一个配置StateTtlConfig对象。 然后把配置传递到 state descriptor 中启用 TTL 功能:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
//如ValueState使用
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

TTL配置有以下几个选项:
newBuilder参数表示数据的有效期,是必选项

TTL的更新策略有两个:
OnCreateAndWrite:仅在创建和写入时更新(默认策略)
OnReadAndWrite:读取和写入时时更新

TTL的数据在过期但还未被清理时的可见性配置如下:
NeverReturnExpired:过期数据就像不存在一样,不管是否被物理删除,都不返回过期数据(默认策略)
ReturnExpiredIfNotCleanedUp:会返回过期但未清理的数据,在数据被物理删除前都会返回

四、常见状态使用案例

这里以MapState<UK,UV>为例,存储一对映射关系,常常适用于根据某字段去重

(1)MapState<UK,UV>

输入orderLog.txt:

{"pin":"zhansan","orderId":"20201011231245423","skuId":"1226354","priceType":"new","requestTime":"1599931959000"}
{"pin":"lisi","orderId":"20201011231254678","skuId":"1226322","priceType":"normal","requestTime":"1599931359024"}
{"pin":"zhansan","orderId":"20201011231212768","skuId":"1226324","priceType":"back","requestTime":"1599931359011"}
{"pin":"lisi","orderId":"20201011231234567","skuId":"1226351","priceType":"normal","requestTime":"1599932029000"}
{"pin":"wanwu","orderId":"20201011231245424","skuId":"1226354","priceType":"new","requestTime":"1599931959000"}

OrderLog实体类:

@Data
class OrderLog {private String orderId;private String skuId;private String priceType;private Long requestTime;private Long sum = 1L;private String pin;
}

统计今日下单用户数量,根据用户账号pin字段去重

关键代码如下:

class OutPutWindowProcessFunction extends ProcessWindowFunction <OrderLog, OrderLog, String, TimeWindow>  {private static final long serialVersionUID = -6632888020403733197L;private MapState<String, Integer> mapState = null;@Overridepublic void open(Configuration parameters) throws Exception {//构造TTL过期配置StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.days(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>("day-pin-state", TypeInformation.of(String.class), TypeInformation.of(Integer.class));//设置过期配置mapStateDescriptor.enableTimeToLive(stateTtlConfig);//初始化MapState对象mapState = getRuntimeContext().getMapState(mapStateDescriptor);}@Overridepublic void process(String arg0, ProcessWindowFunction<OrderLog, OrderLog, String, TimeWindow>.Context ctx,Iterable<OrderLog> it, Collector<OrderLog> collect) throws Exception {Iterator<OrderLog> iterator = it.iterator();while (iterator.hasNext()) {OrderLog orderLog = iterator.next();//不存在记录到mapState,并输出给下一个算子if(mapState.get(orderLog.getPin()) == null) {mapState.put(orderLog.getPin(), 1);collect.collect(orderLog);}}}
}

完整代码如下:

import java.util.Iterator;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import com.alibaba.fastjson.JSON;import lombok.Data;public class TestSideOutputStream {public static final OutputTag<OrderLog> LATE_OUTPUT_TAG = new OutputTag<>("LATE_OUTPUT_TAG", TypeInformation.of(OrderLog.class));public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStreamSource<String> textDataSteam = env.readTextFile("C:\\Users\\admin\\Desktop\\orderLog.txt");SingleOutputStreamOperator<OrderLog> dayPvDataStream = textDataSteam.flatMap(new OutPutMapFunction()).assignTimestampsAndWatermarks(new AssignedWaterMarks(Time.seconds(3))) //添加watermark,设置3秒延迟.keyBy(OrderLog::getPin).window(TumblingEventTimeWindows.of(Time.minutes(1), Time.seconds(5)))  //5秒统计一次最近1分钟商品下单排名.allowedLateness(Time.minutes(30))      //允许数据迟到30分钟.sideOutputLateData(TestSideOutputStream.LATE_OUTPUT_TAG) //标记迟到数据
//              .aggregate(new CountAggregateFunction(), new OutResultWindowFunction());//聚合累加,并输出.process(new OutPutWindowProcessFunction());dayPvDataStream.addSink(new SideOutPutSinkFunction());//sink持久化操作dayPvDataStream.getSideOutput(TestSideOutputStream.LATE_OUTPUT_TAG) //通过标签获取延迟数据.keyBy(OrderLog::getPin).window(TumblingEventTimeWindows.of(Time.seconds(3)))  //对迟到数据3秒计算一次.process(new OutPutWindowProcessFunction()).addSink(new SideOutPutSinkFunction2());//对迟到的的数据,增量计算并sink持久化操作env.execute();}
}/*** 水位线,保证按事件时间处理*/
class AssignedWaterMarks extends BoundedOutOfOrdernessTimestampExtractor<OrderLog> {private static final long serialVersionUID = 2021421640499388219L;public AssignedWaterMarks(Time maxOutOfOrderness) {super(maxOutOfOrderness);}@Overridepublic long extractTimestamp(OrderLog orderLog) {return orderLog.getRequestTime();}
}/*** map转换输出*/
class OutPutMapFunction extends RichFlatMapFunction<String, OrderLog> {private static final long serialVersionUID = -6478853684295335571L;@Overridepublic void flatMap(String value, Collector<OrderLog> out) throws Exception {OrderLog orderLog = JSON.parseObject(value, OrderLog.class);out.collect(orderLog);}}/*** 窗口函数*/
class OutPutWindowProcessFunction extends ProcessWindowFunction <OrderLog, OrderLog, String, TimeWindow>  {private static final long serialVersionUID = -6632888020403733197L;private MapState<String, Integer> mapState = null;@Overridepublic void open(Configuration parameters) throws Exception {//构造TTL过期配置StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.days(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>("day-pin-state", TypeInformation.of(String.class), TypeInformation.of(Integer.class));//设置过期配置mapStateDescriptor.enableTimeToLive(stateTtlConfig);//初始化MapState对象mapState = getRuntimeContext().getMapState(mapStateDescriptor);}@Overridepublic void process(String arg0, ProcessWindowFunction<OrderLog, OrderLog, String, TimeWindow>.Context ctx,Iterable<OrderLog> it, Collector<OrderLog> collect) throws Exception {Iterator<OrderLog> iterator = it.iterator();while (iterator.hasNext()) {OrderLog orderLog = iterator.next();//不存在记录到mapState,并输出给下一个算子if(mapState.get(orderLog.getPin()) == null) {mapState.put(orderLog.getPin(), 1);collect.collect(orderLog);}}}
}/*** sink函数*/
class SideOutPutSinkFunction extends RichSinkFunction<OrderLog>  {private static final long serialVersionUID = -6632888020403733197L;@Overridepublic void invoke(OrderLog orderLog, Context context) throws Exception {//做自己的存储计算逻辑,如redis的increase进行累加System.out.println(orderLog.getPin() +"="+ orderLog.getSum());}}
/*** sink函数*/
class SideOutPutSinkFunction2 extends RichSinkFunction<OrderLog>  {private static final long serialVersionUID = -6632888020403733197L;@Overridepublic void invoke(OrderLog orderLog, Context context) throws Exception {//做自己的存储计算逻辑,如redis的increase进行累加
//      System.out.println(orderLog.getPin() +"===="+ orderLog.getSum());}}

Sink中输出如下:

lisi=1
wanwu=1
zhansan=1

(2)ValueState<Tuple2<Long, Long>>

下边是官方的一个示例:实现了一个简单的计数窗口。 我们把元组的第一个元素当作 key(在示例中都 key 都是 “1”)。 该函数将出现的次数以及总和存储在 “ValueState” 中。 一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class TestSideOutputStream3 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(0) 按第一个元素分组.flatMap(new CountWindowAverage()).print();env.execute();}
}class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {/*** */private static final long serialVersionUID = -8115459900572098047L;/*** The ValueState handle. The first field is the count, the second field a running sum.*/private transient ValueState<Tuple2<Long, Long>> sum;@Overridepublic void open(Configuration config) {ValueStateDescriptor<Tuple2<Long, Long>> descriptor =new ValueStateDescriptor<>("average", // the state nameTypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}) // type information); // default value of the state, if nothing was setsum = getRuntimeContext().getState(descriptor);}@Overridepublic void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {// access the state valueTuple2<Long, Long> currentSum = sum.value();if(currentSum == null) {currentSum = new Tuple2<>(0L, 0L); }// update the countcurrentSum.f0 += 1;// add the second field of the input valuecurrentSum.f1 += input.f1;// update the statesum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif (currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}}}

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/state.html

Flink中基本的State类型介绍相关推荐

  1. MyBatis中resultType返回值类型介绍

    一.返回一般数据类型 比如要根据 id 属性获得数据库中的某个字段值. mapper 接口: // 根据 id 获得数据库中的 username 字段的值 String getEmpNameById( ...

  2. ASP.NET MVC中四种过滤器类型介绍

    简介 mvc的过滤器类型有四种,分别如下: 过滤器类型 接口 默认实现 描述 Action IActionFilter ActionFilterAttribute 在动作方法之前及之后运行 Resul ...

  3. Flink中Trigger的介绍及使用

    Flink中Trigger的介绍及使用 Flink中的Trigger用来确认一个窗口是否应该出发结果的计算,每个windowAssigner都有一个默认的Trigger,先来看看Trigger的定义及 ...

  4. 《从0到1学习Flink》—— 介绍Flink中的Stream Windows

    前言 目前有许多数据分析的场景从批处理到流处理的演变, 虽然可以将批处理作为流处理的特殊情况来处理,但是分析无穷集的流数据通常需要思维方式的转变并且具有其自己的术语(例如,"windowin ...

  5. ROS系列——mavros功能包中常用话题和服务介绍,包括消息名称、类型、头文件、成员变量、示例代码

    ROS系列--mavros功能包中常用话题和服务介绍,包括消息名称.类型.头文件.成员变量.示例代码 官方链接 常用话题 订阅 1.1 系统状态 1.2 GPS数据 1.3 本地位置 1.4 三轴速度 ...

  6. php中常见的错误类型有,JavaScript中常见的错误类型有哪些?(详细介绍)

    在JavaScript中,当发生错误时会生成描述错误类型的错误对象,此错误对象包含错误类型和编号等信息,这些信息可用于后续处理等,在本篇文章中将给大家介绍常见的错误类型以及如何处理这些错误. Java ...

  7. 【Flink】介绍Flink中状态一致性的保证

    1.概述 转载:介绍Flink中状态一致性的保证 再次温习了这篇文章有了不一样的收货.侵权可删,这里是方便自己找到. 1. 一致性 1.1 介绍状态一致性 有状态的流处理,内部每个算子任务都可以有自己 ...

  8. Spring 中的Advice类型介绍

    Spring 中的 Advice 类型介绍 翻译原文链接 Introduction to Advice Types in Spring 1. 概述 在本文中,我们将讨论可以在 Spring 中创建的不 ...

  9. 14.State-理解原理即可、Flink中状态的自动管理、无状态计算和有状态计算、状态分类、Managed State Raw State\Keyed StateOperator State

    14.State-理解原理即可 14.1.Flink中状态的自动管理 14.2.无状态计算和有状态计算 14.2.2.有状态计算,需要考虑历史值,如:sum 14.2.3.状态分类 14.2.4.Ma ...

最新文章

  1. How can I create an Asynchronous function in Javascript?
  2. python将空格变成换行_python之路(2)
  3. 恢复应用_ofo年底裁员超50%,苹果应用市场恢复畅销榜,支付宝15周年微电影发布,MIUI来电留言功能即将下线,这就是今天的其他大新闻!...
  4. 特岗计算机老师年度总结,特岗教师个人年度工作总结
  5. Java并发编程系列之CountDownLatch用法及详解
  6. 小师妹学JVM之:JIT中的PrintCompilation
  7. 使用Maven进行硒测试自动化
  8. 简单地理解 Python 的装饰器
  9. Win7删除不常用的自带应用程序
  10. 视频教程-教你编写人工智能程序教程(自学必看)-Python
  11. 【入坑树莓派】烧录系统都烧录了三次(树莓派默认账户密码错误/已删除)
  12. RS232(串口线)转RJ45(网线)
  13. 天翎开源,是一种技术,更是一种信仰
  14. 3小时入门微信小程序开发 --公开课学习笔记
  15. 互联网推荐系统比较研究
  16. 简单的中文姓名生成器
  17. h5 修改title 微信_h5制作小程序 邀请函模板免费
  18. 游戏原画之组合画法-张聪-专题视频课程
  19. xp安全模式下如何修复计算机,电脑怎么进入安全模式,教您xp系统进入安全模式的方法...
  20. 图书管理系统 jsp + servlet + mysql (2023)

热门文章

  1. 加速推进市域社会治理现代化在社会治理体系中有何作用
  2. 算法工程师是不是一个「越老越吃香」的岗位?
  3. C语言选择结构(if语句)
  4. 大数据入门-五分钟读懂HDFS
  5. 华测服务器进不去系统,华测网络数据查询系统
  6. APIO20152014题解
  7. php如何获取当前日期时间函数,php如何使用date()函数获取当前时间
  8. hive分区表删除部分数据
  9. 使用hive删除分区语法
  10. 领酌酒业告诉你:喝酱香酒的八大好处