kafka rebalance故障的处理策略
1. Rebalance 触发与通知
1.1. 触发条件
Rebalance 的触发条件有三种:
- 当 Consumer Group 组成员数量发生变化
- 新成员加入
- 组成员主动离开
- 组成员崩溃
- 消费者心跳超时,导致 rebalance
- 消费者处理时间过长,导致 rebalance。
- 当订阅主题数量发生变化
- 当订阅主题的分区数发生变化
组成员崩溃外,其它都是主动触发的,能比较好地控制。
组成员崩溃 则是预料不到、意外发生的,遇到问题的时候也不好排查。但对于组成员崩溃也是有一些通用的处理策略
1.2. 通知其他 consumer 进程
Rebalance 如何通知其他 consumer 进程?
Rebalance 的通知机制是靠 Consumer 端的心跳线程
- Consumer 端会定期发送心跳请求到 Broker 端的 Coordinator,
- 当协调者决定开启 Rebalance 后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中发送给 Consumer ,
- 当 Consumer 发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就知道 Rebalance 开始了。
1.3. Rebalance相关的概念
rebalance(重平衡)其实就是重新进行 partition 的分配,从而使得 partition 的分配重新达到平衡状态
1.3.1. Coordinator介绍
Coordinator一般指的是运行在broker上的group Coordinator,用于管理Consumer Group中各个成员,每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。
Coordinator存储的信息
对于每个Consumer Group,Coordinator会存储以下信息:
- 对每个topic,可能有多个消费组group订阅同一个topic
- 对每个Consumer Group,元数据如下:
- 订阅的topics列表
- Consumer Group配置信息,包括session timeout等
- 组中每个Consumer的元数据。包括主机名,consumer id
- 每个正在消费的topic partition的当前offsets
- Partition的ownership元数据,包括consumer与partitions映射关系
如何确定consumer group的coordinator
简单来说分为两步:
- 确定consumer group位移信息写入__consumers_offsets这个topic的哪个分区。具体计算公式:
__consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。 - 该分区leader所在的broker就是被选定的coordinator
1.3.2. Rebalance 协议 (protocol) 说明
Rebalance 本质上也是一组协议。Consumer Group 与 Coordinator 共同使用它来完成 Consumer Group 的 Rebalance
- Heartbeat请求:Consumer 需要定期给 Coordinator 发送心跳来证明自己还活着。
- LeaveGroup请求:主动告诉 Coordinator 要离开 Consumer Group
- SyncGroup请求:Group Leader Consumer 把分配方案告诉组内所有成员
- JoinGroup请求:成员请求加入组
- DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用。
Coordinator 在 Rebalance 的时候主要用到了前面4种请求
1.3.3. Consumer Group 状态机
Rebalance 一旦发生,必定会涉及到 Consumer Group 的状态流转,此时 Kafka 为我们设计了一套完整的状态机机制,来帮助 Broker Coordinator 完成整个重平衡流程
- Empty 状态表示当前组内无成员, 但是可能存在 Consumer Group 已提交的位移数据,且未过期,这种状态只能响应 JoinGroup 请求。
- Dead 状态表示组内已经没有任何成员的状态,组内的元数据已经被 Broker Coordinator 移除,这种状态响应各种请求都是一个Response:UNKNOWN_MEMBER_ID。
- PreparingRebalance 状态表示准备开始新的 Rebalance, 等待组内所有成员重新加入组内。
- CompletingRebalance 状态表示组内成员都已经加入成功,正在等待分配方案,旧版本中叫“AwaitingSync”。
- Stable 状态表示 Rebalance 已经完成, 组内 Consumer 可以开始消费了。
通过上面5种状态可以看出,Rebalance 主要分为两个步骤:加入组(对应JoinGroup请求)和等待 Leader Consumer 分配方案(SyncGroup 请求)。
2. rebalance问题处理策略
2.1. 涉及的重要参数
session.timeout.ms: consumer 向 broker 发送心跳的超时时间。例如 session.timeout.ms = 180000 表示在最长 180 秒内 broker 没收到 consumer 的心跳,那么 broker 就认为该 consumer 死亡了,会引发 rebalance。
heartbeat.interval.ms: consumer 每次向 broker 发送心跳的时间间隔。heartbeat.interval.ms = 60000 表示 consumer 每 60 秒向 broker 发送一次心跳。一般来说,session.timeout.ms 的值是 heartbeat.interval.ms 值的 3 倍以上。
max.poll.interval.ms : consumer 每两次 poll 消息的时间间隔。简单地说就是 consumer 每次消费消息的时长。如果消息处理的逻辑很重,那么此值就要相应延长。否则如果时间到了但 consumer 还么消费完,broker 会默认认为 consumer 死了,发起 rebalance。
max.poll.records: 每次消费的时候,获取多少条消息。获取的消息条数越多,需要处理的时间越长。所以每次拉取的消息数不能太多,需要保证在 max.poll.interval.ms 设置的时间内能消费完,否则会引发 rebalance。
所以消费者心跳超时、消费者处理时间过长都会引起rebalance。
2.2. 问题处理策略
2.2.1. 消费者心跳超时
消费者是通过心跳和协调者保持通讯的,如果协调者收不到心跳,那么协调者会认为这个消费者死亡了,从而发起 rebalance。
在kafka 的消费者参数设置中,与心跳相关的两个参数为:
- session.timeout.ms 设置了超时时间
- heartbeat.interval.ms 心跳时间间隔
需要调整 session.timeout.ms 和 heartbeat.interval.ms 参数,使得消费者与协调者能保持心跳。
一般来说,超时时间应该是心跳间隔的 3 倍时间,因为这样的话,在一个超时周期内就可以有多次心跳,避免网络问题导致偶发失败。
session.timeout.ms 如果设置为 180 秒,那么 heartbeat.interval.ms 最多设置为 60 秒。
2.2.2. 消费者处理时间过长
如果消费者处理时间过长,那么同样会导致协调者认为该 consumer 死亡了,从而发起rebalance。
在 kafka 的消费者参数设置中,与消费处理的两个参数为:
- max.poll.interval.ms 每次消费的处理时间
- max.poll.records 每次消费的消息数
对于这种情况,一般来说就是增加消费者处理的时间(即提高 max.poll.interval.ms 的值),减少每次处理的消息数(即减少 max.poll.records 的值)。
除此之外,超时时间参数(session.timeout.ms)与 消费者每次处理的时间(max.poll.interval.ms)也是有关联的。max.poll.interval.ms 时间不能超过 session.timeout.ms 时间。 因为在 kafka 消费者的实现中,是单线程去消费消息和执行心跳的,如果线程卡在处理消息,那么这时候即使到时间要心跳了,还是没有线程可以去执行心跳操作。
对于 rebalance 类问题的处理策略,简单来讲就是处理好心跳超时问题和消费处理超时问题
对于心跳超时问题。一般是调整超时时间(session.timeout.ms)和心跳间隔时间(heartbeat.interval.ms)的比例及数值。
阿里云官方文档建议超时时间(session.timeout.ms)设置成 25s,最长不超过 30s。那么心跳间隔时间(heartbeat.interval.ms)就不超过 10s。
对于消费处理超时问题。一般是增加消费者处理的时间(max.poll.interval.ms),减少每次处理的消息数(max.poll.records)。
阿里云官方文档建议 max.poll.records 参数要远小于当前消费组的消费能力(records < 单个线程每秒消费的条数 * 消费线程的个数 * session.timeout的秒数)。
kafka rebalance故障的处理策略相关推荐
- Kafka Rebalance测试
Kafka Rebalance测试 关于kafka的Rebalance机制,其实就是规定同一个consumer group下所有的consumer如何协调工作的,分配订阅Topic分区的.Rebala ...
- Kafka rebalance 重平衡深度解析
文章目录 rebalance 触发条件 分区分配策略 rebalance generation 消费者状态机 rebalance 协议 消费者端 rebalance 流程 Broker 端重平衡场景解 ...
- 质量流量计雷电击故障的应对策略
质量流量计雷电击故障的应对策略 质量流量计故障主要发生在运行期和调试期两个阶段.在调试期间,故障发生在调试后的早期阶段,主要原因是仪表选型不当或安装不当.其主要原因是流体中的杂质附着在电极内衬上,当环 ...
- Kafka rebalance 的几种原因与解决方案
网上有很多文章讲述 Kafka rebalance 的原理,本文是列举常见的几种 rebalance 场景: 如果一个 consumer 刚启动,则会向 broker 发送 JoinGroup 请求, ...
- kafka一直rebalance故障,重复消费
今天我司线上kafka消息代理出现错误日志,异常rebalance,而且平均间隔2到3分钟就会rebalance一次,分析日志发现比较严重.错误日志如下 08-09 11:01:11 131 pool ...
- 【kafka】记一次线上kafka一直rebalance故障 消费慢 数据积压
文章目录 1.背景 2. 分析问题 3.分析原因 4.拉取偏移量与提交偏移量 5.解决方案 5.1.增加max.poll.interval.ms处理时长 5.2设置分区拉取阈值 5.3.poll到的消 ...
- Kafka 原理以及分区分配策略剖析
欢迎关注方志朋的博客,回复"666"获面试宝典 一.简介 Apache Kafka 是一个分布式的流处理平台(分布式的基于发布/订阅模式的消息队列[Message Qu ...
- kafka中副本数据同步策略 ,acknowledge的发送策略,kafka的数据可靠性保证
ack(acknowledge)简介 为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的消息后,都需要向producer发送 ...
- KafKa - 控制器作用 及 选举策略
一.KafKa控制器作用 在 kafka 中分为 broker 和 partition 分区,其中分区副本在前几篇文章中都进行了讲解,本篇文章针对 broker 进行分析,其中在 kafka 集群中, ...
最新文章
- Vue入门三、过滤器filter
- Harbor仓库的管理
- ARCGIS导入XY坐标的EXCEL文档,出现无法选择X和Y字段的问题
- Py之easygui:easygui的简介、安装(最正确安装)、使用方法之详细攻略
- shell脚本获取系统的前一天日期,格式为yyyymmdd
- pandas Dataframe/Series 设置保留小数位数
- Linux学习总结(19)——Linux中文本编辑器vim特殊使用方法
- vs不能调试_20200717调试记录(五十四)
- redis MySQL 脏读_redis多线程情况下避免读脏数据的悲观锁解决方案
- 使用AUI框架开发微信小程序
- 网络上的计算机病毒怎么办,电脑中病毒了怎么办
- 一、软件/软件工程/软件开发模型概述
- python(第七天)
- HTML5吃豆豆游戏开发实战(一)使用Canvas绘制游戏主角
- 解析button和input type=button 的区别
- A New Approach for English-Chinese Named Entity Alignment(跨语言实体对齐)
- 第6章第23节:文字视觉化:使用图片来诠释文字的涵义 [PowerPoint精美幻灯片实战教程]
- python美观代码_为什么Python 代码要写得美观而明确
- 推荐一个后台管理系统
- 最受程序员欢迎的30款开源软件,个个都很能打,值得拥有!
热门文章
- 极光推送设置别名setAlias失败
- 先设计一个基本账户类,再通过继承基本账户类设计一个储蓄账户类,储蓄账户类中增加密码、地址、最小余额和利率等成员变量,并增加一些银行账户经常用到的成员函数。
- vant官网无法打开
- armDebian使用中科大的源
- m未能建立与ppp服务器的连接6,未能建立与ppp服务器
- Zookeeper详细介绍+dubbo简单介绍+简单大白话讲解
- android 实现表格横向混动_「PHEVREEV」插电混动与增程系统技术特点解析:节油原理与性能...
- Vue实战篇三十:实现一个简易版的头条新闻
- Java项目开发(航空管理系统)
- 去一趟云台山,感受一次山巅的遐想