ProducerConsumer两个角度分析重复消费的问题。

Producer

消息重复场景

Producersend()方法可能会出现异常,配合生产者参数retries>0,生产者会在出现可恢复异常的时候进行重试。

若出现不可恢复异常的时候,配合send()的异步发送方式,则可能在回调函数中进行消息重发。上述均可能导致消息重复。

解决方法

Kafka的幂等性就是为了避免出现生产者重试的时候出现重复写入消息的情况。

开启幂等性功能配置(该配置默认为false)如下:

prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);

Consumer

重复消费场景

一、自动提交消费位移

kafka默认消费位移的提交是自动提交,由消费者参数enable.auto.commit配置,默认为true

这个自动提交并不是每消费一条消息就自动提交消费位移,而是定期提交,这个定期提交的时间由客户端参数auto.commit.interval.ms配置,默认5秒。

也就是说,在默认情况下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。

注意:自动提交的动作在poll()方法的逻辑中,在每次向服务器端发起拉取请求之前会检查是否可以进行位移提交,如果可以就提交上一次轮询的位移。

如果在拉取消息进行消费,但是下一次提交位移之前消费者崩溃了,或者在消费者关闭之前调用了consumer.unsubscribe()方法取消订阅,那么下一次就还得在上一次消费位移的位置重新开始消费,造成重复消费!

解决办法

设置手动提交消费位移。

二、手动提交消费位移

当开启手动提交消费位移之后,依然会出现重复消费的场景。

例如当我们的代码逻辑是拉取消息之后先处理消息,然后进行位移提交

若处理消息的时候,提交位移之前消费者宕机,消费者重启后,则会出现重复消费的问题。

解决办法

根据业务需求处理,在合适的时候进行手动位移提交。

三、再均衡

再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用
性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消
费者。

需要注意的是在再均衡发生期间,消费组内的消费者是无法读取消息的。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。一般情况下,应尽量避免不必要的再均衡的发生。

比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作 ,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。

什么情况下会发生再均衡?
  1. 消费者、分区数量发生变化,使用正则表达式订阅主题,有符合条件的主题被创建。
  2. 使用consumer.unsubscribe()取消对某些主题的订阅。
  3. max.poll.interval.ms,当通过消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时间间隔还没有发起poll操作,则消费组认为该消费者己离开了消费组,将进行再均衡操作。
解决办法

使用ConsumerRebalanceListener,再均衡监听器,它可以用来设定发生再均衡动作前后的一些准备或者收尾工作。

ConsumerRebalanceListener是一个接口,包含两个方法,具体代码如下:

//partitions 表示再均衡之前分配到的分区
void onpartitionRevoked(Collection<TopicPartition> partitions)

这个方法会在再均衡开始之前和消费者停止读取消息之后被调用。我们可以用它来处理消费位移的提交。

//partitions 表示再均衡之后分配到的分区
void onpartitionAssigned(Collection<TopicPartition> partitions)

这个方法会在重新分配分区之后和消费者开始读取消费之前被调用。

用法如下:

HashMap<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConfig());
consumer.subscribe(Collections.singletonList(KafkaConfig.TOPIC), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> collection) {consumer.commitSync(currentOffsets);currentOffsets.clear();}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> collection) {//...}
});
try {while (IS_RUNNING.get()) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.println("topic:" + record.topic() + ",offset:" + record.offset() + ",value:" + record.value());currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));}consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {if (e != null) {e.printStackTrace();} else {logger.error("fail to commit offsets {}", offsets);}}});}
} catch (Exception e) {e.printStackTrace();
}

我们将消费位移暂存在一个局部变量currentOffsets中,正常消费的时候就可以通过commitAsync()方法来异步提交消费位移。

若发生再均衡,则在发生再均衡之前通过再均衡监听器的onPartitionRevoked()回调执行commitSync()方法来同步提交位移。

