RocketMQ——消息文件过期原理
原文:http://jaskey.github.io/blog/2017/02/16/rocketmq-clean-commitlog/
RocketMQ——消息文件过期原理
2017-02-16 THU 11:49
RocketMQ——消息ACK机制及消费进度管理 文中提过,所有的消费均是客户端发起Pull请求的,告诉消息的offset位置,broker去查询并返回。但是有一点需要非常明确的是,消息消费后,消息其实并没有物理地被清除,这是一个非常特殊的设计。本文来探索此设计的一些细节。
消费完后的消息去哪里了?
消息的存储是一直存在于CommitLog中的。而由于CommitLog是以文件为单位(而非消息)存在的,CommitLog的设计是只允许顺序写的,且每个消息大小不定长,所以这决定了消息文件几乎不可能按照消息为单位删除(否则性能会极具下降,逻辑也非常复杂)。所以消息被消费了,消息所占据的物理空间并不会立刻被回收。
但消息既然一直没有删除,那RocketMQ怎么知道应该投递过的消息就不再投递?——答案是客户端自身维护——客户端拉取完消息之后,在响应体中,broker会返回下一次应该拉取的位置,PushConsumer通过这一个位置,更新自己下一次的pull请求。这样就保证了正常情况下,消息只会被投递一次。
什么时候清理物理消息文件?
那消息文件到底删不删,什么时候删?
消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog):
- 消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。
- 消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。
- 磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。
注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。
这样设计带来的好处
消息的物理文件一直存在,消费逻辑只是听客户端的决定而搜索出对应消息进行,这样做,笔者认为,有以下几个好处:
一个消息很可能需要被N个消费组(设计上很可能就是系统)消费,但消息只需要存储一份,消费进度单独记录即可。这给强大的消息堆积能力提供了很好的支持——一个消息无需复制N份,就可服务N个消费组。
由于消费从哪里消费的决定权一直都是客户端决定,所以只要消息还在,就可以消费到,这使得RocketMQ可以支持其他传统消息中间件不支持的回溯消费。即我可以通过设置消费进度回溯,就可以让我的消费组重新像放快照一样消费历史消息;或者我需要另一个系统也复制历史的数据,只需要另起一个消费组从头消费即可(前提是消息文件还存在)。
消息索引服务。只要消息还存在就能被搜索出来。所以可以依靠消息的索引搜索出消息的各种原信息,方便事后排查问题。
注:在消息清理的时候,由于消息文件默认是1GB,所以在清理的时候其实是在删除一个大文件操作,这对于IO的压力是非常大的,这时候如果有消息写入,写入的耗时会明显变高。这个现象可以在凌晨4点(默认删时间时点)后的附近观察得到。
RocketMQ官方建议Linux下文件系统改为Ext4,对于文件删除操作相比Ext3有非常明显的提升。
跳过历史消息的处理
由于消息本身是没有过期的概念,只有文件才有过期的概念。那么对于很多业务场景——一个消息如果太老,是无需要被消费的,是不合适的。
这种需要跳过历史消息的场景,在RocketMQ要怎么实现呢?
对于一个全新的消费组,PushConsumer默认就是跳过以前的消息而从最尾开始消费的,解析请参看RocketMQ——消息ACK机制及消费进度管理相关章节。
但对于已存在的消费组,RocketMQ没有内置的跳过历史消息的实现,但有以下手段可以解决:
自身的消费代码按照日期过滤,太老的消息直接过滤。如:
@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for(MessageExt msg: msgs){if(System.currentTimeMillis()-msg.getBornTimestamp()>60*1000) {//一分钟之前的认为过期continue;//过期消息跳过}//do consume here}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
自身的消费代码代码判断消息的offset和MAX_OFFSET相差很远,认为是积压了很多,直接return CONSUME_SUCCESS过滤。
@Overridepublic ConsumeConcurrentlyStatus consumeMessage(//List<MessageExt> msgs, //ConsumeConcurrentlyContext context) {long offset = msgs.get(0).getQueueOffset();String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);long diff = Long. parseLong(maxOffset) - offset;if (diff > 100000) { //消息堆积了10W情况的特殊处理return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;}//do consume herereturn ConsumeConcurrentlyStatus. CONSUME_SUCCESS;}
消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用控制台命令resetOffsetByTime把消费进度调整到后面,再启动消费。
- 原理同3,但使用代码来控制。代码中调用内部的运维接口,具体代码实例祥见
ResetOffsetByTimeCommand.java
.
Posted by Jaskey Lam 2017-02-16 Thu 11:49 java, rocketmq
站在风口,只有猪才能飞起来
转载请注明出处,作者,Jaskey Lam,软件工程师
联系邮箱: linjunjie1103@gmail.com
知乎主页: www.zhihu.com/people/linjunjie1103
RocketMQ——消息文件过期原理相关推荐
- 什么时候清理物理消息文件?
那消息文件到底删不删,什么时候删? 消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog): 1. 消息文件过期(默认72小时 ...
- 源码分析RocketMQ顺序消息消费实现原理
本节目录 1.消息队列负载 2.消息拉取 3.消息顺序消息消费 3.1核心属性与构造函数 3.2 start方法 3.3 submitConsumeRequest 3.4 ConsumeMessage ...
- RocketMQ源码(4)—Broker启动加载消息文件以及恢复数据源码【一万字】
详细介绍了Broker启动加载消息文件以及恢复数据源码. 此前我们学习了Broker的启动源码:RocketMQ源码(3)-Broker启动流程源码解析[一万字],Broker的启动过程中,在Defa ...
- rocketmq 消息指定_rocketmq-常见问题总结(消息的顺序、重复、消费模式)
参考: http://www.cnblogs.com/wxd0108/p/6038543.html https://www.cnblogs.com/520playboy/p/6750023.html ...
- 消息系统kafka原理解析
Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Clouder ...
- rocketmq 消息 自定义_RocketMQ的消息发送及消费
RocketMQ消息支持的模式: 消息支持的模式分为三种:NormalProducer(普通同步),消息异步发送,OneWay. 消息同步发送: 普通消息的发送和接收在前面已经演示过了,在前面的案例中 ...
- RocketMQ学习(四)——RocketMQ消息发送
RocketMQ 网络架构图 RocketMQ分布式消息队列的网络部署架构图如下图所示 于上图中几个角色的说明: (1) NameServer: RocketMQ集群的命名服务器(也可以说是注册中心) ...
- 从源码分析RocketMQ系列-RocketMQ消息设计详解
1 消息存储 消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架构.PageCache与Mmap内存映射以及RocketMQ中两种不同的刷盘方式三 ...
- 分布式消息通信ActiveMQ原理 分析一
本章知识点: 1. 持久化消息和非持久化消息的发送策略2. 消息的持久化方案及实践3. 消费端消费消息的原理 持久化消息与非持久化消息的发送策略 消息同步发送和异步发送 同步发送过程中,发送者发送一条 ...
最新文章
- 白领们注意啦:“过劳死”27个危险信号!
- SqlServer数据类型
- 合并排序的非递归实现(自底向上设计)
- Apache Thrift的使用
- go build 参数_从0开始Go语言,用Golang搭建网站
- C#中跨线程访问控件问题解决方案
- linux日志添加到文件,关于linux:将变量中的内容追加到日志文件中
- java 字符流 utf8,JAVA基础(字符流设置编码读写字符)
- 7-3 DAG图优化-A (15 分)(更新版)
- 《计算机科学导论》 数据库基础知识
- 5.9 Longformer解读
- 20220309讨论
- 电脑关机程序(源码)
- QT 处理TCP粘包问题
- Annotation释疑
- linux 的截屏软件下载,Linux 截屏软件 Shutter
- 微信小程序云开发之简单两步实现集成赞赏加群弹窗功能
- 计算机组成原理补码减法,补码加减法运算(计算机组成原理).ppt
- WebForm网站和MVC网站运行机制的区别
- cad断点快捷键_CAD中打断于点的快捷键