RocketMQ消费进度管理
业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
,RocketMQ才会认为这批消息(默认是1条)是消费完成的
如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER
,RocketMQ就会认为这批消息消费失败了。
为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消费失败的消息重发回Broker(topic不是原topic而是这个消费租的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默
认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。
从哪里开始消费
当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度,如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:
CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一
遍
CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半
个小时以前
消息ACK机制
RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个消费组在这条queue上的消费进度。
每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker(即,不是立刻同步到broker,有一段时间消费进度只会存在于本地,此时如果宕机,那么未提交的消费进度就会被重新消费),以此持久化消费进度。但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值,如下图:
比如2消费失败,rocketmq跳过2消费到了8,8消费成功了,但是提交的时候只会提交【消费到了1】,因为2失败了,所以会提交最小成功点
重复消费问题
由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。
在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。
在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。
对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。
重复消费验证
查看当前消费进度
检查队列消费的当前进度。
查看RocketMQ 的config文件夹下的consumerOffset.json
cat consumerOffset.json
通过consumerOffset.json我们可以知道当前topicTest
主题的rocket_test_consumer_group
组的queue2
消费到偏移量为32
消费者发送消息
消费者发送消息,并查看各个队列消息的偏移量
我们发现队列2的偏移量最小为32
消费的时候最小偏移量不提交,其他都正常
//队列2的偏移量为32的数据在等待
if (ext.getQueueId() == 2 && ext.getQueueOffset() == 32) {System.out.println("消息消费耗时较厂接收queueId:[" + ext.getQueueId() + "],偏移量
offset:[" + ext.getQueueOffset() + "]");
//等待 模拟假死状态
try {Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {e.printStackTrace();
}
}
运行查看日志
我们发现只有队列2的偏移量为32的消息消费超时,其他都已经正常消费
我们再查看下consumerOffset.json
cat consumerOffset.json
我们发现因为rocketMQ 整个消费记录都没有被提交,所以下次消费会全部再次消费。 这里模拟出了整个消费进度都在本地,没来得及提交给broker。
还有一种情况就是,进度成功提交给broker了,queue0、1、3的消费进度都改变了。但是queue2的消费进度还是32,因为消费32的时候超时了,rocketmq只能提交最小成功offset!
再次消费
去掉延时代码继续消费
我们发现消息被重复消费了一遍
RocketMQ消费进度管理相关推荐
- RocketMQ(十)——Consumer消费进度(Offset)的管理
文章目录 Consumer消费进度(Offset)的管理 Offset本地管理模式 Offset远程管理模式 offset用途 重试队列 offset的同步提交与异步提交 Consumer消费进度(O ...
- rocketmq存储消息mysql_RocketMQ消息消费以及进度管理解析
最近 ONS 消息堆积的很严重,并且经常发现部分几乎没有消息消费的消费者也提示堆积,所以有必要深入了解一下 RocketMQ 的设计思路,来看看堆积量如何计算,以及如何正确的使用 Topic 以及 C ...
- 【高项】- 进度管理论文
论文仅供参考 [摘要] 2018年4月,我参加了某地方政府信息融合项目的开发,担任项目经理.该项目一共投资600万元,建设工期1年,由于政府机构众多,信息发布平台存在重复建设,信息孤立问题.由政府信息 ...
- kafka_消费者组消费进度监控实现
对于 Kafka 消费者,最重要的就是监控它们的消费进度,或者说监控它们消费的滞后程度(消费者 Lag 或 Consumer Lag). 所谓滞后程度,就是指消费者当前落后于生产者的程度.Lag 的单 ...
- 软考高项之进度管理——攻坚记忆
软考高项之进度管理--攻坚记忆 一.进度管理过程 二.规划进度管理和制定进度计划区别 三.重要的工具与技术 四.相关重要概念 一.进度管理过程 1.规划进度管理:需要写一个文档,进度管理计划,里面规定 ...
- 【项目管理】进度管理
目录 1.规划进度管理 2.定义活动 3.排列活动顺序 4.估算活动资源 5.估算活动持续时间 6.制定进度计划 7.控制进度 1.规划进度管理 编制进度管理计划,记录如何进行进度管理内容. 输入:项 ...
- 进度管理计划7个过程及相关重点
概述: 1.规划进度管理:规划.编制.管理.执行和控制项目进度而制定政策.程序和文档过程. 2.定义活动:识别和记录为完成项目可交付成果而采取的具体行动的过程. 3.排列活动顺序:识别和记录项目活动之 ...
- PMP知识点(四、进度管理)
此系列文章分享给想学习PMP的项目经理和想要学习PMP的程序猿们!期望观看者快速掌握PMP知识点并实际运用(还有顺利考过PMP了). 本章内容: 进度管理过程介绍 进度管理过程介绍 上一章我们了解了范 ...
- 论文,成本管理与进度管理(主成本)
摘要: 2017年6月,我作为项目经理参加了XX市政府网站集约化建设项目,负责项目的全面管理工作.该项目投资300万,工期为6个月,项目内容包括了政府门户网站群.25个应用系统.统一身份认证和数据共享 ...
- 笔记-高项案例题-2016年上-范围管理+沟通管理+进度管理+风险管理
2016上半年高级信息系统项目管理师下午案例分析真题 [说明] 系统集成商B公司中标了某电子商务A企业的信息系统硬件扩容项目,项目内容为采购用户指定型号的多台服务器之"交换设备.存储设备&q ...
最新文章
- 上海交大情感脑电数据集(SJTU Emotion EEG Dataset,SEED)
- [译] 基于事件流构建的服务
- php输出一百个hello,如何使用 PHP 输出 hello world?
- 如何完美的将对话框设置成无边框无标题栏样式?
- 【手算】行列式树形展开
- python查看list_reverseiterator object中的内容
- python 实例 cadu_【示例详解】AutoCAD处理控件Aspose.CAD 8月新更!支持加载大型DWG文件...
- 自己定义控件-仿iphone之ToggleButtonamp;VoiceSeekBar
- keil5调试如何选择晶振_有源晶振的负载电容重要吗?
- 如何在HTML中加载一个CSS文件?
- eclipse远程发布代码的方法(SSH自动同步)
- 白话空间统计二十四:地理加权回归(六)ArcGIS的GWR工具参数说明一
- 英语对程序员来讲有多重要?不会英语可以做程序员吗?
- 笔记本电脑怎么拆开后盖_新手怎么拆解笔记本?笔记本拆机注意事项 (全文)
- NeurIPS十年高引学者TOP100榜单发布!这些大牛值得膜拜!
- 使用protobuf_example_addressbook.proto项目时的问题:PROTOBUF_USE_DLLS
- centos:gtk:No package ‘gdk-2.0‘ found
- 树莓派远程4G遥控车教程(二)-相机云台舵机初步调试
- 微信小程序的设计以及demo
- 用python实现双人五子棋(终端版)