【README】

  • 本文使用的kafka是最新的 kafka3.0.0;本文kafka集群有3个节点分别是 centos201, centos202, centos203 ; brokerid 分别为 1,2,3;
  • 本文主要用于测试 再均衡监听器;当有新消费者加入时,会发生分区再均衡;再均衡前就会调用再均衡监听器的 onPartitionsRevoked()方法;
  • 本文的测试主题 hello12,有3个分区,每个分区2个副本;


【1】再均衡监听器

1)应用场景

在消费者退出或进行分区再均衡前,会做一些清理工作,如提交偏移量,或关闭数据库连接;这些工作可以通过监听器来实现;

2)再均衡监听器 ConsumerRebalanceListener

实现 该监听器即可,有3个方法

  1. onPartitionsRevoked:在分区均衡开始【前】和消费者停止读取消息【后】被调用;
  2. onPartitionsAssigned:分区再均衡【后】和消费者开始读取消息【前】被调用 ;
  3. onPartitionsLost:分区宕机时调用(本文不涉及);
/*** @Description 消费者分区再均衡监听器实现类* @author xiao tang* @version 1.0.0* @createTime 2021年12月11日*/
public class ConsumerBalanceListenerImpl implements ConsumerRebalanceListener {/** 消费者 */private Consumer consumer;/** 主题分区偏移量数组  */private MyTopicPartitionOffset[] topicPartitionOffsetArr;/*** @description 构造器* @param consumer 消费者* @param curOffsets 当前偏移量* @author xiao tang* @date 2021/12/11*/public ConsumerBalanceListenerImpl(Consumer consumer, MyTopicPartitionOffset[] topicPartitionOffsetArr) {this.consumer = consumer;this.topicPartitionOffsetArr = topicPartitionOffsetArr;}/** * @description 在分区均衡开始【前】和消费者停止读取消息【后】被调用* @param partitions 分区列表(分区号从0开始计数)* @author xiao tang* @date 2021/12/12 */@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.out.println("=== 分区再均衡触发onPartitionsRevoked()方法");// 提交偏移量回调(或记录错误日志)OffsetCommitCallback offsetCommitCallback = new OffsetCommitCallbackImpl();// 打印日志Arrays.stream(topicPartitionOffsetArr).filter(x->x.partition()>-1).forEach(x->System.out.printf("提交偏移量信息,partition【%d】offset【%s】\n", x.partition(), x.offset()));// 把数组转为主题分区与偏移量映射,并提交最后一次处理的偏移量 (可以异步,也可以同步)// 同步提交一直重试或报超时异常// 异步提交传入提交回调,失败自行处理consumer.commitAsync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr), offsetCommitCallback);// 停止程序的原因在于做实验,下次从本次提交的偏移量开始消费throw new RuntimeException("发生分区再均衡,程序结束");}/*** @description 分区再均衡【后】和消费者开始读取消息【前】被调用* @param partitions 主题分区列表* @author xiao tang* @date 2021/12/12*/@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {// do sth}@Overridepublic void onPartitionsLost(Collection<TopicPartition> partitions) {ConsumerRebalanceListener.super.onPartitionsLost(partitions);}
}

为了测试,分区再均衡监听器中,onPartitionsRevoked() 方法提交最后已消费消息的偏移量后,就抛出运行时异常结束运行,让其他消费者消费以便查看监听器是否成功提交偏移量;

3)消费者工具

