1 Flink中的状态

  当数据流中的许多操作只查看一个每次事件(如事件解析器),一些操作会跨多个事件的信息(如窗口操作)。这些操作称为有状态。状态由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态。可以简单的任务状态就是一个本地变量,可以被任务的业务逻辑访问。

  有些算子有些任务是没有状态的,如map操作,只跟输入数据有关。像窗口操作不管是增量窗口函数还是全窗口函数都要保持里面的信息的,一开始在窗口到达结束时间之前是不输出数据的,所以最后输出数据的时候,他的计算是要依赖之前的,全窗口可以认为是把所有数据都作为状态保存下来。增量聚合窗口来一个聚合一次要保存的是中间聚合状态。像ProcessFunction可以有状态也可以没有状态。

  无状态流处理和有状态流处理的主要区别:无状态流处理分别接收每条输入数据,根据最新输入的数据生成输出数据;有状态流处理会维护状态,根据每条输入记录进行更新,并基于最新输入的记录和当前的状态值生成输出记录,即综合考虑多个事件之后的结果。

需要状态操作的一些例子如下:

  • 应用程序搜索某些事件模式时,状态将存储迄今遇到的事件序列。
  • 每分钟/小时/天聚合事件时,将状态保存挂起的聚合。
  • 在数据流上训练机器学习模型时,状态保存模型参数的当前版本。
  • 需要管理历史数据时,状态允许有效访问过去发生的事件。

2 状态类型

  每个状态都是当前任务去管理维护,每个状态都是和当前算子关联在一起的,如果需要Flink真正的把他管理起来的话在运行时的时候Flink就必须要知道当前状态定义的类型是什么,所以一开始必须注册对应的状态,要有所谓的描述器。Flink有两种基本的状态:Operator State算子状态和Keyed State键控状态,他们的主要区别就是作用范围不一样,算子状态的作用范围就是限定为算子任务(也就是当前一个分区执行的时候,所有数据来了都能访问到状态)。键控状态中并不是当前分区所有的数据都能访问所有的状态,而是按照keyby之后的key做划分,当前key只能访问自己的状态

2.1 Operator State

  每个算子状态绑定到一个并行算子实例,作用范围限定为算子任务,同一并行任务的状态是共享的,并行处理的所有数据都可以访问到相同的状态。Kafka Connector就是使用算子状态的很好的一个例子,Kafka consumer的每个并行实例都维护一个主题分区和偏移,作为算子状态。当并行性发生变化时,算子状态接口支持在并行运算符实例之间重新分配状态。可以有不同的方案来进行这种再分配。

  因为同一个并行任务处理的所有数据都可以访问到当前的状态,所以就相当于本地变量

  算子状态有3种基本数据结构:①列表状态(List state):状态表示为一组数据的列表②联合列表状态(Union list state):也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。③广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。那就可以访问到别的并行子任务的状态。

  算子状态运用的时候可能应用场景没那么多,一般都是keyby之后根据不同的key做分区讨论。如果所有数据来了全部统一处理的话一般还要划分成不同的状态要保存为链表,并行度调整的时候可以根据这个列表拆开,做进一步调整。

  联合列表状态与列表状态的区别:主要是并行度调整状态怎样重新分配,列表状态本身分配的时候直接分配;联合列表状态的话就是把所有元素都联合起来,然后由每个任务自己定义最后留下哪些,也就是自己截取要哪一部分。

2.2 Keyed State

  Keyed State只能在KeyedStream后使用,键控状态总是相对于键,根据键来维护和访问的

  Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。键控状态基于每个key去管理,一般keyby进行HashCode重分区后基于它自己独享的内存空间就会针对每一个不同的key分别保存一份独立的存储状态,而且接下来来了一个新的数据只能访问自己的状态,不能访问其他key的,Flink会为每一个key维护一个状态。

  Flink的Keyed State支持的数据类型如下:

序号 类型 说明 方法
1 ValueState[T] 用来保存单个的值 ValueState.update(value: T)
ValueState.value()
2 ListState[T] 保存一个列表 ListState.add(value: T)
ListState.addAll(values: java.util.List[T])
ListState.update(values: java.util.List[T])
ListState.get()(注意:返回的是Iterable[T])
3 MapState[K, V] 保存Key-Value对 MapState.get(key: K)
MapState.put(key: K, value: V)
MapState.contains(key: K)
MapState.remove(key: K)
4 ReducingState[T] 保留一个值,该值表示添加到状态的所有值的汇总,需要用户提供ReduceFunction ReducingState.add(value: T)
ReducingState.get()
5 AggregatingState[I, O] 保留一个值,该值表示添加到状态的所有值的汇总,需要用户提供AggregateFunction AggregatingState.add(value: T)
AggregatingState.get()
6 FoldingState<T, ACC> 保留一个值,该值表示添加到状态的所有值的汇总,需要用户提供FoldFunction AggregatingState.add(value: T)
AggregatingState.get()

  每个状态都有clear()是清空操作。

  在进行状态编程时需要通过RuntimeContext注册StateDescriptor。StateDescriptor以状态state的名字和存储的数据类型为参数。案例如下:

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {private var sum: ValueState[(Long, Long)] = _override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {// access the state valueval tmpCurrentSum = sum.value// If it hasn't been used before, it will be nullval currentSum = if (tmpCurrentSum != null) {tmpCurrentSum} else {(0L, 0L)}// update the countval newSum = (currentSum._1 + 1, currentSum._2 + input._2)// update the statesum.update(newSum)// if the count reaches 2, emit the average and clear the stateif (newSum._1 >= 2) {out.collect((input._1, newSum._2 / newSum._1))sum.clear()}}override def open(parameters: Configuration): Unit = {sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]))}
}object ExampleCountWindowAverage extends App {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3L),(1L, 5L),(1L, 7L),(1L, 4L),(1L, 2L))).keyBy(_._1).flatMap(new CountWindowAverage()).print()// the printed output will be (1,4) and (1,5)env.execute("ExampleManagedState")
}

