kafka再均衡监听器测试
【README】
- 本文使用的kafka是最新的 kafka3.0.0;本文kafka集群有3个节点分别是 centos201, centos202, centos203 ; brokerid 分别为 1,2,3;
- 本文主要用于测试 再均衡监听器;当有新消费者加入时,会发生分区再均衡;再均衡前就会调用再均衡监听器的 onPartitionsRevoked()方法;
- 本文的测试主题 hello12,有3个分区,每个分区2个副本;
【1】再均衡监听器
1)应用场景
在消费者退出或进行分区再均衡前,会做一些清理工作,如提交偏移量,或关闭数据库连接;这些工作可以通过监听器来实现;
2)再均衡监听器 ConsumerRebalanceListener
实现 该监听器即可,有3个方法
- onPartitionsRevoked:在分区均衡开始【前】和消费者停止读取消息【后】被调用;
- onPartitionsAssigned:分区再均衡【后】和消费者开始读取消息【前】被调用 ;
- onPartitionsLost:分区宕机时调用(本文不涉及);
/*** @Description 消费者分区再均衡监听器实现类* @author xiao tang* @version 1.0.0* @createTime 2021年12月11日*/
public class ConsumerBalanceListenerImpl implements ConsumerRebalanceListener {/** 消费者 */private Consumer consumer;/** 主题分区偏移量数组 */private MyTopicPartitionOffset[] topicPartitionOffsetArr;/*** @description 构造器* @param consumer 消费者* @param curOffsets 当前偏移量* @author xiao tang* @date 2021/12/11*/public ConsumerBalanceListenerImpl(Consumer consumer, MyTopicPartitionOffset[] topicPartitionOffsetArr) {this.consumer = consumer;this.topicPartitionOffsetArr = topicPartitionOffsetArr;}/** * @description 在分区均衡开始【前】和消费者停止读取消息【后】被调用* @param partitions 分区列表(分区号从0开始计数)* @author xiao tang* @date 2021/12/12 */@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.out.println("=== 分区再均衡触发onPartitionsRevoked()方法");// 提交偏移量回调(或记录错误日志)OffsetCommitCallback offsetCommitCallback = new OffsetCommitCallbackImpl();// 打印日志Arrays.stream(topicPartitionOffsetArr).filter(x->x.partition()>-1).forEach(x->System.out.printf("提交偏移量信息,partition【%d】offset【%s】\n", x.partition(), x.offset()));// 把数组转为主题分区与偏移量映射,并提交最后一次处理的偏移量 (可以异步,也可以同步)// 同步提交一直重试或报超时异常// 异步提交传入提交回调,失败自行处理consumer.commitAsync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr), offsetCommitCallback);// 停止程序的原因在于做实验,下次从本次提交的偏移量开始消费throw new RuntimeException("发生分区再均衡,程序结束");}/*** @description 分区再均衡【后】和消费者开始读取消息【前】被调用* @param partitions 主题分区列表* @author xiao tang* @date 2021/12/12*/@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {// do sth}@Overridepublic void onPartitionsLost(Collection<TopicPartition> partitions) {ConsumerRebalanceListener.super.onPartitionsLost(partitions);}
}
为了测试,分区再均衡监听器中,onPartitionsRevoked() 方法提交最后已消费消息的偏移量后,就抛出运行时异常结束运行,让其他消费者消费以便查看监听器是否成功提交偏移量;
3)消费者工具
/*** @Description 消费者工具* @author xiao tang* @version 1.0.0* @createTime 2021年12月12日*/
public enum MyConsumerUtils {/** 单例 */INSTANCE;/*** @description 获取主题分区与偏移量映射* @param topicPartitionOffsetArr 主题分区与偏移量数组* @return 主题分区与偏移量映射* @author xiao tang* @date 2021/12/12*/public static Map<TopicPartition, OffsetAndMetadata> getTopicPartitionOffsetMetadataMap(MyTopicPartitionOffset[] topicPartitionOffsetArr) {// 主题分区与偏移量映射Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetMetadataMap = new HashMap<>(topicPartitionOffsetArr.length);// 分区号大于-1,才是消费者接收的分区Arrays.stream(topicPartitionOffsetArr).filter(x->x.partition()>-1).forEach(x -> {topicPartitionOffsetMetadataMap.put(new TopicPartition(x.topic(), x.partition()), new OffsetAndMetadata(x.offset(), "no metadata"));});return topicPartitionOffsetMetadataMap;}
}
【2】生产者
/*** @Description 生产者* @author xiao tang* @version 1.0.0* @createTime 2021年12月03日*/
public class MyProducer {/** 主题 */public final static String TOPIC_NAME = "hello12";public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");/*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重试次数*/props.put(ProducerConfig.RETRIES_CONFIG, 0);/*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);/*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 超时时间props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);/*7. RecordAccumulator 缓冲区大小*/props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);/*8. key, value 的序列化类 */props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/** 设置压缩算法 */props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");/** 设置拦截器 */
// props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName()));/** 设置阻塞超时时间 */props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3600 * 1000);/* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props);/* 10.发送数据 */int order = 1;for (int i = 0; i < 10000; i++) {for (int j = 0; j < 3; j++) {Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, j, "", String.format("[%s] ", order++) + " > " + DataFactory.INSTANCE.genChar(5)));try {System.out.println("[生产者] 分区【" + future.get().partition() + "】-offset【" + future.get().offset() + "】");} catch (Exception e) {}}}/* 11.关闭资源 */producer.close();System.out.println("kafka生产者写入数据完成");}
}
【3】消费者
【3.1】带有均衡监听器的消费者1
/*** @Description 带有均衡监听器的消费者* @author xiao tang* @version 1.0.0* @createTime 2021年12月11日*/
public class MyConsumerWithRebalanceListener {public static void main(String[] args) {// 创建消费者配置信息Properties props = new Properties();// 属性配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME + "G1");// 关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 设置消费消息的位置,消费最新消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// 设置分区策略 (默认值-RangeAssignor)props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);int partitionSize = consumer.partitionsFor(MyProducer.TOPIC_NAME).size();// 创建分区偏移量数组并初始化 (仅考虑一个topic的情况)MyTopicPartitionOffset[] topicPartitionOffsetArr = new MyTopicPartitionOffset[partitionSize];IntStream.range(0, partitionSize).forEach(x -> topicPartitionOffsetArr[x] = new MyTopicPartitionOffset());// 订阅主题, 【传入分区再均衡监听器】consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME), new ConsumerBalanceListenerImpl(consumer, topicPartitionOffsetArr));// 循环拉取try {while (!Thread.interrupted()) {System.out.println(DateUtils.getNowTimestamp() + " > 带均衡监听器消费者等待消费消息");TimeUtils.sleep(1000);// 消费消息ConsumerRecords<String, String> consumerRds = consumer.poll(100);System.out.println("poll begin {");for (ConsumerRecord<String, String> rd : consumerRds) {System.out.println("消费者-WithRebalanceListener-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());// 提交的偏移量,是 当前消息偏移量加1topicPartitionOffsetArr[rd.partition()].setAll(rd.topic(), rd.partition(), rd.offset() + 1);}System.out.println("poll end } ");// 【异步提交每个分区的偏移量】consumer.commitAsync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr), new OffsetCommitCallbackImpl());}} finally {try {// 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误consumer.commitSync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr));} finally {// 记得关闭消费者consumer.close();}}}
}
【3.2】 不带均衡监听器的消费者2 (测试用)
即一个普通消费者;
/*** @Description 不带均衡监听器的消费者* @author xiao tang* @version 1.0.0* @createTime 2021年12月11日*/
public class MyConsumerNoRebalanceListener {public static void main(String[] args) {// 创建消费者配置信息Properties props = new Properties();// 属性配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME + "G1");// 关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 设置消费消息的位置,消费最新消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 设置分区策略 (默认值-RangeAssignor)props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题, 【没有分区再均衡监听器】consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME));// 循环拉取try {while(!Thread.interrupted()) {System.out.println(DateUtils.getNowTimestamp() + " > 没有均衡监听器的消费者等待消费消息");TimeUtils.sleep(1000);// 消费消息ConsumerRecords<String, String> consumerRds = consumer.poll(100);for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("消费者-NoRebalanceListener-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());}// 【异步提交】consumer.commitAsync(new OffsetCommitCallbackImpl());if (!consumerRds.isEmpty()) break;}} finally {try {// 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误consumer.commitSync();} finally {// 记得关闭消费者consumer.close();System.out.println("消费者关闭");}}}
}
我们可以发现,一旦消费者2消费了消息(消息不为空),就停止消费;
以便我们查看消费者2接收消息的偏移量是不是 消费者1的监听器在发生分区再均衡前提交的偏移量+1;
【4】测试
【4.1】测试步骤
step1) 运行生产者,发送消息到kafka;
step2) 运行 带有均衡监听器的消费者1 MyConsumerWithRebalanceListener, 消费消息;
在消费者订阅主题时,传入再均衡监听器;
// 订阅主题, 【传入分区再均衡监听器】
consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME)
, new ConsumerBalanceListenerImpl(consumer, topicPartitionOffsetArr));
step3)运行 不带均衡监听器的消费者2 MyConsumerNoRebalanceListener,消费消息;
一旦消费者2加入消费者组,就会发生分区再均衡,消费者1的某些分区所有权会转给消费者2,触发消费者1 的 监听器 ConsumerBalanceListenerImpl 的 onPartitionsRevoked() 方法;
- 然后 onPartitionsRevoked()方法提交 消费者1处理的消息的偏移量后,就原地抛出异常停止运行;
【4.2】测试结果分析
1)消费者1(带分区再均衡监听器)的监听器最后提交的偏移量日志如下:
2021-12-12 10:23:30 > 带均衡监听器消费者等待消费消息
=== 分区再均衡触发onPartitionsRevoked()方法
提交偏移量信息,partition【0】offset【1296】
提交偏移量信息,partition【1】offset【1269】
提交偏移量信息,partition【2】offset【1269】
2)消费者2接收到的起始消息的偏移量日志如下(全部):
2021-12-12 10:23:27 > 没有均衡监听器的消费者等待消费消息
2021-12-12 10:23:32 > 没有均衡监听器的消费者等待消费消息
消费者-NoRebalanceListener-分区【0】offset【1296】值=[589] > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1297】值=[592] > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1298】值=[595] > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1299】值=[598] > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1300】值=[601] > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1301】值=[604] > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1302】值=[607] > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1303】值=[610] > ABCDE
消费者-NoRebalanceListener-分区【0】offset【1304】值=[613] > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1269】值=[510] > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1270】值=[513] > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1271】值=[516] > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1272】值=[519] > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1273】值=[522] > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1274】值=[525] > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1275】值=[528] > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1276】值=[531] > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1277】值=[534] > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1269】值=[509] > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1270】值=[512] > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1271】值=[515] > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1272】值=[518] > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1273】值=[521] > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1274】值=[524] > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1275】值=[527] > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1276】值=[530] > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1277】值=[533] > ABCDE
消费者关闭
即 监听器提交的偏移量为:
partition【0】offset【1296】
partition【1】offset【1269】
partition【2】offset【1269】
而普通消费者接收消息的起始偏移量为
消费者-NoRebalanceListener-分区【0】offset【1296】值=[589] > ABCDE
消费者-NoRebalanceListener-分区【2】offset【1269】值=[510] > ABCDE
消费者-NoRebalanceListener-分区【1】offset【1269】值=[509] > ABCDE
所以,偏移量是可以对上的;即再均衡监听器在发生分区再均衡前提交的消息偏移量后, 其他消费者可以接收该偏移量指定的消息;
所以,本次再均衡监听器测试是成功的;
【注意】
注意1)监听器提交的偏移量是接收消息的当前偏移量+1;(注意要加1,非常重要),即加1后的偏移量作为其他消费者轮序消费的起始位置;
- 代码:偏移量+1参见 MyConsumerWithRebalanceListener 的 接收消息的循环中的代码,如下:
// 提交的偏移量,是 当前消息偏移量加1
topicPartitionOffsetArr[rd.partition()].setAll(
rd.topic(), rd.partition(), rd.offset() + 1);
注意2)本文代码参考了 《kafka权威指南》 page63,但书中代码有问题;
- 在每次for循环中创建 TopicPartition对象和 OffsetAndMetadata对象,我觉得这是没有必要的,因为只有每个分区的最后一次消息的 topic,partition,offset,是有用的,但它每次循环都创建了对象,而且把 currentOffsets 放在了while循环外面,这样肯定会造成 oom;(本文仅记录了 topic,partition,offset,而没有创建对象,这是本文的优化点)
- 当然了,原作者写的是参考代码,可以理解;但业务生产代码绝对不能这样写;
【References】
- kakfa权威指南;
kafka再均衡监听器测试相关推荐
- Kafka 再均衡监听器示例
Kafka 再均衡监听器示例 依赖 介绍 代码 生产者 生产任务 消费者 消费任务 再均衡监听器 结果 生产者 消费者 启动生产者之前 启动生产者后,第三线程关闭之前 第三线程关闭后,分区再平衡 分区 ...
- 【Kafka】kafka 再均衡监听器 ConsumerRebalanceListener
1.概述 新版本 consumer 默认把位移提交到 consumer offsets 中 .其Kafka 也支持用户把位移提交到外部存储中,比如数据库中.若要实现这个功能,用户就必须使用 rebal ...
- 关于Kafka中的再均衡
本文来说下Kafka中的再均衡 文章目录 概述 触发时机 协调者 交互方式 处理流程 本文小结 概述 在Kafka消费者的使用和原理中已经提到过"再均衡"的概念,我们先回顾下,一个 ...
- 学Kafka,就必须了解的再均衡问题!
作者 | 草捏子 来源 | 草捏子(ID:chaycao) 头图 | CSDN 下载自东方IC 在<Kafka消费者的使用和原理>中已经提到过"再均衡"的概念,我们先 ...
- Kafka | Kafka的消费再均衡是指什么?
记录收藏一波强大网友的解读: https://www.cnblogs.com/luozhiyun/p/11909315.html 摘自原文: 就目前而言,一共有如下几种情形会触发再均衡的操作: 有新的 ...
- kafka消费者接收分区测试
[README] 本文演示了当有新消费者加入组后,其他消费者接收分区情况: 本文还模拟了 broker 宕机的情况: 本文使用的是最新的 kafka3.0.0 : 本文测试案例,来源于 消费者接收分区 ...
- Kafka负载均衡策略
分区器 分区器是生产者层面的负载均衡.Kafka 生产者生产消息时,根据分区器将消息投递到指定的分区中,所以 Kafka 的负载均衡很大程度上依赖于分区器. Kafka 默认的分区器是 Kafka 提 ...
- graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二)
graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylo ...
- graylog+kafka+zookeeper(单机测试及源码),微服务日志查询使用(七)
graylog+kafka+zookeeper(单机测试及源码),微服务日志查询使用(七) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件部署,查找问题 ...
最新文章
- CCS下DSP仿真实现双边带调制与频谱分析(查表法)
- 【文本描述增强】标准屏幕字段文本描述更改增强
- php分页功能乱码了怎么办,51、PHP文件内容分页操作,避免乱码
- php不能显示验证码
- sounds speech_speech sounds
- C++中若类中没有默认构造函数,如何使用对象数组
- js 正则表达式判断价格
- [leetcode]476. 数字的补数
- PHP中文分词扩展 SCWS
- SAP 创始人:打造“一连串的应用程序”
- 3726.调整数组-AcWing题库
- 一文剖析电影“流浪地球”推广营销方式
- python 找色点击_[原创]python实现触动精灵引擎找色找字函数,为机器学习添加手脚...
- 2的99次方用计算机,神奇的平方计算口诀!让孩子秒变计算机!
- 传统的期货交易方式为计算机撮合成交,[期货知识]期货价格是怎么形成的-计算机撮合成交 - 南方财富网...
- 疫情期间,程序员是如何靠副业赚钱的?​
- java:熊怪吃核桃
- Keil软件介绍与烧录工具的使用
- 16,甲流疫情死亡率
- rk3288 RTC实现定时开关机
热门文章
- 1143 Lowest Common Ancestor 甲级
- 2020牛客暑期多校训练营(第二场)
- Codeforces Round #764 (Div. 3)
- [UOJ299][CTSC2017] 游戏
- [杂题训练]CF1228E Another Filling the Grid(容斥),CF936C Lock Puzzle(构造)
- P3629-[APIO2010]巡逻【树的直径】
- 【二分】递增(luogu 3902)
- 初一模拟赛总结(3.16)
- 实践出真知之Spring Cloud之基于Eureka、Ribbon、Feign的真实案例
- mybatis入门(七)之日志