/*** @Description 消费者工具* @author xiao tang* @version 1.0.0* @createTime 2021年12月12日*/
public enum MyConsumerUtils {/** 单例 */INSTANCE;/*** @description 获取主题分区与偏移量映射* @param topicPartitionOffsetArr 主题分区与偏移量数组* @return 主题分区与偏移量映射* @author xiao tang* @date 2021/12/12*/public static Map<TopicPartition, OffsetAndMetadata> getTopicPartitionOffsetMetadataMap(MyTopicPartitionOffset[] topicPartitionOffsetArr) {// 主题分区与偏移量映射Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetMetadataMap = new HashMap<>(topicPartitionOffsetArr.length);// 分区号大于-1,才是消费者接收的分区Arrays.stream(topicPartitionOffsetArr).filter(x->x.partition()>-1).forEach(x -> {topicPartitionOffsetMetadataMap.put(new TopicPartition(x.topic(), x.partition()), new OffsetAndMetadata(x.offset(), "no metadata"));});return topicPartitionOffsetMetadataMap;}
}

【2】生产者

/*** @Description 生产者* @author xiao tang* @version 1.0.0* @createTime 2021年12月03日*/
public class MyProducer {/** 主题 */public final static String TOPIC_NAME = "hello12";public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");/*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重试次数*/props.put(ProducerConfig.RETRIES_CONFIG, 0);/*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);/*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 超时时间props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);/*7. RecordAccumulator 缓冲区大小*/props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);/*8. key, value 的序列化类 */props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/** 设置压缩算法 */props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");/** 设置拦截器 */
//        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName()));/** 设置阻塞超时时间 */props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3600 * 1000);/* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);/* 10.发送数据 */int order = 1;for (int i = 0; i < 10000; i++) {for (int j = 0; j < 3; j++) {Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, j, "", String.format("[%s] ", order++) + " > " + DataFactory.INSTANCE.genChar(5)));try {System.out.println("[生产者] 分区【" + future.get().partition() + "】-offset【" + future.get().offset() + "】");} catch (Exception e) {}}}/* 11.关闭资源 */producer.close();System.out.println("kafka生产者写入数据完成");}
}

【3】消费者

【3.1】带有均衡监听器的消费者1

/*** @Description 带有均衡监听器的消费者* @author xiao tang* @version 1.0.0* @createTime 2021年12月11日*/
public class MyConsumerWithRebalanceListener {public static void main(String[] args) {// 创建消费者配置信息Properties props = new Properties();// 属性配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME + "G1");//   关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 设置消费消息的位置,消费最新消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// 设置分区策略 (默认值-RangeAssignor)props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);int partitionSize = consumer.partitionsFor(MyProducer.TOPIC_NAME).size();// 创建分区偏移量数组并初始化 (仅考虑一个topic的情况)MyTopicPartitionOffset[] topicPartitionOffsetArr = new MyTopicPartitionOffset[partitionSize];IntStream.range(0, partitionSize).forEach(x -> topicPartitionOffsetArr[x] = new MyTopicPartitionOffset());// 订阅主题, 【传入分区再均衡监听器】consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME), new ConsumerBalanceListenerImpl(consumer, topicPartitionOffsetArr));// 循环拉取try {while (!Thread.interrupted()) {System.out.println(DateUtils.getNowTimestamp() + " > 带均衡监听器消费者等待消费消息");TimeUtils.sleep(1000);// 消费消息ConsumerRecords<String, String> consumerRds = consumer.poll(100);System.out.println("poll begin {");for (ConsumerRecord<String, String> rd : consumerRds) {System.out.println("消费者-WithRebalanceListener-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());// 提交的偏移量,是 当前消息偏移量加1topicPartitionOffsetArr[rd.partition()].setAll(rd.topic(), rd.partition(), rd.offset() + 1);}System.out.println("poll end } ");// 【异步提交每个分区的偏移量】consumer.commitAsync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr), new OffsetCommitCallbackImpl());}} finally {try {// 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误consumer.commitSync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr));} finally {// 记得关闭消费者consumer.close();}}}
}

【3.2】 不带均衡监听器的消费者2 (测试用)

即一个普通消费者;

/*** @Description 不带均衡监听器的消费者* @author xiao tang* @version 1.0.0* @createTime 2021年12月11日*/
public class MyConsumerNoRebalanceListener {public static void main(String[] args) {// 创建消费者配置信息Properties props = new Properties();// 属性配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME + "G1");//   关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 设置消费消息的位置,消费最新消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 设置分区策略 (默认值-RangeAssignor)props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题, 【没有分区再均衡监听器】consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME));// 循环拉取try {while(!Thread.interrupted()) {System.out.println(DateUtils.getNowTimestamp() + " > 没有均衡监听器的消费者等待消费消息");TimeUtils.sleep(1000);// 消费消息ConsumerRecords<String, String> consumerRds  = consumer.poll(100);for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("消费者-NoRebalanceListener-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());}// 【异步提交】consumer.commitAsync(new OffsetCommitCallbackImpl());if (!consumerRds.isEmpty()) break;}} finally {try {// 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误consumer.commitSync();} finally {// 记得关闭消费者consumer.close();System.out.println("消费者关闭");}}}
}

我们可以发现,一旦消费者2消费了消息(消息不为空),就停止消费;

以便我们查看消费者2接收消息的偏移量是不是 消费者1的监听器在发生分区再均衡前提交的偏移量+1;


【4】测试

【4.1】测试步骤

step1) 运行生产者,发送消息到kafka;

step2) 运行 带有均衡监听器的消费者1 MyConsumerWithRebalanceListener, 消费消息;

