配置类在:org.apache.flink.configuration.CheckpointingOptions

配置解析:

配置 类型 默认值 描述
state.backend String 检查点存储,用于在执行过程中存储操作符的本地状态
state.checkpoint-storage String 用于恢复检查点状态的检查点存储。
state.backend.changelog.enabled Boolean false 是否开启状态变更日志
state.checkpoints.num-retained Integer 1 保留的已完成检查点的最大数目
state.backend.async Boolean true 已弃用,所有状态快照都是异步的。
state.backend.incremental Boolean false 状态后端是否应该创建增量检查点,如果允许,对于增量检查点,存储的只是与前一个检查点不同的部分,而不是完整的检查点状态。
state.backend.local-recovery Boolean false 状态后端配置本地恢复,默认本地恢复处于去激活状态。
taskmanager.state.local.root-dirs String config参数定义根目录,用于存储基于文件的状态,用于本地恢复。
state.savepoints.dir String 保存点的默认目录。由状态后端使用,向文件系统写入保存点(HashMapStateBackend, EmbeddedRocksDBStateBackend)
state.checkpoints.dir String 用于在Flink支持的文件系统中存储数据文件和检查点元数据的默认目录。存储路径必须能够访问所有参与的进程/节点(即:所有TaskManagers和JobManagers)。
state.storage.fs.memory-threshold MemorySize 20kb 状态数据文件的最小大小。所有小于此值的状态块都内联存储在根检查点元数据文件中。
state.storage.fs.write-buffer-size Integer 4 * 1024 写入文件系统的检查点流的写入缓冲区的默认大小。

配置代码如下:

