1. 取值及定义#
    auto.offset.reset有以下三个可选值:

latest (默认)
earliest
none
三者均有共同定义:
对于同一个消费者组,若已有提交的offset,则从提交的offset开始接着消费

意思就是,只要这个消费者组消费过了,不管auto.offset.reset指定成什么值,效果都一样,每次启动都是已有的最新的offset开始接着往后消费

不同的点为:

latest(默认):对于同一个消费者组,若没有提交过offset,则只消费消费者连接topic后,新产生的数据
就是说如果这个topic有历史消息,现在新启动了一个消费者组,且auto.offset.reset=latest,此时已存在的历史消息无法消费到,那保持消费者组运行,如果此时topic有新消息进来了,这时新消息才会被消费到。而一旦有消费,则必然会提交offset
这时候如果该消费者组意外下线了,topic仍然有消息进来,接着该消费者组在后面恢复上线了,它仍然可以从下线时的offset处开始接着消费,此时走的就是共同定义

earliest:对于同一个消费者组,若没有提交过offset,则从头开始消费
就是说如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费,这就是与latest不同之处。
一旦该消费者组消费过topic后,此时就有该消费者组的offset了,这种情况下即使指定了auto.offset.reset=earliest,再重新启动该消费者组,效果是与latest一样的,也就是此时走的是共同的定义

none:对于同一个消费者组,若没有提交过offset,会抛异常
一般生产环境基本用不到该参数

  1. 新建全新topic#
    ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic TestOffsetResetTopic --partitions 1 --replication-factor 1 --create

  2. 往新建的topic发送消息#
    便于测试,用Java代码发送5条消息

public class TestProducer {

public static void main(String[] args) throws InterruptedException {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(properties);String topic = "TestOffsetResetTopic";for (int i = 0; i < 5; i++) {String value = "message_" + i + "_" + LocalDateTime.now();System.out.println("Send value: " + value);producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> {if (exception == null) {String str = MessageFormat.format("Send success! topic: {0}, partition: {1}, offset: {2}", metadata.topic(), metadata.partition(), metadata.offset());System.out.println(str);}});Thread.sleep(500);}producer.close();
}

}

发送消息成功:

Send value: message_0_2022-09-16T18:26:15.943749600
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 0
Send value: message_1_2022-09-16T18:26:17.066608900
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 1
Send value: message_2_2022-09-16T18:26:17.568667200
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 2
Send value: message_3_2022-09-16T18:26:18.069093600
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 3
Send value: message_4_2022-09-16T18:26:18.583288100
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 4

现在TestOffsetResetTopic这个topic有5条消息,且还没有任何消费者组对其进行消费过,也就是没有任何offset

  1. 测试latest#
    已知topic已经存在5条历史消息,此时启动一个消费者

public class TestConsumerLatest {

public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 指定消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");// 设置 auto.offset.resetproperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");String topic = "TestOffsetResetTopic";KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Collections.singletonList(topic));// 消费数据while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}

}

发现如上面所述,历史已存在的5条消息不会消费到,消费者没有任何动静,现在保持消费者在线

启动TestProducer再发5条消息,会发现这后面新发的,offset从5开始的消息就被消费了

ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1663329725731, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_0_2022-09-16T20:02:05.523581500)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1663329726251, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_1_2022-09-16T20:02:06.251399400)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1663329726764, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_2_2022-09-16T20:02:06.764186200)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1663329727264, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_3_2022-09-16T20:02:07.264268500)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1663329727778, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_4_2022-09-16T20:02:07.778469700)

此时该消费者组对于这个topic的offset已经为9了,现在停掉这个消费者(下线),再启动TestProducer发5条消息,接着再启动TestConsumerLatest,会发现紧接上一次的offset之后开始,即从10继续消费

如果测试发现没动静,请多等一会,估计机器性能太差…

  1. 测试earliest#
    新建一个测试消费者,设置auto.offset.reset为earliest,注意groupid为新的group2,表示对于topic来说是全新的消费者组

public class TestConsumerEarliest {

public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 指定消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");// 设置 auto.offset.resetproperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");String topic = "TestOffsetResetTopic";KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Collections.singletonList(topic));// 消费数据while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}

}

一运行发现已有的10条消息(最开始5条加上面一次测试又发了5条,一共10条)是可以被消费到的,且消费完后,对于这个topic就已经有了group2这个组的offset了,无论之后启停,只要groupid不变,都会从最新的offset往后开始消费

  1. 测试none#
    新建一个测试消费者,设置auto.offset.reset为none,注意groupid为新的group3,表示对于topic来说是全新的消费者组

public class TestConsumerNone {

public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 指定消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group3");// 设置 auto.offset.resetproperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");String topic = "TestOffsetResetTopic";KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Collections.singletonList(topic));// 消费数据while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}

}

