Kafka Rebalance测试

关于kafka的Rebalance机制,其实就是规定同一个consumer group下所有的consumer如何协调工作的,分配订阅Topic分区的。Rebalance发生时,Group 下所有 consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。但是 Rebalance 过程对 consumer group 会造成比较严重的影响。在 Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。

关于触发Rebalance的条件主要有3个,今天主要测试下第一种情况引起的Rebalance:

  1. 组内consumer成员数量发生变化。
  2. 订阅的topic发生变化。
  3. 订阅的topic的分区数发生变化。

首先我这里创建了一个有3个分区的topic

代码中每隔20秒创建一个KafkaConsumer,Consumer属于同一个group,都订阅topic_1123,总共创建了4个消费者,最后关闭了消费者2

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;import java.time.Duration;
import java.util.*;public class KafkaConsumerTest {static {//设置log级别LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();List<Logger> loggerList = loggerContext.getLoggerList();loggerList.forEach(logger -> logger.setLevel(Level.INFO));loggerContext.getLogger(ConsumerConfig.class).setLevel(Level.ERROR);}private static final Logger log = (Logger) LoggerFactory.getLogger(KafkaConsumerTest.class);private static boolean flag = true;public static void main(String[] args) throws InterruptedException {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.130:9092");//发送心跳请求频率的参数,这个值设置得越小,Consumer 实例发送心跳请求的频率就越高props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 * 2);//此时间内服务端未收到consumer心跳请求,服务端认为消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认3s。props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000 * 6);//获取消息后提交偏移量的最大时间(默认5分钟),超时服务端会认为消费者失效,触发Rebalance,并且提交offset会失败props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);//是否自动提交offsetprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//每次Poll的最大数量。注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。默认500props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);//消息的反序列化方式。props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//属于同一个组的消费实例,会负载消费消息。props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");// kafka安全认证相关配置,kafka没有配置认证授权可忽略props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_256.mechanismName());props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"reader\" password=\"reader\";");log.info("=========================================================第一个消费者加入=========================================================");new Thread(() -> {KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("topic_1123"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {log.info("消费者1:offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value());}// 同步提交offset,会阻塞消费者线程直至位移提交完成consumer.commitSync();}}).start();//睡一会儿Thread.sleep(20 * 1000);log.info("=========================================================第二个消费者加入=========================================================");//第二个消费者加入new Thread(() -> {KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);consumer2.subscribe(Collections.singletonList("topic_1123"));while (flag) {ConsumerRecords<String, String> consumerRecords = consumer2.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {log.info("消费者2:offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value());}// 同步提交offset,会阻塞消费者线程直至位移提交完成consumer2.commitSync();}consumer2.close();}).start();//睡一会儿Thread.sleep(20 * 1000);log.info("=========================================================第三个消费者加入=========================================================");//第三个消费者加入new Thread(() -> {KafkaConsumer<String, String> consumer3 = new KafkaConsumer<>(props);consumer3.subscribe(Collections.singletonList("topic_1123"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer3.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {log.info("消费者3:offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value());}// 同步提交offset,会阻塞消费者线程直至位移提交完成consumer3.commitSync();}}).start();//睡一会儿Thread.sleep(20 * 1000);log.info("=========================================================第四个消费者加入=========================================================");//第四个消费者加入new Thread(() -> {KafkaConsumer<String, String> consumer4 = new KafkaConsumer<>(props);consumer4.subscribe(Collections.singletonList("topic_1123"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer4.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {log.info("消费者4:offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value());}// 同步提交offset,会阻塞消费者线程直至位移提交完成consumer4.commitSync();}}).start();//睡一会儿Thread.sleep(20 * 1000);log.info("=========================================================关闭一个消费者=========================================================");//关闭第二个消费者flag = false;}
}

结果分析:
1、第一个consumer创建时,先是申请加入组joining group,然后分配消费方案——3个分区都是consumer1负责消费。

2、在第二个consumer创建时,consumer2会申请加入组joining group,因为组内已经有consumer1存在,所以会触发rebalance,撤销consumer1之前分配的分区,consumer1也会重新加入组,参与新一轮的分配策略。最后,consumer1消费0和1分区,consumer2消费2分区。


3、在第三个consumer创建时,可以看到和consumer2加入组的逻辑基本一致,consumer3加入joining group,触发rebalance,撤销consumer1分配的分区,consumer1加入组,撤销consumer2分配的分区,consumer2加入组,Coordinator重新分配消费策略。最后,consumer1消费0分区,consumer2消费1分区,consumer3消费2分区。

4、在第四个consumer创建时,仍然会申请joining group,触发rebalance,其他已经在组内的consumer会撤销之前的订阅,重新加入组,等待Coordinator重新分配消费策略。但是,最后可以发现consumer-group_test-4并未分配到分区,即不会消息到任何消息。

5、最后,我关闭了consumer2,仍然触发了rebalance,consumer1、3、4重新加入组参与分区分配,consumer1消费0分区,consumer3消费1分区,consumer4消费2分区。

结论:
其实整个rebalance过程Coordinator进行控制的,Consumer正常的添加和销毁导致rebalance都是正常的,但是在某些情况下,Coordinator会错误的任务Consumer已经“死亡”,从而将Consumer提出Group,触发rebalance。
在Consumer端有几个参数是影响Coordinator判断的关键:

  • session.timeout.ms:当Consumer完成分区分配后,会定期地向Coordinator发送心跳请求,表明它还活着,如果Coordinator在超过session.timeout.ms时间还没有收到Consumer发送的心跳请求,那么会认为此Consumer已经“死亡”,从而将Consumer踢出Group。此值默认值为10s。
  • heartbeat.interval.ms:控制发送心跳请求频率的参数,这个值设置得越小,Consumer实例发送心跳请求的频率就越高,默认值3秒。此值要小于session.timeout.ms,一般设置为session.timeout.ms的1/3。这个值设置得越小,Consumer实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance
  • max.poll.interval.ms :它限定了Consumer端应用程序两次调用poll方法的最大时间间隔。它的默认值是5分钟,表示你的Consumer程序如果在5分钟之内无法消费完poll方法返回的消息,那么Consumer会主动发起 “离开组” 的请求,Coordinator 也会开启新一轮Rebalance。

Kafka Rebalance测试相关推荐

  1. kafka吞吐量测试

    参考文章: https://cloud.tencent.com/developer/article/1587057 kafka吞吐量测试 (1) 测试kafka生产者的吞吐量 [root@hadoop ...

  2. kafka压力测试说明

    1 整体环境说明 1.1 硬件环境 1. 磁盘:SATA磁盘2块,磁盘阵列为RAID1 2. CPU****:2个4核CPU.具体参数:Intel(R) Xeon(R) CPU E5405 @ 2.0 ...

  3. Kafka吞吐量测试案例

    Kafka吞吐量测试案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 领英公司参考连接:https://www.slideshare.net/JiangjieQin/produc ...

  4. Kafka rebalance 的几种原因与解决方案

    网上有很多文章讲述 Kafka rebalance 的原理,本文是列举常见的几种 rebalance 场景: 如果一个 consumer 刚启动,则会向 broker 发送 JoinGroup 请求, ...

  5. 4. Flume和Kafka连通测试

    1. Kafka 安装与配置 1.1 下载Kafka 在官网下载kafka,kafka_2.11-2.1.1.tgz. 1.2 解压 解压至指定目录.tar -zxvf -C 你的目录 1.3 启动k ...

  6. kafka rebalance 部分分区没有owner

    转发请注明原创地址http://www.cnblogs.com/dongxiao-yang/p/6234673.html 最近业务同学反馈kafka上线的时候某个topic的部分分区一直没有owner ...

  7. java客户端作为kafka消费者测试

    [README] 本文主要对 java客户端作为kafka 消费者进行测试, 生产者由 kafka客户端扮演: [1]普通消费者 设置消费者组: 重置消费者的offset, 即每次都从最头开始消费(默 ...

  8. Kafka Rebalance机制

    一.什么是 Rebalance Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区. 假设目前 ...

  9. Kafka压力测试(写入MQ消息压测和消费MQ消息压测)

    1.测试目的 本次性能测试在正式环境下单台服务器上Kafka处理MQ消息能力进行压力测试.测试包括对Kafka写入MQ消息和消费MQ消息进行压力测试,根据10w.100w和1000w级别的消息处理结果 ...

最新文章

  1. Java程序员如何运用所掌握的技术构建一个完整的业务架构
  2. 使用reserve函数避免vector和string的内存重新分配
  3. SON Web Token设计单点登录系统
  4. 如何在C#中实现图片缩放
  5. 虚树(bzoj 3572: [Hnoi2014]世界树)
  6. zero to one (2)
  7. oracle系统AP对应的凭证编号,AP主要账户及会计分录
  8. 全球及中国信用卡生成器行业投资模式及投资策略分析报告2022-2028年
  9. 安卓学习日志 Day15 — 数据库基础
  10. 西门子触摸屏数据历史数据记录_西门子触摸屏参数跟数据简单说明
  11. 计算机键盘都起什么作用,电脑键盘各个按键功能 电脑键盘各个按键有什么功能...
  12. 重磅!厦门大学信息学院11篇论文入选AI顶会AAAI 2021
  13. form表单回车会自动提交
  14. SpringCloud基础
  15. mac系统可以进行软件测试吗,iPhone和Mac如何加入Apple Beta版软件测试计划
  16. #你好Unity3D#避免玩家作弊(来自我的长微博)
  17. 广告中的DSP、SSP和ADX
  18. 用过滤器实现file类实现输出目录以及子目下的结尾为.java的文件
  19. python类中的特殊方法_python类中的特殊成员方法介绍
  20. 基于cpt的组网实验_基于E-PUCK 2.0多智能体自主协同 高频投影定位系统

热门文章

  1. 自主可控计算机扯的重要性,2019自主可控计算机大会在京召开
  2. nc、ss、ps命令
  3. 基于Python语言的Web框架flask实现的校园二手物品发布平台
  4. Win10系统误删Winsock和Winsock2注册表,导致连不上Wifi以及无法上网等问题的解决方法
  5. 如何利用MATLAB求矩阵的逆阵?
  6. 智慧水务管控一体化平台,实现水务数字化管理
  7. 公司电池包生产线的优化研究
  8. dede织梦 常用标签
  9. 十分好用的跨浏览器测试工具,建议收藏!!!
  10. __builtin_memset的-Warray-bounds 警告