声明状态操作为:

    sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]))

读取状态为:

    val tmpCurrentSum = sum.value

  更新状态为:

    sum.update(newSum)

3 状态后端

  Flink提供不同的State Backends状态后端,指定如何和在何处存储状态。

  (1)MemoryStateBackend

  状将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上,将checkpoint存储在JobManager的内存中

  (2)FsStateBackend

  本地状态存在TaskManager的JVM堆上,checkpoint存到远程的持久化文件系统(FileSystem)上

  (3)RocksDBStateBackend

  将所有状态序列化后,存入本地的RocksDB中存储。

  设置状态后端如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
//val checkpointPath: String = checkpoint_Path
//val backend = new RocksDBStateBackend(checkpointPath)
//env.setStateBackend(backend)env.setStateBackend(new FsStateBackend(YOUR_PATH))
env.enableCheckpointing(1000)
// 配置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS)))

Flink中的状态管理相关推荐

  1. 详解Flink中的状态管理

    流式计算分为无状态和有状态两种情况.无状态的计算观察每个独立事件,并根据最后一个事件输出结果.例如:流处理应用程序从传感器接收温度读数,并在温度超过90度时发出警告.有状态的计算则会基于多个事件输出结 ...

  2. react中数据状态管理的四种方案

    我们为什么需要状态管理? (1) 一个是为了解决相邻组件的通信问题. 虽然可以通过「状态提升」解决,但有两个问题: 每次子组件更新,都会触发负责下发状态的父组件的整体更新(使用 Context 也有这 ...

  3. Flink中的状态与容错

    1.概述 Flink支持有状态计算,根据支持得不同状态类型,分别有Keyed State和Operator State.针对状态数据得持久化,Flink提供了Checkpoint机制处理:针对状态数据 ...

  4. Flink实操 : 状态管理

    . 一 .概念 1.1. 什么是有状态的计算? 1.2. 传统的流计算系统缺少对于程序状态的有效支持 1.3. Flink丰富的状态访问和高效的容错机制 二 .Keyed State 2.1.保存st ...

  5. 【分析总结】ASP.NET中的状态管理原理

    HTTP协议是介于请求.响应的断开时网络协议,与连接式的网络协议不同,例如,与我们熟悉的TCP协议相比,客户端与服务器并没有持续的连接存在,在每一次会话之后,连接都会被断开,在下一次请求的时候客户端会 ...

  6. as点击发送广播_Apache Flink 中广播状态的实用指南

    翻译 | 王柯凝 校对 | 邱从贤(山智) 自版本 Flink 1.5.0 以来,Apache Flink 提供了一种新的状态类型,称为广播状态(Broadcast State).在本文中,将解释什么 ...

  7. React中的状态管理---Mobx

    Mobx的介绍 Mobx是一个功能强大,上手非常容易的状态管理工具.redux的作者也曾经向大家推荐过它,在不少情况下可以使用Mobx来替代掉redux. Mobx流程图 Mobx使用流程 创建项目 ...

  8. vue2中vuex状态管理的理解(菜单面包板)

    本片理解基于vue2对应的Vuex文档,结合了官网文档以及众多前辈大佬所发布的帖子,由衷表示感谢. vuex的超详细讲解和具体使用细节记录 随着我们进一步扩展约定,即组件不允许直接变更属于 store ...

  9. Flutter实践:深入探索 flutter 中的状态管理方式(1)

    利用 Flutter 内置的许多控件我们可以打造出一款不仅漂亮而且完美跨平台的 App 外壳,我利用其特性完成了类似知乎App的UI界面,然而一款完整的应用程序显然不止有外壳这么简单.填充在外壳里面的 ...

最新文章

  1. 计算机学硕哪些学校好考,什么学校研究生好考,计算机专业研究生哪个学校好考一点...
  2. Windows使用免费版Kiwisyslog搭建日志服务器
  3. Chrome Elements 标签页 和 View Source 的显示为什么有差异
  4. python读取raw图片文件_在python下读取并展示raw格式的图片实例
  5. Fedora 8安装非官方compiz-fusion
  6. oracle ogg 删除,OGG导致归档无法RMAN删除一例
  7. 湘苗培优|从入门到精通
  8. java中if的嵌套循环_嵌套的多个“Next变量”if then循环
  9. 【2020】【论文笔记】相变材料与超表面——
  10. 软考中级-软件设计师-查缺补漏
  11. IP营销要从内部打破小众圈层的壁垒,“内容+社交”必不可少
  12. 办公专用计算机配置,办公电脑用什么配置的好 2017办公电脑配置推荐
  13. 网线线序和插座插头配线规则和光纤接口分类
  14. 单像空间后方交会的程序实现
  15. 抽水马桶工作原理演示
  16. 电机振动噪声(NVH)气隙磁场推导
  17. 第2章 人机交互的相关学科
  18. 使用PM进行硬盘分区
  19. 在ABAQUS中使用多孔介质模型
  20. uber奖励和账单详解

热门文章

  1. 记录上一个项目踩过的坑
  2. 统计iOS项目的总代码行数的方法
  3. 判断字符串是否为空--string.Empty、string=、s.length==0
  4. jquery选中以什么开头的元素
  5. 再议 语法高亮插件的选择
  6. PHP中file() 函数和file_get_contents() 函数的区别
  7. %求余数 rand随机数
  8. js的oop方式和this指针问题
  9. Python之OS模块进程管理介绍--os.fork()
  10. Kconfig中的“depends on”和“select”