如何避免Kafka的重复消费相关推荐

  1. Kafka会不会重复消费

    本文来说下kafka会不会重复消费的问题.在单体架构时代,就存在着接口幂等性的问题,只不过到了分布式.高并发的场景之后,接口幂等性的问题会更加明显. 文章目录 概述 消息重复消费问题 解决方案 方案一 ...

  2. 什么?搞不定Kafka重复消费?

    来自:架构之美 前言 今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 ????如何保证 Kafka 消息不重复消费?我们在做开发的时候为了程序的健壮性,在使用 Kafka 的 ...

  3. kafka 重复消费和数据丢失_刨根问底,Kafka消息中间件到底会不会丢消息

    大型互联网公司一般都会要求消息传递最大限度的不丢失,比如用户服务给代金券服务发送一个消息,如果消息丢失会造成用户未收到应得的代金券,最终用户会投诉. 为避免上面类似情况的发生,除了做好补偿措施,更应该 ...

  4. 【消息队列】kafka是如何保证消息不被重复消费的

    一.kafka自带的消费机制 kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offs ...

  5. kafka rebalance与数据重复消费问题

    问题和现象: 某个程序在消费kafka数据时,总是重复消费相关数据,仿佛在数据消费完毕之后,没有提交相应的偏移量.然而在程序中设置了自动提交:enable.auto.commit为true 检查日志, ...

  6. kafka一直rebalance故障,重复消费

    今天我司线上kafka消息代理出现错误日志,异常rebalance,而且平均间隔2到3分钟就会rebalance一次,分析日志发现比较严重.错误日志如下 08-09 11:01:11 131 pool ...

  7. kafka重复消费问题

    开篇提示:kafka重复消费的根本原因就是"数据消费了,但是offset没更新"!而我们要探究一般什么情况下会导致offset没更新? 今天查看Elasticsearch索引的时候 ...

  8. MQ问题集(kafka主从同步与高可用,MQ重复消费、幂等)

    1.kafka主从同步与高可用 https://1028826685.iteye.com/blog/2354570 http://developer.51cto.com/art/201808/5815 ...

  9. kafka 脚本发送_Kafka笔记归纳(第五部分:一致性保证,消息重复消费场景及解决方式)...

    写在开头: 本章是Kafka学习归纳第五部分,着重于强调Kafka的事一致性保证,消息重复消费场景及解决方式,记录偏移量的主题,延时队列的知识点. 文章内容输出来源:拉勾教育大数据高薪训练营. 一致性 ...

最新文章

  1. 如何合理地估算线程池大小?
  2. 砸500万买学位房,一个焦虑中产的看房日记
  3. 高性能云计算展望(中)
  4. 漫游Kafka之过期数据清理
  5. 面向对象编程 object oriented programming(OOP)(第二篇)
  6. Memcached 工作原理
  7. 大数据可视化设计到底是啥,该怎么用
  8. javaweb教务管理系统_基于Java web的教务管理系统
  9. Linux C简单日志打印代码示例
  10. ​让AI触类旁通93种语言:Facebook最新多语种句嵌入来了
  11. 多维动态数组c语言,C语言多维动态数组的实现
  12. 学习笔记:GAMES101图形学入门闫令琪(五)抗锯齿
  13. Linux 之父亮相,这个开源社区要“搞大事”
  14. unit自动驾驶怎么使用_自动驾驶与驾驶辅助系统 正确使用驾驶辅助系统
  15. LLVM的源码目录结构
  16. K8S学习之污点容忍
  17. TI高精度实验室-运算放大器-第十六节-全差分放大器
  18. java具名参数_Spring jdbc具名参数使用方法详解
  19. 关于精准打击自签名伪造SSL/TLS “受信任域名证书”的方案
  20. 某喜欢研究车的80后,驾驶经验59条

热门文章

  1. linux p12 转 pem,p12转PEM供golang APNS使用
  2. 一名资深质量工程师总结的4个工作思路
  3. 21-Feb-2011
  4. 安卓APP上线各应用商店-最新版
  5. 发那科法兰克加工中心FANUC 0MD 操作面板讲解,看看这些按键是什么意思
  6. C++ 单向链表 —— 初始化、插入、返回第一个节点、删除、查找、长度、打印、反转(逆序)
  7. 腾云宾馆管理系统服务器名,腾云宾馆管理系统_行业软件_软件百科_360安全中心 - 360安全卫士官方网站...
  8. dns服务期搭建使用_DNS服务器的搭建与使用详解
  9. JAVA集合框架工具类自定义Collections集合方法
  10. 10月23日,相约全球边缘计算大会·上海站