在消费者订阅主题时,传入再均衡监听器;

// 订阅主题, 【传入分区再均衡监听器】
consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME)
, new ConsumerBalanceListenerImpl(consumer, topicPartitionOffsetArr));

step3)运行 不带均衡监听器的消费者2 MyConsumerNoRebalanceListener,消费消息;

一旦消费者2加入消费者组,就会发生分区再均衡,消费者1的某些分区所有权会转给消费者2,触发消费者1 的 监听器 ConsumerBalanceListenerImpl 的 onPartitionsRevoked() 方法;

  • 然后 onPartitionsRevoked()方法提交 消费者1处理的消息的偏移量后,就原地抛出异常停止运行;

【4.2】测试结果分析

1)消费者1(带分区再均衡监听器)的监听器最后提交的偏移量日志如下:

2021-12-12 10:23:30 > 带均衡监听器消费者等待消费消息
=== 分区再均衡触发onPartitionsRevoked()方法
提交偏移量信息,partition【0】offset【1296】
提交偏移量信息,partition【1】offset【1269】
提交偏移量信息,partition【2】offset【1269】

2)消费者2接收到的起始消息的偏移量日志如下(全部):

2021-12-12 10:23:27 > 没有均衡监听器的消费者等待消费消息
2021-12-12 10:23:32 > 没有均衡监听器的消费者等待消费消息
消费者-NoRebalanceListener-分区【0】offset【1296】值=[589]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1297】值=[592]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1298】值=[595]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1299】值=[598]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1300】值=[601]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1301】值=[604]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1302】值=[607]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1303】值=[610]  > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1304】值=[613]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1269】值=[510]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1270】值=[513]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1271】值=[516]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1272】值=[519]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1273】值=[522]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1274】值=[525]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1275】值=[528]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1276】值=[531]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1277】值=[534]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1269】值=[509]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1270】值=[512]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1271】值=[515]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1272】值=[518]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1273】值=[521]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1274】值=[524]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1275】值=[527]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1276】值=[530]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1277】值=[533]  > ABCDE
消费者关闭

即 监听器提交的偏移量为:

partition【0】offset【1296】
partition【1】offset【1269】
partition【2】offset【1269】

而普通消费者接收消息的起始偏移量为

消费者-NoRebalanceListener-分区【0】offset【1296】值=[589]  > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1269】值=[510]  > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1269】值=[509]  > ABCDE

所以,偏移量是可以对上的;即再均衡监听器在发生分区再均衡前提交的消息偏移量后, 其他消费者可以接收该偏移量指定的消息;

所以,本次再均衡监听器测试是成功的


【注意】

注意1)监听器提交的偏移量是接收消息的当前偏移量+1;(注意要加1,非常重要),即加1后的偏移量作为其他消费者轮序消费的起始位置;

  • 代码:偏移量+1参见  MyConsumerWithRebalanceListener 的 接收消息的循环中的代码,如下:
 // 提交的偏移量,是 当前消息偏移量加1
topicPartitionOffsetArr[rd.partition()].setAll(
rd.topic(), rd.partition(), rd.offset() + 1); 

注意2)本文代码参考了 《kafka权威指南》 page63,但书中代码有问题;

  • 在每次for循环中创建 TopicPartition对象和 OffsetAndMetadata对象,我觉得这是没有必要的,因为只有每个分区的最后一次消息的 topic,partition,offset,是有用的,但它每次循环都创建了对象,而且把 currentOffsets 放在了while循环外面,这样肯定会造成 oom;(本文仅记录了 topic,partition,offset,而没有创建对象,这是本文的优化点)
  • 当然了,原作者写的是参考代码,可以理解;但业务生产代码绝对不能这样写;

【References】

  • kakfa权威指南;

