由一次kafk数据堆积说起

因为公司的项目是由一个第三方的旧系统迁移过渡开发过来的,而且时间很急,所以有许多数据需要修正。为了不影响线上的业务,修复数据的逻辑是在另一个应用的,可以通过管理系统圈定数据范围,然后发送到kafka。
但是某一天kafka上的数据一直在堆积,高峰期一直下不去。查询了很多日志,发现很多消费者的消费速度异常地慢,而且存在重复消费的情况(业务上是允许重复消费)。最后发现kafka不断地在重平衡,导致数据一直不能尽快被消费。

那么,是什么导致了系统频繁重平衡呢?

重平衡的作用

要想知道什么是重平衡rebalance,那就要先了解消费组consumer group

什么是消费组

多个消费者consumer组成一个消费组,它们共同消费一个topic,一个topic的一个parition只能被一个consumer消费。

kafka为消费组定义了5种状态,他们分别是:Empty,Dead, PreparingRebalance,CompletingRebalance,Stable.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QZuZ09lF-1648635120578)(https://note.youdao.com/yws/res/22077/WEBRESOURCEbf97371f8e6740ae16c614478f5ccb52)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CkgDQMoR-1648635120580)(https://note.youdao.com/yws/res/22092/WEBRESOURCEad062645525be744596521507474549a)]

生产者、kafka broker、消费者
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ikHw5DXs-1648635120581)(https://note.youdao.com/yws/res/22095/WEBRESOURCE63d131b701c2004ff89546fe98ca5503)]

rebalance 其实就是对 partition 进行重新分配

rebalance的过程

rebalance

重平衡时,消费者端会发出JoinGroup请求加入组,发送SyncGroup请求同步领导消费组(Leader Consumer)分配的方案。

JoinGrop请求

当组内成员加入组时,会向将自己订阅的主题上报。协作者收集完组内的JoinGrop后,会选择其中一个作为该消费者组的领导者。通常情况下,第一个发送 JoinGroup 请求的成员自动成为领导者。消费组领导者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案,然后开始发送`SyncGroup请求。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OT9wCAxC-1648635120581)(https://note.youdao.com/yws/res/22113/WEBRESOURCE13f5ab0821280f083cc42e48e269b482)]

SyncGroup请求

消费组领导者和其他组员发送SyncGroup请求同步分组消费信息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IJUAtOYU-1648635120582)(https://note.youdao.com/yws/res/22115/WEBRESOURCEac31cee9b906f04b176d0ba67cf1875f)]

reblance的时机

三种情况会触发rebalance:

  • 订阅 Topic 的分区数发生变化。
  • 订阅的 Topic 个数发生变化。
  • 消费组内成员个数发生变化。例如有新的 consumer 实例加入该消费组或者离开组

相比起之前的两个情况,这种情况在实际情况中更加常见。因为订阅分区数、以及订阅 topic 数都是我们主动改变才会发生,而组内消费组成员个数发生变化,则是更加随机的。

「消费组内成员个数发生变化」的几种情况:

  • 新成员加入
  • 组成员主动离开
  • 组成员崩溃

新成员加入

新成员入组是指组处于 Stable状态后,有新成员加入。如果是全新启动一个消费者组,Kafka 是有一些自己的小优化的,流程上会有些许的不同。我们这里讨论的是,组稳定了之后有新成员加入的情形。

当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。具体的过程和之前的客户端重平衡流程是一样的

组成员主动离开

何谓主动离组?就是指消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。这个场景就涉及到了第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。

组成员崩溃

崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数session.timeout.ms控制的。

re-blance问题处理

对于「新成员加入」、「组成员主动离开」都是我们主动触发的,能比较好地控制。但是「组成员崩溃」则是我们预料不到的,遇到问题的时候也比较不好排查。

re-balance问题与kafka消费组配置的四个参数有关:

session.timeout.ms 设置了超时时间

heartbeat.interval.ms 心跳时间间隔

max.poll.interval.ms 每次消费的处理时间

max.poll.records 每次消费的消息数

session.timeout.ms 表示 consumer 向 broker 发送心跳的超时时间。例如 session.timeout.ms = 180000 表示在最长 180 秒内 broker 没收到 consumer 的心跳,那么 broker 就认为该 consumer 死亡了,会启动 rebalance。

heartbeat.interval.ms 表示 consumer 每次向 broker 发送心跳的时间间隔。heartbeat.interval.ms = 60000 表示 consumer 每 60 秒向 broker 发送一次心跳。一般来说,session.timeout.ms 的值是heartbeat.interval.ms值的 3 倍以上。

max.poll.interval.ms 表示 consumer 每两次 poll 消息的时间间隔。简单地说,其实就是 consumer 每次消费消息的时长。如果消息处理的逻辑很重,那么下次就要相应延长。否则如果时间到了 consumer 还么消费完,broker 会默认认为 consumer 死了,发起 rebalance。

max.poll.records 表示每次消费的时候,获取多少条消息。获取的消息条数越多,需要处理的时间越长。所以每次拉取的消息数不能太多,需要保证在 max.poll.interval.ms 设置的时间内能消费完,否则会发生 rebalance。

简单来说,会导致崩溃的几个点是:

消费者心跳超时,导致 rebalance。
消费者处理时间过长,导致 rebalance。

消费者心跳超时

我们知道消费者是通过心跳和协调者保持通讯的,如果协调者收不到心跳,那么协调者会认为这个消费者死亡了,从而发起 rebalance。

而 kafka 的消费者参数设置中,跟心跳相关的两个参数为:

session.timeout.ms 设置了超时时间

heartbeat.interval.ms 心跳时间间隔

消费者处理时间过长

如果消费者处理时间过长,那么同样会导致协调者认为该 consumer 死亡了,从而发起重平衡。

而 kafka 的消费者参数设置中,跟消费处理的两个参数为:

max.poll.interval.ms 每次消费的处理时间

max.poll.records 每次消费的消息数

不过Kafka从0.10.1.0开始,heartbeat就由独立的线程处理了,不受poll影响。

回到一开始的问题

顺着上面的思路,我们知道当消费者处理时间过长时,而项目里的消费逻辑的是很耗时和不可控的,所以可以做一下猜测:

  1. 消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。

  2. 当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。

  3. 并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致reblance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费

回归到项目代码和配置中。

@KafkaListener(topics = "xxxx"containerFactory = "kafkaDataUpdateListenerContainerFactory")public void loadDataUpdate(List<ConsumerRecord<?,?>> records, Acknowledgment ack){// 反序列化收到的数据// 任务分发,可能会分发到一些处理时间很长的方法中去// 记录日志}

从代码中发现消费者使用的是自动提交,而且是没有配置session.timeout.ms的。因为消费kafka的方法内有耗时任务,导致了offset还没有提交,与partition失联。

解决问题

找到问题产生的原因,其实就很好解决。只要在消费者正确解析了收到的数据后,立刻调用Acknowledgment.ack.acknowledge()方法提交offset就好了。

kafka自动提交导致重平衡,kafka堆积问题。

参考资料

  • 重平衡场景,写得更好,更详细!推荐!!Kafka | 消费者组重平衡全流程解析_大数据_sinat_27143551的博客-CSDN博客
  • 极客时间 Kafka重平衡消费组重平衡
  • Kafka 重平衡机制 - 后端进阶 - SegmentFault 思否
  • 为什么消费客户端频繁出现Rebalance?_客户端消费问题_常见问题_消息队列Kafka版-阿里云

kafka re-blance 重平衡、堆积、自动提交相关推荐

  1. Kafka 消费者组重平衡(Rebalance)

    Kafka Consumer Reblance 消费者组的重平衡就组内的消费者,对消费那些主题分区达成一致的过程,Kafka会尽量保证分配的均匀. consumer group 的rebalance ...

  2. 一文详细解析kafka重平衡机制

    前言 1.队列重平衡概述 如果对RocketMQ或者对消息中间件有所了解的话,消费端在进行消息消费时至少需要先进行队列(分区)的负载,即一个消费组内的多个消费者如何对订阅的主题中的队列进行负载均衡,当 ...

  3. kafka消费组与重平衡机制详解

    1.消费者组 1.1 介绍 消费者组,即 Consumer Group,应该算是 Kafka 比较有亮点的设计了. 那么何谓 Consumer Group 呢? Consumer Group 是 Ka ...

  4. kafka自动提交offset失败:Auto offset commit failed

    今天在服务日志中观察数据的消费情况时,发现了一个如下的警告,而且每隔几秒就会出现一次,虽然只是个警告, Auto offset commit failed for group order_group: ...

  5. 【Kafka】kafka 重平衡(Rebalance)

    1.概述 转载:https://www.cnblogs.com/listenfwind/p/12662968.html 说完消费者组,再来说说与消费者组息息相关的重平衡机制.重平衡可以说是kafka为 ...

  6. Kafka rebalance 重平衡深度解析

    文章目录 rebalance 触发条件 分区分配策略 rebalance generation 消费者状态机 rebalance 协议 消费者端 rebalance 流程 Broker 端重平衡场景解 ...

  7. Kafka分区分配策略以及重平衡过程总结

    Kafka自身提供了三种分区分配策略,通过消费者端配置参数partition.assignment.strategy来控制. 1.RangeAssignor分配策略(kafka默认的分区策略) 通过配 ...

  8. 从rocketmq到kafka:集群、一致性与重平衡

    rabbitmq的消息可靠性 rabbitmq-幂等引出的性能分析 从rabbitmq到rocketmq 经过上面三篇文章的学习,本篇再来学习 kafka 就会比较简单,概念都是相通的,关键是要联系和 ...

  9. Kafka学习笔记(十)kakfa消费组和重平衡

    版权声明:本文为转载文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 原文链接:https://blog.csdn.net/weixin_39468305/articl ...

最新文章

  1. 整合spring cloud云架构 - SSO单点登录之OAuth2.0登录认证(1)
  2. 容器必须设置宽度吗_UI设计必须要具备的前端知识
  3. “System.AccessViolationException”类型的未经处理的异常在 OpenCvSharp.dll 中发生 其他信息: 尝试读取或写入受保护的内存。这通常指示其他内存已损坏
  4. 浏览器滚动的详细解释 Vue 固定滚动位置的实现
  5. idea+spring boot+jrebel7.0.14热启动
  6. System学习笔记002---Windows下输入日文,あ会变成ち
  7. response.getWriter().write()与out.print()的区别
  8. 随笔小算法:从一个数据根据CRC校验出特定包
  9. 20200708每日一句
  10. 基于ENVI下的土地利用信息提取(三)
  11. 自学python 经验学习心得分享
  12. 黄金比例编程python_python实现黄金分割法
  13. win7 精简板 安装ardunio uno r3驱动
  14. 多数据库应用加强,增加表枚举约定数据库链接配置
  15. 实际项目中的消息中心
  16. arduino uno r3单片机封装图_Arduino教程 Lesson 1 驱动安装及下载Blink程序
  17. ThinkPHP最新版支付宝接口开…
  18. 孔雀鱼幼鱼的第一个月生长成长重要阶段
  19. STM32掌机教程2,掌机的原理
  20. flutter常用命令及问题

热门文章

  1. 网站前台课程设计报告
  2. 随身Wifi搭建Pupbot机器人教程(刷Debian系统)
  3. 基于Flutter实现的仿开眼视频App
  4. 高斯若尔当法算法matlab,大规模问题的分解法-D-W分解法
  5. 默纳克刷机,默纳克刷协议,默纳克显示板 外呼板协议更改 烧录 默纳克各种软件各种刷机,含主板、轿顶板、外呼板刷机软件原程序
  6. 28. 文件挂载,卸载,mount,语法选项示例,挂载光盘,挂载分区,挂载u盘,挂载移动硬盘,卸载,fuser等使用和示例
  7. 博客搭建 | 三、LeanCloud 评论系统
  8. thrift运行过程报错,多线程环境,docker环境
  9. 社群营销的方法和技巧ppt_裂变营销系统:社群裂变需掌握这些技巧-大师熊
  10. 【机器学习】集成学习:Boosting、Bagging 和 Stacking