一运行,程序报错,因为对于topic来说是全新的消费者组,且又指定了auto.offset.reset为none,直接抛异常,程序退出

Exception in thread “main” org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [TestOffsetResetTopic-0]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:706)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2434)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1266)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at kakfa.TestConsumerNone.main(TestConsumerNone.java:31)

  1. 总结#
    如果topic已经有历史消息了,又需要消费这些历史消息,则必须要指定一个从未消费过的消费者组,同时指定auto.offset.reset为earliest,才可以消费到历史数据,之后就有提交offset。有了offset,无论是earliest还是latest,效果都是一样的了。
    如果topic没有历史消息,或者不需要处理历史消息,那按照默认latest即可。

kafka的auto.offset.reset详解与测试相关推荐

  1. Kafka之auto.offset.reset值解析

    今日在使用kafka时,发现将 auto.offset.reset 设置为earliest.latest.none 都没有达到自己预期的效果. earliest: 当各分区下有已提交的offset时, ...

  2. kafka consumer消费者 offset groupID详解

    kafka consumer:消费者可以从多个broker中读取数据.消费者可以消费多个topic中的数据. 因为Kafka的broker是无状态的,所以consumer必须使用partition o ...

  3. Kafka auto.offset.reset

    要从头消费kafka的数据,可以通过以下参数: Kafka auto.offset.reset = earliest 转载于:https://www.cnblogs.com/drjava/p/1045 ...

  4. kafka auto.offset.reset设置earliest从头开始消费

    auto.offset.reset设置为earliest spring:kafka:bootstrap-servers: 192.168.?.x:9092 consumer:auto-offset-r ...

  5. kafka_2.11-0.10.2.1中的auto.offset.reset

    在使用spark连接kafka消费topic时,发现无论怎么设置,也无法从头开始消费. 查看配置得出auto.offset.reset的以下3种设置及含义: earliest 当各分区下有已提交的of ...

  6. 基于Confluent.Kafka实现的Kafka客户端操作类使用详解

    一.引言 有段时间没有写东西了,当然不是没得写,还有MongoDB的系列没有写完呢,那个系列还要继续.今天正好是周末,有点时间,来写新东西吧.最近公司用了Kafka做为消息的中间件,最开始写的那个版本 ...

  7. kafka控制台模拟消费_Kafka 详解

    kafka简介 Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可 ...

  8. kafka部分属性配置字段详解

    broker配置 常规配置 broker.id默认值是0,可以被设置成任意整数,但是在kafka集群中必须是唯一的.建议设置成与机器名具有相关性的整数 prot kafka默认监听9092端口 zoo ...

  9. kafka consumer 总结及配置详解学习

    目录 1.Consumer Group 与 topic 订阅 1.1 Consumer 与 partition 1.2 Consumer 与Consumer Group 1.3 Coordinator ...

最新文章

  1. P1066 2^k进制数 NOIP 2006 提高组 第四题
  2. http响应Last-Modified和ETag以及Apache和Nginx中的配置
  3. c语言作业扩展名通常为什么,C语言的源程序通常的扩展名是( )
  4. 临时目录 Path.GetTempFileName()
  5. 【Linux】39.nslookup查看域名与其对应的ip
  6. wxWidgets:wxPaintEvent类用法
  7. 《剑指offer》求1+2+3+...n(不用if、else、乘除等)
  8. mysql 优化not null_mysql 优化之11:尽可能的使用 NOT NULL
  9. 字符串equal_Java String:字符串常量池
  10. mysql根据外键多条件查询_MySQL练习-主外键多表查询
  11. mysql重点知识提取
  12. 第9章 逻辑回归 学习笔记 下
  13. Java随意输入字符串,如果含有$将该字符及之前一个字符的去掉
  14. eclipse开发首选项
  15. 嵌入式FCT项目案例分享(STM32)
  16. CPU FPU DSP MPU的概念介绍
  17. 独特的电子邮箱地址-LeetCode练习(Java实现)
  18. 27 个 CSS 案例演示和 DEMO
  19. 香港机场到市区,更方便更便宜的路线
  20. LCD屏应用--笔记

热门文章

  1. MIT6.824-lab2B-2022篇(万字推导思路及代码构建)
  2. 主流优秀BLOG程序
  3. Adobe软件存在漏洞 危及所有网络视频业务
  4. Retrofit跟OkHttp源码分析
  5. 聊下2023年的计划
  6. SQLite嵌入式数据库(转)
  7. mac上好用的写作工具:ia writer 中文版
  8. 前端3D技术起源与发展
  9. 关于FIR滤波器的系数
  10. 基于 Sobol 序列和纵横交叉策略的麻雀搜索算法-附代码