业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS ,RocketMQ才会认为这批消息(默认是1条)是消费完成的

如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消费失败的消息重发回Broker(topic不是原topic而是这个消费租的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默
认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

从哪里开始消费

当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度,如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一
遍
CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半
个小时以前

消息ACK机制

RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个消费组在这条queue上的消费进度。

每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker(即,不是立刻同步到broker,有一段时间消费进度只会存在于本地,此时如果宕机,那么未提交的消费进度就会被重新消费),以此持久化消费进度。但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值,如下图:

比如2消费失败,rocketmq跳过2消费到了8,8消费成功了,但是提交的时候只会提交【消费到了1】,因为2失败了,所以会提交最小成功点

重复消费问题

由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。

在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。
在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。

对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。

重复消费验证

查看当前消费进度

检查队列消费的当前进度。
查看RocketMQ 的config文件夹下的consumerOffset.json

cat consumerOffset.json


通过consumerOffset.json我们可以知道当前topicTest主题的rocket_test_consumer_group组的queue2消费到偏移量为32

消费者发送消息

消费者发送消息,并查看各个队列消息的偏移量

我们发现队列2的偏移量最小为32
消费的时候最小偏移量不提交,其他都正常

//队列2的偏移量为32的数据在等待
if (ext.getQueueId() == 2 && ext.getQueueOffset() == 32) {System.out.println("消息消费耗时较厂接收queueId:[" + ext.getQueueId() + "],偏移量
offset:[" + ext.getQueueOffset() + "]");
//等待 模拟假死状态
try {Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {e.printStackTrace();
}
}

运行查看日志

我们发现只有队列2的偏移量为32的消息消费超时,其他都已经正常消费
我们再查看下consumerOffset.json

cat consumerOffset.json


我们发现因为rocketMQ 整个消费记录都没有被提交,所以下次消费会全部再次消费。 这里模拟出了整个消费进度都在本地,没来得及提交给broker。

还有一种情况就是,进度成功提交给broker了,queue0、1、3的消费进度都改变了。但是queue2的消费进度还是32,因为消费32的时候超时了,rocketmq只能提交最小成功offset!

再次消费

去掉延时代码继续消费


我们发现消息被重复消费了一遍

RocketMQ消费进度管理相关推荐

  1. RocketMQ(十)——Consumer消费进度(Offset)的管理

    文章目录 Consumer消费进度(Offset)的管理 Offset本地管理模式 Offset远程管理模式 offset用途 重试队列 offset的同步提交与异步提交 Consumer消费进度(O ...

  2. rocketmq存储消息mysql_RocketMQ消息消费以及进度管理解析

    最近 ONS 消息堆积的很严重,并且经常发现部分几乎没有消息消费的消费者也提示堆积,所以有必要深入了解一下 RocketMQ 的设计思路,来看看堆积量如何计算,以及如何正确的使用 Topic 以及 C ...

  3. 【高项】- 进度管理论文

    论文仅供参考 [摘要] 2018年4月,我参加了某地方政府信息融合项目的开发,担任项目经理.该项目一共投资600万元,建设工期1年,由于政府机构众多,信息发布平台存在重复建设,信息孤立问题.由政府信息 ...

  4. kafka_消费者组消费进度监控实现

    对于 Kafka 消费者,最重要的就是监控它们的消费进度,或者说监控它们消费的滞后程度(消费者 Lag 或 Consumer Lag). 所谓滞后程度,就是指消费者当前落后于生产者的程度.Lag 的单 ...

  5. 软考高项之进度管理——攻坚记忆

    软考高项之进度管理--攻坚记忆 一.进度管理过程 二.规划进度管理和制定进度计划区别 三.重要的工具与技术 四.相关重要概念 一.进度管理过程 1.规划进度管理:需要写一个文档,进度管理计划,里面规定 ...

  6. 【项目管理】进度管理

    目录 1.规划进度管理 2.定义活动 3.排列活动顺序 4.估算活动资源 5.估算活动持续时间 6.制定进度计划 7.控制进度 1.规划进度管理 编制进度管理计划,记录如何进行进度管理内容. 输入:项 ...

  7. 进度管理计划7个过程及相关重点

    概述: 1.规划进度管理:规划.编制.管理.执行和控制项目进度而制定政策.程序和文档过程. 2.定义活动:识别和记录为完成项目可交付成果而采取的具体行动的过程. 3.排列活动顺序:识别和记录项目活动之 ...

  8. PMP知识点(四、进度管理)

    此系列文章分享给想学习PMP的项目经理和想要学习PMP的程序猿们!期望观看者快速掌握PMP知识点并实际运用(还有顺利考过PMP了). 本章内容: 进度管理过程介绍 进度管理过程介绍 上一章我们了解了范 ...

  9. 论文,成本管理与进度管理(主成本)

    摘要: 2017年6月,我作为项目经理参加了XX市政府网站集约化建设项目,负责项目的全面管理工作.该项目投资300万,工期为6个月,项目内容包括了政府门户网站群.25个应用系统.统一身份认证和数据共享 ...

  10. 笔记-高项案例题-2016年上-范围管理+沟通管理+进度管理+风险管理

    2016上半年高级信息系统项目管理师下午案例分析真题 [说明] 系统集成商B公司中标了某电子商务A企业的信息系统硬件扩容项目,项目内容为采购用户指定型号的多台服务器之"交换设备.存储设备&q ...

最新文章

  1. 上海交大情感脑电数据集(SJTU Emotion EEG Dataset,SEED)
  2. [译] 基于事件流构建的服务
  3. php输出一百个hello,如何使用 PHP 输出 hello world?
  4. 如何完美的将对话框设置成无边框无标题栏样式?
  5. 【手算】行列式树形展开
  6. python查看list_reverseiterator object中的内容
  7. python 实例 cadu_【示例详解】AutoCAD处理控件Aspose.CAD 8月新更!支持加载大型DWG文件...
  8. 自己定义控件-仿iphone之ToggleButtonamp;VoiceSeekBar
  9. keil5调试如何选择晶振_有源晶振的负载电容重要吗?
  10. 如何在HTML中加载一个CSS文件?
  11. eclipse远程发布代码的方法(SSH自动同步)
  12. 白话空间统计二十四:地理加权回归(六)ArcGIS的GWR工具参数说明一
  13. 英语对程序员来讲有多重要?不会英语可以做程序员吗?
  14. 笔记本电脑怎么拆开后盖_新手怎么拆解笔记本?笔记本拆机注意事项 (全文)
  15. NeurIPS十年高引学者TOP100榜单发布!这些大牛值得膜拜!
  16. 使用protobuf_example_addressbook.proto项目时的问题:PROTOBUF_USE_DLLS
  17. centos:gtk:No package ‘gdk-2.0‘ found
  18. 树莓派远程4G遥控车教程(二)-相机云台舵机初步调试
  19. 微信小程序的设计以及demo
  20. 用python实现双人五子棋(终端版)

热门文章

  1. HFSS常见使用问题和解决办法汇总(纯经验分享)
  2. [导入]146部玄幻小说合集
  3. Linux网络系统之配置域名与主机名映射和常见网络命令
  4. 英语数字表达方式大全
  5. 联通手机卡网速的修改
  6. 那些IT行业的经典定律
  7. 【视线追踪】视线追踪的性能评估框架 及 基础知识
  8. C/C++回溯经典练习:马的走法
  9. 量价交易——寻找妖股的底部结构
  10. Hyper-v安装CentOS