Flink 调优:Checkpoint 问题排查

1. Flink Checkpoint 流程

在使用 Flink 时, 我们基本都会用到 Checkpoint,也难免不会遇到 Checkpoint 慢或者失败等问题,如果想要排查这些问题,那么必须先知道 Checkpoint 的生产流程。一个 Task 的 Checkpoint 流程包括以下几个步骤:

  1. JobManager 向 Source 算子发送 Barrier ,初始化 Checkpoint;

  2. Source 算子收到 Barrier 之后,Checkpoint 自己的 State,并向下游发送 Barrier;

  3. 下游收到 Barrier 后,进行 Barrier Alignment 处理;

  4. Task 开始同步阶段的 Snapshot;

  5. Task 开始异步阶段的 Snapshot;

  6. Task 做完 Checkpoint 之后,再上报 JobManager。

    2. Checkpoint 监控

    通过 Flink UI,我们可以看到 Flink Job 的运行状态、运行日志、Checkpoint 和反压等情况。现在我们就认识下 Flink UI 中与 Checkpoint 相关的部分,图 1 是 Flink 1.3 版本的 UI


图1


图2

其中左侧侧边栏的:

  1. Task Manager:可以查看各个 Task 的配置、日志、资源等相关信息;
  2. Job Manager:可以查看 JobManager 的配置、资源、日志等相关信息。

图 1 右侧下方与 Checkpoint 相关的主要是 Checkpoints,在排查 Checkpoint 的相关问题时,也可能会用到 SubtasksBack Pressures(图 2)。

  • Subtasks:可以查看 JobGraph 的各个节点 Subtask 的吞吐量等情况,能够据此判断数据倾斜情况;

  • Back Pressures

    :用于观察 JobGraph 各节点算子的反压情况,其中:

    • OK: 0 <= Ratio <= 0.10
    • LOW: 0.10 < Ratio <= 0.5
    • HIGH: 0.5 < Ratio <= 1
  • Checkpoints:与 Checkpoint 相关的信息基本都在这里了。

    • Overview:相当于是 Checkpoint Dashboard;

    • History:各个 Checkpoint 的执行信息;

    • Summary:整个 Job 所有 Checkpoint 的 End to End Duration、State Size 和 Buffered During Alignment 的最大值、最小值和均值;

    • Configuration:Checkpoint 的配置信息,不多说了,一眼可以看到

      Summary 和 Configuration 比较简单就不多做赘述了,而 Overview 则如图 1 所示,比较简洁明了,需要说明的是 ID 都是 Checkpoint ID——可用于在 Task Manager 日志和 Job Manager 日志中查找对应的信息,More details 是该 Checkpoint 的生产明细信息。而 History 如图 2 所示:

3. Checkpoint 失败

对于追查 Checkpoint 失败的具体原因,从日志角度来说大体有这几个步骤:

  1. 在 Flink UI 的 Checkpoints 中,找到失败的 Checkpoint 的 ID;
  2. 用 Checkpoint ID 去 Job Manager 日志中,定位该 Checkpoint 失败发生的 Execution 以及 Task Manager;
  3. 在 Task Manager 查找该 Checkpoint 失败的具体原因。

通常情况下,Checkpoint 大概失败有两种情况:

  • Checkpoint Decline
  • Checkpoint Expire

3.1 Checkpoint Decline

按照前文的思路,假设我们已经在 Flink UI 中找到了失败的 Checkpoint ID 是 16883。然后,我们就去 Job Manager 日志中定位 ID 16883:

Decline checkpoint 16883 by task ab66f08bf898b7d25b4fe69bc74ce2e1 of job 7af7749825e6bef10cbd909f2746acfc

其中,ab66f08bf898b7d25b4fe69bc74ce2e1 是 Execution ID,7af7749825e6bef10cbd909f2746acfc 是当前 Job I。

