Kafka Consumer 位移提交
位移的概念
每个 consumer 实例都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少条消息 。在 Kafka 中,这叫位移 Offset。消费位移记录了 Consumer 要消费的下一条消息的位移。
consumer group 使用一个长整型保存 offset。同时 Kafka consumer 还引入了检查点机制( checkpointing)定期对 offset 进行持久化,从而简化了应答机制的实现 。 Kafka consumer 在内部使用一个 map 来保存其订阅 topic 所属分区的 offset。
位移提交
consumer 客户端需要定期地向 Kafka 集群汇报自己消费数据的进度,这一过程被称为位移提交(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。consumer 把位移提交到 Kafka 的一个内部 topic([[__consumer_offsets]])上。
提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
位移提交的语义保障
假设你的 Consumer 消费了 10 条消息,你提交的位移值却是 50,那么从理论上讲,位移介于 11~49 之间的消息是有可能丢失的;相反地,如果你提交的位移值是 5,那么位移介于 5~9 之间的消息就有可能被重复消费。Kafka 只会 “无脑” 地接受你提交的位移。你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。
位移管理
consumer 会在 Kafka 集群的所有 broker 中选择一个 broker 作为 consumer group 的
coordinator,用于实现组成员管理、消费分配方案制定以及提交位移等 。 为每个组选择对应
coordinator 的依据就是 [[__consumer_offsets]]。 和普通的 Kafka topic 相同,该 topic 配置有多个分区,每个分区有多个副本。它存在的唯一目的就是保存 consumer 提交的位移。
当消费者组首次启动时,由于没有初始的位移信息, coordinator 必须为其确定初始位移值,
这就是 consumer 参数 [[消费者参数#auto offset reset|auto.offset.reset]] 的作用。通常情况下, consumer 要么从最早的位移开始读取,要么从最新的位移开始读取 。
当 consumer 运行了一段时间之后,它必须要提交自己的位移值 。 如果 consumer 崩溃或被
关闭,它负责的分区就会被分配给其他 consumer,因此一定要在其他 consumer 读取这些分区前就做好位移提交工作,否则会出现消息的重复消费。
consumer 提交位移的主要机制是通过向所属的 coordinator 发送位移提交请求来实现的 。
每个位移提交请求都会往 [[__consumer_offsets]]) 对应分区上追加写入一条消息 。 消息的 key 是 group.id 、 topic 和分区的元组,而 value 就是位移值 。 如果 consumer 为同一个 group 的同一个 topic 分区提交了多次位移,那么[[__consumer_offsets]]) 对应的分区上就会有若干条 key 相同但 value 不同的消息,但显然我们只关心最新一次提交的那条消息。从某种程度来说,只有最新提交的位移值是有效的,其他消息包含的位移值其实都已经过期了 。 Kafka 通过压实(compact)策略来处理这种消息使用模式。
位移提交
自动提交
默认情况下( enable.auto.commit = true
),consumer 是自动提交位移的,自动提交间隔是 5 秒。通过设置 auto.commit.interval.ms
参数可以控制自动提交的间隔。
自动提交的缺点
假设现在自动提交间隔是 5 秒,并且在提交位移之后的 3 秒发生了 [[Rebalance]] 操作。在 [[Rebalance]] 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然可以通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。
手动提交
手动位移提交就是用户自行确定消息何时被真正处理完并可以提交位移 。手动提交位移 API 进一步细分为同步手动提交和异步手动提交,即 commitSync 和 commitAsync 方法。
commitSync()
如果调用的是 commitSync,用户程序会等待位移提交结束才执行下一条语句命令。调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。这可能会影响整个应用程序的 TPS。如果提交过程中出现异常,该方法会将异常信息抛出。
while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));// 执行业务逻辑process(records);try {consumer.commitSync();} catch (CommitFailedException e) {handleAndLog(e);}
}
commitAsync()
commitAsync() 是一个异步非阻塞调用。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。Kafka 给 commitAsync() 方法提供了回调函数。
while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));// 执行业务逻辑process(records);consumer.commitAsync((offsets,e)->{if(e != null){handleAndLog(e);}});
}
consumer 在后续 poll 调用时轮询该位移提交的结果。特别注意的是,这里的异步提交位移不是指 consumer 使用单独的线程进行位移提交。实际上 consumer 依然会在用户主线程的 poll 方法中不断轮询这次异步提交的结果。只是该提交发起时此方法是不会阻塞的,因而被称为异步提交。commitAsync() 相较于 commitSync(),它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经 「过期」 或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。
当用户调用 consumer.commitSync() 或 consumer.commitAsync() 时, consumer 会为所有它订阅的分区提交位移。
结合 commitSync()和 commitAsync()
try {while(true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));process(records);// 异步提交commitAysnc();}}catch(Exception e) {handle(e);} finally {try {// 同步提交consumer.commitSync();} finally {consumer.close();}}
这段代码同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性。
细粒度提交位移
commitSync() 和 commitAsync() 方法还有另外带参数的重载方法 commitSync(Map<TopicPartition, OffsetAndMetadata>)
和 commitAsync(Map<TopicPartition, OffsetAndMetadata>)
。用户调用这个版本的方法时需要指定一个 Map 显式地告诉 Kafka 为哪些分区提交位移。它们的参数是一个 Map 对象,键就是 TopicPartition,即消费的分区,而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据。
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();int count = 0;while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record: records) {process(record);offsets.put(new TopicPartition(record.topic(), record.partition()),// 要提交下一条消息的位移,所以要 +1new OffsetAndMetadata(record.offset() + 1);// 每 100 条记录提交一次if(count % 100 == 0){consumer.commitAsync(offsets, null);}count++;}}
Kafka Consumer 位移提交相关推荐
- Kafka Consumer位移(Offset)提交——解决Consumer重复消费和消息丢失问题
本文目录 1.Consumer 位移(offset) 1.2 位移(offset)的作用 2. 位移(offset)提交导致的问题 2.1 消息丢失 2.2 消息重复消费 3 Consumer位移提交 ...
- kafka 同步提交 异步_Kafka 位移提交那些事儿
最近,在维护公司的一个 Kafka 消息转发器的项目,这个项目主要是为了转发不同部门不同系统的消息队列之间的消息,包括从 RocketMQ 转入到 Kafka, 从 Kafka 转出到 RocketM ...
- Kafka consumer group位移0ffset重设
本文阐述如何使用Kafka自带的kafka-consumer-groups.sh脚本随意设置消费者组(consumer group)的位移.需要特别强调的是, 这是0.11.0.0版本提供的新功能且只 ...
- Kafka:Consumer手动提交offset
在上一篇博客中介绍了使用Consumer订阅多个Topic或者多个Partition: Kafka:Consumer订阅 在上一篇博客的测试样例中,Consumer都是自动提交offset,这是通过下 ...
- Kafka Consumer多线程消费
概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...
- Kafka Consumer多线程实例
Kafka 0.9版本开始推出了Java版本的consumer,优化了coordinator的设计以及摆脱了对zookeeper的依赖.社区最近也在探讨正式用这套consumer API替换Scala ...
- Kafka consumer
Kafka consumer consumer概览 消费者组 消费者组定义:消费者使用一个消费者组名(即group.id)来标记自己,topic的每条消息都只会被发送到每个订阅它的消费者组的一个消费者 ...
- Kafka Consumer Rebalance详解
全网最全大数据面试提升手册! 文章目录 Kafka版本 rebalance rebalance策略 rebalance generation rebalance协议 rebalance流程 rebal ...
- 读Kafka Consumer源码
最近一直在关注阿里的一个开源项目:OpenMessaging OpenMessaging, which includes the establishment of industry guideline ...
最新文章
- 简析正则表达式的使用
- 每日一皮:雷神索尔的锤子为什么这么重?
- jdbc connection为什么放在webINF的lib里面
- 解决在IOS系统及微信中audio、video不能自动播放的问题
- 回文字符串—回文子串—暴力破解法
- 保障了罗振宇跨年演讲的PTS铂金版正式上线,产品体验全新升级
- 速度是 macOS 的两倍?首个支持 M1 Mac 的 Linux 发行版终于出现
- 搜索python代码的软件_python小说爬虫工具,小说搜索下载软件附源码
- Java高并发架构设计
- 数据库配置文件,db.properties、jdbc.properties
- 河北安新复合型水稻 国稻种芯·中国水稻节:雄安生态示范区
- Jetson TK1 刷机安装Ubuntu系统与Mini PCI-e无线网卡
- Airbnb是如何创造更好的邮件体验的
- 程序员养生书单,九本必读养生书籍,颈椎按摩,脊椎按摩,脱发植发
- P5055 【模板】可持久化文艺平衡树 可持久化fhqtreap
- 简单对数不等式的证明
- 手机安装ubuntu
- C++可微编程:寻找一种最佳的图像抖动模式
- 2021年全球灌装设备收入大约1194.6百万美元,预计2028年达到1604.7百万美元
- 普通大学生的真实出路