文章目录

  • rebalance 触发条件
  • 分区分配策略
  • rebalance generation
  • 消费者状态机
  • rebalance 协议
  • 消费者端 rebalance 流程
  • Broker 端重平衡场景解析
    • 新成员入组
    • 组成员主动离场
    • 组成员崩溃离场
    • 重平衡时协调者对组内成员提交位移的处理
  • rebalance 监听器

consumer group 是用于实现高伸缩性、高容错性的 consumer 机制。组内多个 consumer 实例可以同时读取 Kafka 消息,而且一旦有某个 consumer “挂”了,consumer group 会立即将己崩溃 consumer 负责的分区转交给其他 consumer 来负责,从而保证整个 group 可以继续工作,不会丢失数据。
consumer group 的 rebalance 本质上是一组协议,它规定了一个 consumer group 是如何达成一致来分配订阅 topic 的所有分区的 。 假设某个组下有 20 个 consumer 实例,该组订阅了一个有着 100 个分区的 topic 。 正常情况下, Kafka 会为每个 consumer 平均分配 5 个分区。这个分
配过程就被称为 rebalance 。 当 consumer 成功地执行 rebalance 后,组订阅 topic 的每个分区只会分配给组内的一个 consumer 实例。
Kafka 内置的一个全新的组协调协议(group coordination protocol) 。对于每个组而言, Kafka 的某个 broker 会被选举为组协调者(group coordinator)。coordinator 负责对组的状态进行管理,它的主要职责就是当新成员到达时促成组内所有成员达成新的分区分配方案,即 coordinator 负责对组执行 rebalance 操作。

rebalance 触发条件

组 rebalance 触发的条件有以下 3 个:

  1. 组成员发生变更,比如新 consumer 加入组,或己有 consumer 主动离开组,再或是己有 consumer 崩溃时则触发 rebalance 。
  2. 组订阅 topic 数发生变更,比如使用基于正则表达式的订阅,当匹配正则表达式的新 topic 被创建时则会触发 rebalance。
  3. 组订阅 topic 的分区数发生变更,比如使用命令行脚本增加了订阅 topic 的分区数。