/** A collection of all configuration options that relate to checkpoints and savepoints. */
public class CheckpointingOptions {// ------------------------------------------------------------------------//  general checkpoint options// ------------------------------------------------------------------------/*** The checkpoint storage used to store operator state locally within the cluster during* execution.** <p>The implementation can be specified either via their shortcut name, or via the class name* of a {@code StateBackendFactory}. If a StateBackendFactory class name is specified, the* factory is instantiated (via its zero-argument constructor) and its {@code* StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.** <p>Recognized shortcut names are 'hashmap' and 'rocksdb'.** @deprecated Use {@link StateBackendOptions#STATE_BACKEND}.*/@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS)@Documentation.ExcludeFromDocumentation("Hidden for deprecated")@Deprecatedpublic static final ConfigOption<String> STATE_BACKEND =ConfigOptions.key("state.backend").stringType().noDefaultValue().withDescription(Description.builder().text("The state backend to be used to store state.").linebreak().text("The implementation can be specified either via their shortcut "+ " name, or via the class name of a %s. "+ "If a factory is specified it is instantiated via its "+ "zero argument constructor and its %s "+ "method is called.",TextElement.code("StateBackendFactory"),TextElement.code("StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)")).linebreak().text("Recognized shortcut names are 'hashmap' and 'rocksdb'.").build());/*** The checkpoint storage used to checkpoint state for recovery.** <p>The implementation can be specified either via their shortcut name, or via the class name* of a {@code CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified,* the factory is instantiated (via its zero-argument constructor) and its {@code* CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.** <p>Recognized shortcut names are 'jobmanager' and 'filesystem'.*/@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 2)public static final ConfigOption<String> CHECKPOINT_STORAGE =ConfigOptions.key("state.checkpoint-storage").stringType().noDefaultValue().withDescription(Description.builder().text("The checkpoint storage implementation to be used to checkpoint state.").linebreak().text("The implementation can be specified either via their shortcut "+ " name, or via the class name of a %s. "+ "If a factory is specified it is instantiated via its "+ "zero argument constructor and its %s "+ " method is called.",TextElement.code("CheckpointStorageFactory"),TextElement.code("CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)")).linebreak().text("Recognized shortcut names are 'jobmanager' and 'filesystem'.").build());/** Whether to enable state change log. */@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS)@Documentation.ExcludeFromDocumentation("Hidden for now")public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =ConfigOptions.key("state.backend.changelog.enabled").booleanType().defaultValue(false).withDescription("Whether to enable state backend to write state changes to StateChangelog.");/** The maximum number of completed checkpoints to retain. */@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS =ConfigOptions.key("state.checkpoints.num-retained").defaultValue(1).withDescription("The maximum number of completed checkpoints to retain.");/** @deprecated Checkpoints are aways asynchronous. */@Deprecatedpublic static final ConfigOption<Boolean> ASYNC_SNAPSHOTS =ConfigOptions.key("state.backend.async").booleanType().defaultValue(true).withDescription("Deprecated option. All state snapshots are asynchronous.");/*** Option whether the state backend should create incremental checkpoints, if possible. For an* incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the* complete checkpoint state.** <p>Once enabled, the state size shown in web UI or fetched from rest API only represents the* delta checkpoint size instead of full checkpoint size.** <p>Some state backends may not support incremental checkpoints and ignore this option.*/@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)public static final ConfigOption<Boolean> INCREMENTAL_CHECKPOINTS =ConfigOptions.key("state.backend.incremental").defaultValue(false).withDescription("Option whether the state backend should create incremental checkpoints, if possible. For"+ " an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the"+ " complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API"+ " only represents the delta checkpoint size instead of full checkpoint size."+ " Some state backends may not support incremental checkpoints and ignore this option.");/*** This option configures local recovery for this state backend. By default, local recovery is* deactivated.** <p>Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend* and HashMapStateBackend do not support local recovery and ignore this option.*/@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)public static final ConfigOption<Boolean> LOCAL_RECOVERY =ConfigOptions.key("state.backend.local-recovery").defaultValue(false).withDescription("This option configures local recovery for this state backend. By default, local recovery is "+ "deactivated. Local recovery currently only covers keyed state backends. Currently, the MemoryStateBackend "+ "does not support local recovery and ignores this option.");/*** The config parameter defining the root directories for storing file-based state for local* recovery.** <p>Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend* does not support local recovery and ignore this option.*/@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)public static final ConfigOption<String> LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS =ConfigOptions.key("taskmanager.state.local.root-dirs").noDefaultValue().withDescription("The config parameter defining the root directories for storing file-based state for local "+ "recovery. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does "+ "not support local recovery and ignore this option");// ------------------------------------------------------------------------//  Options specific to the file-system-based state backends// ------------------------------------------------------------------------/*** The default directory for savepoints. Used by the state backends that write savepoints to* file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).*/@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 3)public static final ConfigOption<String> SAVEPOINT_DIRECTORY =ConfigOptions.key("state.savepoints.dir").noDefaultValue().withDeprecatedKeys("savepoints.state.backend.fs.dir").withDescription("The default directory for savepoints. Used by the state backends that write savepoints to"+ " file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).");/*** The default directory used for storing the data files and meta data of checkpoints in a Flink* supported filesystem. The storage path must be accessible from all participating* processes/nodes(i.e. all TaskManagers and JobManagers).*/@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 2)public static final ConfigOption<String> CHECKPOINTS_DIRECTORY =ConfigOptions.key("state.checkpoints.dir").stringType().noDefaultValue().withDeprecatedKeys("state.backend.fs.checkpointdir").withDescription("The default directory used for storing the data files and meta data of checkpoints "+ "in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes"+ "(i.e. all TaskManagers and JobManagers).");/*** The minimum size of state data files. All state chunks smaller than that are stored inline in* the root checkpoint metadata file.*/@Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)public static final ConfigOption<MemorySize> FS_SMALL_FILE_THRESHOLD =ConfigOptions.key("state.storage.fs.memory-threshold").memoryType().defaultValue(MemorySize.parse("20kb")).withDescription("The minimum size of state data files. All state chunks smaller than that are stored"+ " inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.").withDeprecatedKeys("state.backend.fs.memory-threshold");/*** The default size of the write buffer for the checkpoint streams that write to file systems.*/@Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)public static final ConfigOption<Integer> FS_WRITE_BUFFER_SIZE =ConfigOptions.key("state.storage.fs.write-buffer-size").intType().defaultValue(4 * 1024).withDescription(String.format("The default size of the write buffer for the checkpoint streams that write to file systems. "+ "The actual write buffer size is determined to be the maximum of the value of this option and option '%s'.",FS_SMALL_FILE_THRESHOLD.key())).withDeprecatedKeys("state.backend.fs.write-buffer-size");
}