kafka再均衡监听器测试相关推荐

  1. Kafka 再均衡监听器示例

    Kafka 再均衡监听器示例 依赖 介绍 代码 生产者 生产任务 消费者 消费任务 再均衡监听器 结果 生产者 消费者 启动生产者之前 启动生产者后,第三线程关闭之前 第三线程关闭后,分区再平衡 分区 ...

  2. 【Kafka】kafka 再均衡监听器 ConsumerRebalanceListener

    1.概述 新版本 consumer 默认把位移提交到 consumer offsets 中 .其Kafka 也支持用户把位移提交到外部存储中,比如数据库中.若要实现这个功能,用户就必须使用 rebal ...

  3. 关于Kafka中的再均衡

    本文来说下Kafka中的再均衡 文章目录 概述 触发时机 协调者 交互方式 处理流程 本文小结 概述 在Kafka消费者的使用和原理中已经提到过"再均衡"的概念,我们先回顾下,一个 ...

  4. 学Kafka,就必须了解的再均衡问题!

    作者 | 草捏子 来源 | 草捏子(ID:chaycao) 头图 |  CSDN 下载自东方IC 在<Kafka消费者的使用和原理>中已经提到过"再均衡"的概念,我们先 ...

  5. Kafka | Kafka的消费再均衡是指什么?

    记录收藏一波强大网友的解读: https://www.cnblogs.com/luozhiyun/p/11909315.html 摘自原文: 就目前而言,一共有如下几种情形会触发再均衡的操作: 有新的 ...

  6. kafka消费者接收分区测试

    [README] 本文演示了当有新消费者加入组后,其他消费者接收分区情况: 本文还模拟了 broker 宕机的情况: 本文使用的是最新的 kafka3.0.0 : 本文测试案例,来源于 消费者接收分区 ...

  7. Kafka负载均衡策略

    分区器 分区器是生产者层面的负载均衡.Kafka 生产者生产消息时,根据分区器将消息投递到指定的分区中,所以 Kafka 的负载均衡很大程度上依赖于分区器. Kafka 默认的分区器是 Kafka 提 ...

  8. graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二)

    graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylo ...

  9. graylog+kafka+zookeeper(单机测试及源码),微服务日志查询使用(七)

    graylog+kafka+zookeeper(单机测试及源码),微服务日志查询使用(七) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件部署,查找问题 ...

最新文章

  1. CCS下DSP仿真实现双边带调制与频谱分析(查表法)
  2. 【文本描述增强】标准屏幕字段文本描述更改增强
  3. php分页功能乱码了怎么办,51、PHP文件内容分页操作,避免乱码
  4. php不能显示验证码
  5. sounds speech_speech sounds
  6. C++中若类中没有默认构造函数,如何使用对象数组
  7. js 正则表达式判断价格
  8. [leetcode]476. 数字的补数
  9. PHP中文分词扩展 SCWS
  10. SAP 创始人:打造“一连串的应用程序”
  11. 3726.调整数组-AcWing题库
  12. 一文剖析电影“流浪地球”推广营销方式
  13. python 找色点击_[原创]python实现触动精灵引擎找色找字函数,为机器学习添加手脚...
  14. 2的99次方用计算机,神奇的平方计算口诀!让孩子秒变计算机!
  15. 传统的期货交易方式为计算机撮合成交,[期货知识]期货价格是怎么形成的-计算机撮合成交 - 南方财富网...
  16. 疫情期间,程序员是如何靠副业赚钱的?​
  17. java:熊怪吃核桃
  18. Keil软件介绍与烧录工具的使用
  19. 16,甲流疫情死亡率
  20. rk3288 RTC实现定时开关机

热门文章

  1. 1143 Lowest Common Ancestor 甲级
  2. 2020牛客暑期多校训练营(第二场)
  3. Codeforces Round #764 (Div. 3)
  4. [UOJ299][CTSC2017] 游戏
  5. [杂题训练]CF1228E Another Filling the Grid(容斥),CF936C Lock Puzzle(构造)
  6. P3629-[APIO2010]巡逻【树的直径】
  7. 【二分】递增(luogu 3902)
  8. 初一模拟赛总结(3.16)
  9. 实践出真知之Spring Cloud之基于Eureka、Ribbon、Feign的真实案例
  10. mybatis入门(七)之日志