真实应用场景中引发 rebalance 最常见的原因就是违背了第一个条件,特别是 consumer 崩溃的情况。这里的崩横不一定就是指 consumer 进程“挂掉”或 consumer 进程所在的机器岩机 。
当 consumer 无法在指定的时间内完成消息的处理,那么 coordinator 就认为该 consumer 己经崩溃,从而引发新一轮 rebalance。可以通过 [[消费者参数#max poll interval ms|max.poll.interval.ms]] 参数配置 consumer 处理逻辑最大时间。

分区分配策略

Kafka consumer 默认提供了 3 种分配策略,分别是 range 策略、round-robin 策略和 sticky 策略。通过 consumer 参数 partition.assignment. strategy 来进行配置。
所谓的分配策略决定了订阅 topic 的每个分区会被分配给哪个 consumer 。 range 策略主要是基于范围的思想。它将单个 topic 的所有分区按照顺序排列,然后把这些分区划分成固定大小的分区段井依次分配给每个 consumer。假设有 1 个消费者线程订阅了 2 个 topic; round-robin 策略则会把所有 topic 的所有分区顺序摆开,然后轮询式地分配给各个 consumer。我们以 spring-kafka 举例,假设我们使用@KafkaListener 同时监听了 2 个 Topic,每个 topic 的分区为 3,concurrency 设置为 6。
如果是 range 策略的话,则是 3 个线程,负责 2 个 Topic 共 6 个分区。其他 3 个线程空闲。如果是 round-robin 策略,则是每个线程分配一个分区。
另外 Kafka 支持自定义的分配策略,用户可以创建自己的 consumer 分配器(assignor)。

rebalance generation

某个 consumer group 可以执行任意次 rebalance。为了更好地隔离每次 rebalance 上的数据,新版本 consumer 设计了 rebalance generation 用于标识某次 rebalance, consumer 中它是一个整数,通常从 0 开始。
Kafka 引入 consumer generation 主要是为了保护 consumer group 的,特别是防止无效 offset 提交 。
比如上一届的 consumer 成员由于某些原因延迟提交了 offset,但 rebalance 之后该 group 产生了新一届的 group 成员,而这次延迟的 offset 提交携带的是旧的 generation 信息,因此这次提交会被 consumer group 拒绝。在使用 consumer 时经常碰到的 ILLEGAL GE 阳 RATION 异常就是这个原
因导致的。
事实上,每个 group 进行 rebalance 之后, generation 号都会加 1,表示 group 进入了一个新的版本。Generation 1 时 group 有 3 个成员,随后成员 2 退出组, coordinator 触发 rebalance, consumer group 进入到 Generation 2 时代,之后成员 4 加入,再次触发 rebalance, group 进入到 Generation 3 时代。

消费者状态机

重平衡一旦开启,Broker 端的协调者组件就要开始忙了,主要涉及到控制消费者组的状态流转。当前,Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个重平衡流程。
Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。

状态机的各个状态流转:

一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。
当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。
如果 Kafka 日志中出现 Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.,这就是 Kafka 在尝试定期删除过期位移。只有 Empty 状态下的组,才会执行过期位移删除的操作。

rebalance 协议

上文提到过 rebalance 本质上是一组协议。Kafka 中提供了下面 5 个协议来处理 rebalance 相关事宜:

  • Join Group 请求:consumer 请求加入组。
  • SyncGroup 请求:group leader 把分配方案同步更新到组内所有成员中 。
  • Heartbeat 请求: consumer 定期向 coordinator 汇报心跳表明自己依然存活。
  • LeaveGroup 请求: consumer 主动通知 coordinator 该 consumer 即将离组 。
  • DescribeGroup 请求:查看组的所有信息,包括成员信息、协议信息、分配方案以及订阅信息等。该请求类型主要供管理员使用。coordinator 不使用该请求执行 rebalance。

在 rebalance 过程中, coordinator 主要处理 consumer 发过来 JoinGroup 和 SyncGroup 请求 。当 consumer 主动离组时会发送 LeaveGroup 请求给 coordinator。在成功 rebalance 之后,组内所有 consumer 都需要定期地向 coordinator 发送[[心跳请求]]。
而每个 consumer 也是根据 Heartbeat 请求的响应中是否包含
REBALANCE_IN_PROGRESS 来判断当前 group 是否开启了新一轮 rebalance。
通过 [[消费者参数#heartbeat interval ms|heartbeat.interval.ms]] 可以控制重平衡的通知频率。

消费者端 rebalance 流程

consumer group 在执行 rebalance 之前必须首先确定 coordinator 所在的 broker,并创建与该 broker 相互通信的 Socket 连接。确定 coordinator 的算法与确定 offset 被提交到 consumer offsets 目标分区的算法是相同的,算法如下:

  • 计算 Math.abs(groupID.hashCode) % offsets.topic.num.partitions 参数值(默认是 50),假设是 10。
  • 寻找 [[__consumer_offsets]] 分区 10 的 leader 副本所在的 broker,该 broker 即为这个 group 的 coordinator。

成功连接 coordinator 之后便可以执行 rebalance 操作。目前 rebalance 主要分为两步:加入组和同步更新分配方案。

  • 加入组:这一步中组内所有 consumer (即 group.id 相同的所有 consumer 实例)向 coordinator 发送 JoinGroup 请求 。 当收集全 JoinGroup 请求后, coordinator 从中选择一个 consumer 担任 group 的 leader,并把所有成员信息以及它们的订阅信息发送给 leader。特别需要注意的是, group 的 leader 和 coordinator 不是一个概念。leader 是某个 consumer 实例, coordinator 通常是 Kafka 集群中的一个 broker 。 另外 leader 而非 coordinator 负责为整个 group 的所有成员制定分配方案。
  • 同步更新分配方案:这一步中 leader 开始制定分配方案,即根据前面提到的分配策略决定每个 consumer 都负责哪些 topic 的哪些分区。 一旦分配完成, leader 会把这个分配方案封装进 SyncGroup 请求并发送给 coordinator。组内所有成员都会发送 SyncGroup 请求,不过只有 leader 发送的 SyncGroup 请求中包含了分配方案。coordinator 接收到分配方案后把属于每个 consumer 的方案单独抽取出来作为 SyncGroup 请求的 response 返还给各自的 consumer。

Broker 端重平衡场景解析

新成员入组

新成员入组是指组处于 Stable 状态后,有新成员加入。主要是组稳定了之后有新成员加入的情形。当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。

组成员主动离场

消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。

组成员崩溃离场

崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由 [[消费者参数#session timeout ms|session.timeout.ms]] 控制的。也就是说,Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃。

重平衡时协调者对组内成员提交位移的处理

正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。

rebalance 监听器

consumer 默认把位移提交到 [[__consumer_offsets]] 中,但是 Kafka 也支持用户把位移提交到外部存储中,比如数据库中。若要实现这个功能,用户就必须使用 rebalance 监听器 。 使用 rebalance 监听器的前提是用户使用 consumer group 。 如果使用的是独立 consumer 或是直接手动分配分区,那么 rebalance 监听器是无效的。
rebalance 监听器有一个主要的接口回调类
ConsumerRebalanceListener,里面就两个方法 onPartitionsRevoked 和 onPartitionAssigned。在 coordinator 开启新一轮 rebalance 前
onPartitionsRevoked 方法会被调用,而 rebalance 完成后会调用 onPartitionsAssigned 方法。

Kafka rebalance 重平衡深度解析相关推荐

  1. kafka re-blance 重平衡、堆积、自动提交

    由一次kafk数据堆积说起 因为公司的项目是由一个第三方的旧系统迁移过渡开发过来的,而且时间很急,所以有许多数据需要修正.为了不影响线上的业务,修复数据的逻辑是在另一个应用的,可以通过管理系统圈定数据 ...

  2. kafka 脚本发送_Apache-Flink深度解析-DataStream-Connectors之Kafka

    聊什么 为了满足本系列读者的需求,在完成<Apache Flink 漫谈系列(14) - DataStream Connectors>之前,我先介绍一下Kafka在Apache Flink ...

  3. 一文详细解析kafka重平衡机制

    前言 1.队列重平衡概述 如果对RocketMQ或者对消息中间件有所了解的话,消费端在进行消息消费时至少需要先进行队列(分区)的负载,即一个消费组内的多个消费者如何对订阅的主题中的队列进行负载均衡,当 ...

  4. Kafka分区分配策略以及重平衡过程总结

    Kafka自身提供了三种分区分配策略,通过消费者端配置参数partition.assignment.strategy来控制. 1.RangeAssignor分配策略(kafka默认的分区策略) 通过配 ...

  5. kafka rebalance 总结说明图

    用kafka的第一件事儿就得了解kafka的重平衡,即kafka的rebalance,这个很重要,不然这个kafka就会使不好的,出问题了,就理解不了,得了解一下什么是rebalance,能干啥,为啥 ...

  6. 【Kafka】kafka 重平衡(Rebalance)

    1.概述 转载:https://www.cnblogs.com/listenfwind/p/12662968.html 说完消费者组,再来说说与消费者组息息相关的重平衡机制.重平衡可以说是kafka为 ...

  7. Kafka 消费者组重平衡(Rebalance)

    Kafka Consumer Reblance 消费者组的重平衡就组内的消费者,对消费那些主题分区达成一致的过程,Kafka会尽量保证分配的均匀. consumer group 的rebalance ...

  8. Kafka深度解析(如何在producer中指定partition)(转)

    原文链接:Kafka深度解析 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统.主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能 ...

  9. 发布订阅的消息系统 Kafka的深度解析

    发布&订阅的消息系统 Kafka的深度解析 2015-01-27 10:25 Jason Guo Jason Guo的博客 字号: T | T 一个典型的kafka集群中包含若干produce ...

最新文章

  1. 程序员,你恐慌的到底是什么?
  2. NeHe OpenGL教程 第三十七课:卡通映射
  3. POJ 1189 钉子和小球
  4. WindowsServer2012 DFS配置出错原因
  5. Java库转oc_急急急!各位大神:一段JAVA代码转成OC代码。
  6. 【Gerrit】Add a Member
  7. Python实现笑脸检测+人脸口罩检测
  8. HTML5 WebRTC API无需网络获取本地IP
  9. html 下划线_web前端实战入门训练之HTML基本元素
  10. 使用MLM和TLM训练XLM
  11. 如何找到最快的DNS服务器
  12. android程序 获取flash容量大小,Android用WebView加载flash大文件偶然会出现内存溢出以及蓝色打问号小方块问题的解决方案...
  13. 如何选择和使用现货白银
  14. 相似度系列-6:单维度方法:Evaluating Coherence in Dialogue Systems using Entailment
  15. java在gc正常工作的情况下_Java GC的工作原理
  16. python3新式类_Python中新式类与经典类的区别详析
  17. [02-14] 绿色免费软件更新
  18. 多元微积分_多元连式法则2 多元连式法则与方向向量
  19. 小白 mysql5.7 非安装版配置教程+百度云资源分享
  20. 环保设施用电监管云平台、蓝天碧水保卫战解决方案

热门文章

  1. IPv6网络的可操作安全考虑——RFC9099解析(五)
  2. 同行不同命:极兔喜、韵达愁?
  3. 2020电工(高级)证考试及电工(高级)模拟考试题
  4. Solor集群——SolorCloud
  5. 数学:确定性的丧失---第十一章 形式主义与集合论公理化基础
  6. 翻转英文句子,标点位置不变
  7. 获取mimeType
  8. vue中使用mock模拟数据
  9. 计算机软件版本如何命名,软件项目版本号的命名规则及格式
  10. SV绿皮书提炼笔记(五)