Flink Checkpoint所有配置解读相关推荐

  1. flink checkpoint 重启_Flink进阶教程:Checkpoint机制原理剖析与参数配置

    在Flink状态管理详解:Keyed State和Operator List State深度解析这篇文章中,我们介绍了Flink的状态都是基于本地的,而Flink又是一个部署在多节点的分布式引擎,分布 ...

  2. flink checkpoint 恢复_Apache Flink 管理大型状态之增量 Checkpoint 详解

    邱从贤(山智),Apache Flink Contributor,中南大学硕士,2018 年加入阿里巴巴计算平台事业部,专注于 Flink 核心引擎开发,主要从事 Flink  State&C ...

  3. flink checkpoint 恢复_Flink解析 | Apache Flink结合Kafka构建端到端的ExactlyOnce处理

    周凯波(宝牛) 阿里巴巴技术专家,四川大学硕士,2010年毕业后加入阿里搜索事业部,从事搜索离线平台的研发工作,参与将搜索后台数据处理架构从MapReduce到Flink的重构.目前在阿里计算平台事业 ...

  4. Flink Checkpoint超时问题

    Flink Checkpoint超时问题 文章目录 Flink Checkpoint超时问题 问题现象 问题分析 问题1:TaskManager进程挂掉 问题2:任务长时间处于CANCELING 问题 ...

  5. Flink 生产环境配置建议

    flink-conf.yaml相关 ------------------------------------------------------------------------- checkpoi ...

  6. flink生产环境参数配置

    1.flink生产环境配置 2.flink 可配置参数 2.1常用选项 键 默认 描述 jobmanager.heap.size 1024MB JobManager的JVM堆大小. taskmanag ...

  7. Flink checkpoint失败

    目录 前言 问题描述 问题定位 checkpoint的基本原理 思路 现象 问题解决 前言 Flink容错机制的核心部分是绘制分布式数据流和操作员状态的一致快照.这些快照充当一致的检查点,如果发生故障 ...

  8. Flink CheckPoint机制 学习 测试 使用FsStateBackend状态后端 将checkpoint恢复到中断处

    Flink CheckPoint机制 1.实验目的 目的 开启一个Flink程序,使用hdfs做状态后端,手动取消job后,再次恢复job测试,观察程序是否能恢复到检查点,继续读取并处理数据: 实验原 ...

  9. Flink Checkpoint 详解

    Flink Checkpoint 详解 一.checkpoint简介 二.checkpoint原理 三.精确一次 四.状态后端 五.配置推荐 一.checkpoint简介 Checkpoint是Fli ...

最新文章

  1. ITK:将图像粘贴到另一个
  2. SDUT 3379 数据结构实验之查找七:线性之哈希表
  3. Condition总结-await和signal的总结
  4. ruby elixir_如何使用Elixir和Phoenix快速入门构建CRUD REST API
  5. cocos2dx中使用iconv转码(win32,iOS,Android)
  6. CCF201812-3 CIDR合并(100分)【位运算+文本】
  7. eclipse汉化教程(官方汉化包,傻瓜式操作,附带中英文快捷切换方式以及常见问题解决方案)
  8. Java并发练习:无锁编程
  9. 施一公:如何写好一篇学术论文?
  10. 阿里云数据工厂DataWorks
  11. zkPorter:Layer-2 的可组合可扩展性
  12. 【玩转嵌入式屏幕显示】(三)TFT-LCD屏幕打点 + 画线 + 画矩形 + 画圆Bresenham算法实现(基于打点函数,算法可移植到任何屏幕的驱动程序之上)
  13. Android:展锐battery
  14. 【达内课程】异常Exception(上)
  15. EBGP使用环回口建邻居用到ebgp-multihop和update source loopback
  16. linux搭建keepalived+tomcat+nginx 双主机热备排坑
  17. 二叉树:广义表搭建二叉树
  18. corei7 64 poky linux,Solved: arm-poky-linux - NXP Community
  19. uniapp微信H5公众号授权与支付
  20. volatile关键字对编译器优化的影响

热门文章

  1. 卸载安装Webpack
  2. android onSaveInstance
  3. sql 增删改 合表操作
  4. 中国半导体晶圆研磨设备市场趋势报告、技术动态创新及市场预测
  5. https详解之 根证书、服务器证书、用户证书的区别 jg证书
  6. 火到不行的零代码都在哪些行业应用?
  7. 愁边动寒角,夜久意难平
  8. 马蜂窝被“捅”背后:互联网江湖的原罪和暗战
  9. 什么是Docker Machine?
  10. Quartz 2D绘图简介