然后,用 Execution ID ab66f08bf898b7d25b4fe69bc74ce2e1 在 Job Manager 日志中定位发生在哪个 Task Manager。

******** (18/36) (ab66f08bf898b7d25b4fe69bc74ce2e1) switched from SCHEDULED to DEPLOYING.
Deploying ******** (18/36) (attempt #0) to slot container_e12_1590211490022_8088_03_102035_2 on HOSTNAME

我们发现该 Execution 被调度到了 HOSTNAME 的 container_e12_1590211490022_8088_03_102035_2 Slot 上,然后再去对应的 Task Manager 中查找失败的具体原因即可。

社区还给出了另一种 Checkpoint Decline 发生在 Checkpoint Cancel 的时候:如果 Flink 在较小的 Checkpoint ID 还没有对齐的时候,收到了更大的 Checkpoint ID,则会把较小的 Checkpoint ID 给取消掉。会有如下日志

Received checkpoint barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint

这个日志表示,当前 Checkpoint 19 还在对齐阶段,就收到了 Checkpoint 20 的 Barrier。然后,会逐级通知到下游的 Task Checkpoint 19 被取消了,同时也会通知 Job Manager 当前 Checkpoint 被 Decline 了。

3.2 Checkpoint Expire

如果 Checkpoint 生产时间大于超时时间,该 Checkpoint 就会以失败而告终。按照前文的方法,假设我们已经在 Flink UI 中找到了失败的 Checkpoint ID 是 16881,接着去 Job Manager 日志中定位 ID 16881。

Checkpoint 16881 of job 7af7749825e6bef10cbd909f2746acfc expired before completing.
......
Received late message for now expired checkpoint attempt 16881 from a9c6af93c028b7d25b4fe693e4aaf09f of job 7af7749825e6bef10cbd909f2746acfc.

可以看到日志描述了,7af7749825e6bef10cbd909f2746acfc 这个 Job 的 Checkpoint 16881 在生产完成之前因超时而过期了,并且发生在 a9c6af93c028b7d25b4fe693e4aaf09f 这个 Execution 上。然后,按照上一节的步骤继续排查,就可以在 Task Manager 的日志中定位的失败的具体原因。

4. Checkpoint 慢

Checkpoint 有很多配置项,如果配置不当,会导致 Checkpoint 慢或者 Checkpoint 失败,最终都会影响应用程序的性能及稳定性。下面我们按照 Checkpoint 的流程逐一讨论。

4.1 Source Trigger Checkpoint 慢

这种情况很少见,笔者暂时还未遇到过。从资料中可以发现,因为 Source 做 Snapshot 并往下游发送 Barrier 的时候,需要抢锁(目前社区已经用 MailBox 替代当前抢锁的方式)。如果一直抢不到锁的话,则可能导致 Checkpoint 一直得不到机会进行。如果在 Source 所在的 Task Manager 日志中找不到开始做 Checkpoint 的日志,则可以考虑是否属于这种情况,可以通过 jstack 进行进一步确认锁的持有情况。

4.2 Barrier Alignment 慢

Checkpoint 时,会在 Barrier Alignment 之后进入 Snapshot 的同步阶段和异步阶段。如果应用程序 Checkpoint 时有 Barrier Alignment,而上游某些 Barrier 还未抵达,那么就无法开始产生 Snapshot,Checkpoint 也就不能继续。如果 Checkpoint 使用的是 AT_LEAST_ONCE 模式,就需要进行 Barrier Alignment。

4.3 同步阶段慢

一般情况下,同步阶段不会太慢。但是,如果我们通过 Flink UI 或日志发现同步阶段比较慢的话,对于 FsStateBackend 可以考虑查看是否开启了异步 Snapshot,如果开启了异步 Snapshot 还是慢,需要看整个 JVM 的使用情况。对于 RocksDBBackend 来说,需要用 iostate 查看磁盘的使用,另外可以查看 Task Manager 日志中关于 RocksDBBackend 的信息。

4.4 异步阶段慢

在异步阶段,Task Manager 主要将 State 持久化到存储上。对于 FsStateBackend,主要瓶颈来自于网络传输,这个阶段可以观察网络相关的 Metric,或者对应机器上能够观察到网络流量的情况。对于 RocksDBBackend,则需要从本地读取文件,写入到远程的持久化存储上,所以不仅需要考虑网络的瓶颈,还需要考虑本地磁盘的性能。如果觉得网络流量不是瓶颈,但是上传比较慢的话,还可以尝试开启多线程上传功能。

4.5 主线程没机会做 Snapshot

在 Task Manager 中,数据处理和 Barrier 处理都由主线程处理。如果主线程在处理速度太慢,就会导致 Barrier 处理慢,从而影响 Checkpoint 进度。这时候就需要使用 jstack、JProfier 或者 async-profiler 等工具分析应用层的堆栈、CPU使用情况。

4.6 Checkpoint 时间配置不当

Checkpoint 与时间相关的配置有:

  • 时间间隔:env.enableCheckpointing()
  • 超时时间:CheckpointConfig.setCheckpointTimeout()
  • 停顿时间:CheckpointConfig.setMinPauseBetweenCheckpoints()

我们配置的目标是避免由于 Checkpoint 时间间隔过长,导致生成的 State 过大,从而使网络传输过慢;避免 Checkpoint 超时时间小于生产时间。下图描述了 Checkpoint 的时间关系。

4.7 使用增量 Checkpoint

Flink Checkpoint 有两种模式,全量 Checkpoint 和增量 Checkpoint,其中全量 Checkpoint 会把当前的 State 全部备份一次到持久化存储;而增量 Checkpoint 则只备份上一次 Checkpoint 中不存在的 State,因此增量 Checkpoint 在速度上会有更大的优势。但是,目前的 Flink 中仅 RocksDBStateBackend 支持增量 Checkpoint,如果你已经使用 RocksDBStateBackend,建议通过开启增量 Checkpoint 来加速。

4.8 反压

Flink 应用程序的反压情况可在 Flink UI 中看到

应用程序中 Subtask 被标记为 HIGH,表示此时反压很严重,同时会导致下游接受很晚才能接到 Barrier,进而拖慢 Checkpoint 进度,会引起checkpoint失败。

4.9 数据倾斜

无论是离线计算(包括 SQL、Code),还是实时计算(包括 SQL、Code),数据倾斜(Data Skew)都是不得不面对的一个问题。而且实时计算的数据倾斜解决方案,要比离线计算的解决方案要复杂一些。虽然发生数据倾斜有很多种情况,不过,其解决方案的思想确实大同小异

对于 Flink 应用程序,我们可以在 Flink UI 中看观察是否有 Subtask 发生了数据倾斜。通过 Subtask 的 Records Received、Bytes Received 这类 TPS 指标,就可以知道哪些 Subtask 处理的数据量较大,即哪些 Subtask 发生了数据倾斜。

5. 总结

Checkpoint 是 Flink 比较核心的特性,也是经常用到的特性,所以难免会遇到 Checkpoint 慢或者失败等问题,本文介绍了如何查看 Checkpoint 的各项指标,怎样定位 Checkpoint 失败的根因,以及导致 Checkpoint 慢的因素。

Flink 调优:Checkpoint 问题排查相关推荐

  1. Flink调优(一)资源调优、背压问题的分析

    1.资源调优 Flink性能调优的第一步,就是为任务分配合适的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略. (1)内存 ...

  2. jvm参数调优_3_问题排查

    相关文章: http://www.importnew.com/21441.html(Java系列笔记(4) - JVM监控与调优 - Daniel·广 - 博客园) https://lanjingli ...

  3. 【Flink】Flink调优指南

    1.美图 2. Yarn.Kafka相关配置参数 2.1 jobmanger配置 jobmanger.rpc.address jm的地址. jobmanager.rpc.port jm的端口号. jo ...

  4. 中间件业务在网易轻舟容器平台的性能调优实践

    随着业务容器化的推进,经常有客户抱怨应用 QPS 无法和在物理机或者云主机上媲美,并且时常会出现 DNS 查询超时.短连接 TIME_OUT.网络丢包等问题,而在容器中进行调优与诊断的效果因为安装工具 ...

  5. Mysql高级调优篇——前言简介

    本篇开始就进入Mysql高级篇,当然我讲解的身份是Java开发工程师,并非专业的DBA,所以我们以写出高效,好用,Sql优化和开发相关的数据库方面的知识落地为目的,帮助开发解决一些sql上的问题,为迈 ...

  6. [译]GC专家系列3-GC调优

    原文链接:http://www.cubrid.org/blog/dev-platform/how-to-tune-java-garbage-collection/ 本篇是GC专家系列的第三篇.在第一篇 ...

  7. JVM性能调优(4)——性能调优工具

    目录 一.JDK工具1.JDK工具2.利用 jps 找出进程3.利用 jstat 查看VM统计信息4.利用 jmap 查看对象分布情况5.利用 jstack 分析线程栈 二.Linux 命令行工具1. ...

  8. java 垃圾回收GC(CMS、G1)原理及调优

    概述 本文介绍GC基础原理和理论,GC调优方法思路和方法,基于Hotspot jdk1.8,学习之后将了解如何对生产系统出现的GC问题进行排查解决 阅读时长约30分钟,内容主要如下: GC基础原理,涉 ...

  9. 一文搞定MySQL性能调优

    公众号回复关键词获取免费学习资料,加入前后端技术交流群和副业群.新建立的副业Q群:735764906. 数据库的操作越来越成为整个应用的性能瓶颈,这对于Web应用尤其明显.关于数据库的性能,这并不只是 ...

最新文章

  1. Django框架视图类
  2. 1.10 instanceof关键字
  3. 快速正确的修改变量的命名和如何正确规范的注释
  4. 《HiWind企业快速开发框架实战》(2)使用HiWind创建自己的项目
  5. 什么类型网站不利于seo优化
  6. 深度学习模型显示工具netron
  7. python练习项目八——下载所有XKCD 漫画
  8. C语言如何定义p1口,求助C51里如何实现P1口输入?置1了还没行哦。
  9. modbus tcp主站和从站_组态王与西门子 PLC无线Modbus通讯
  10. 从git上克隆的vue项目在本地运行步骤
  11. 笔记本软件页面分辨率低_笔记本分辨率降低怎么办_笔记本电脑电脑分辨率低怎么解决-win7之家...
  12. 【cmd】CMD中Pushd和Popd命令的用法
  13. 微软人工智能-数据分析平台.md
  14. 2021-11-12 轨迹规划了解
  15. 现代控制工程-状态空间(正在更新)
  16. python下载钉钉api_DingTalk SDK for Python
  17. 《学做智能车——卓晴》学习笔记(1)——智能汽车智能控制器方案设计
  18. 家长如何挑选学生护眼台灯?2023选这样的台灯更护眼
  19. 科学院量子计算机,中科院研究员丁洪:量子计算机前景可期,算力翻亿倍并非天方夜谭...
  20. 云上铺会员管理收银系统 v1.8.9.5

热门文章

  1. Java的教学辅助系统,信息管理系统课程辅助教学平台
  2. 轮廓系数--聚类分析
  3. 2.1.4奈氏准则、香农定理
  4. STM32F4xx固件库的说明及使用
  5. c语言位运算试题及解析,C语言面试题分类-位运算
  6. SpringBoot JPA 建立联合主键
  7. pin在c语言中的用法,pin的用法总结大全
  8. Vue开发的需要安装的软件
  9. vue的自动化测试详解
  10. jquery.html加换行符,在使用jQuery时添加元素间的换行符或空格.append()