原文:http://jaskey.github.io/blog/2017/02/16/rocketmq-clean-commitlog/

RocketMQ——消息文件过期原理

2017-02-16 THU 11:49

RocketMQ——消息ACK机制及消费进度管理 文中提过,所有的消费均是客户端发起Pull请求的,告诉消息的offset位置,broker去查询并返回。但是有一点需要非常明确的是,消息消费后,消息其实并没有物理地被清除,这是一个非常特殊的设计。本文来探索此设计的一些细节。

消费完后的消息去哪里了?

消息的存储是一直存在于CommitLog中的。而由于CommitLog是以文件为单位(而非消息)存在的,CommitLog的设计是只允许顺序写的,且每个消息大小不定长,所以这决定了消息文件几乎不可能按照消息为单位删除(否则性能会极具下降,逻辑也非常复杂)。所以消息被消费了,消息所占据的物理空间并不会立刻被回收。

但消息既然一直没有删除,那RocketMQ怎么知道应该投递过的消息就不再投递?——答案是客户端自身维护——客户端拉取完消息之后,在响应体中,broker会返回下一次应该拉取的位置,PushConsumer通过这一个位置,更新自己下一次的pull请求。这样就保证了正常情况下,消息只会被投递一次。

什么时候清理物理消息文件?

那消息文件到底删不删,什么时候删?

消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog):

  1. 消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。
  2. 消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。
  3. 磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。

注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。

这样设计带来的好处

消息的物理文件一直存在,消费逻辑只是听客户端的决定而搜索出对应消息进行,这样做,笔者认为,有以下几个好处:

  1. 一个消息很可能需要被N个消费组(设计上很可能就是系统)消费,但消息只需要存储一份,消费进度单独记录即可。这给强大的消息堆积能力提供了很好的支持——一个消息无需复制N份,就可服务N个消费组。

  2. 由于消费从哪里消费的决定权一直都是客户端决定,所以只要消息还在,就可以消费到,这使得RocketMQ可以支持其他传统消息中间件不支持的回溯消费。即我可以通过设置消费进度回溯,就可以让我的消费组重新像放快照一样消费历史消息;或者我需要另一个系统也复制历史的数据,只需要另起一个消费组从头消费即可(前提是消息文件还存在)。

  3. 消息索引服务。只要消息还存在就能被搜索出来。所以可以依靠消息的索引搜索出消息的各种原信息,方便事后排查问题。

注:在消息清理的时候,由于消息文件默认是1GB,所以在清理的时候其实是在删除一个大文件操作,这对于IO的压力是非常大的,这时候如果有消息写入,写入的耗时会明显变高。这个现象可以在凌晨4点(默认删时间时点)后的附近观察得到。

RocketMQ官方建议Linux下文件系统改为Ext4,对于文件删除操作相比Ext3有非常明显的提升。

跳过历史消息的处理

由于消息本身是没有过期的概念,只有文件才有过期的概念。那么对于很多业务场景——一个消息如果太老,是无需要被消费的,是不合适的。

这种需要跳过历史消息的场景,在RocketMQ要怎么实现呢?

对于一个全新的消费组,PushConsumer默认就是跳过以前的消息而从最尾开始消费的,解析请参看RocketMQ——消息ACK机制及消费进度管理相关章节。

但对于已存在的消费组,RocketMQ没有内置的跳过历史消息的实现,但有以下手段可以解决:

  1. 自身的消费代码按照日期过滤,太老的消息直接过滤。如:

         @Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for(MessageExt msg: msgs){if(System.currentTimeMillis()-msg.getBornTimestamp()>60*1000) {//一分钟之前的认为过期continue;//过期消息跳过}//do consume here}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
    
  2. 自身的消费代码代码判断消息的offset和MAX_OFFSET相差很远,认为是积压了很多,直接return CONSUME_SUCCESS过滤。

         @Overridepublic ConsumeConcurrentlyStatus consumeMessage(//List<MessageExt> msgs, //ConsumeConcurrentlyContext context) {long offset = msgs.get(0).getQueueOffset();String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);long diff = Long. parseLong(maxOffset) - offset;if (diff > 100000) { //消息堆积了10W情况的特殊处理return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;}//do consume herereturn ConsumeConcurrentlyStatus. CONSUME_SUCCESS;}
    
  3. 消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用控制台命令resetOffsetByTime把消费进度调整到后面,再启动消费。

  4. 原理同3,但使用代码来控制。代码中调用内部的运维接口,具体代码实例祥见ResetOffsetByTimeCommand.java.

Posted by Jaskey Lam 2017-02-16 Thu 11:49  java, rocketmq

Jaskey Lam

站在风口,只有猪才能飞起来


转载请注明出处,作者,Jaskey Lam,软件工程师
联系邮箱: linjunjie1103@gmail.com
知乎主页: www.zhihu.com/people/linjunjie1103

RocketMQ——消息文件过期原理相关推荐

  1. 什么时候清理物理消息文件?

    那消息文件到底删不删,什么时候删? 消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog): 1. 消息文件过期(默认72小时 ...

  2. 源码分析RocketMQ顺序消息消费实现原理

    本节目录 1.消息队列负载 2.消息拉取 3.消息顺序消息消费 3.1核心属性与构造函数 3.2 start方法 3.3 submitConsumeRequest 3.4 ConsumeMessage ...

  3. RocketMQ源码(4)—Broker启动加载消息文件以及恢复数据源码【一万字】

    详细介绍了Broker启动加载消息文件以及恢复数据源码. 此前我们学习了Broker的启动源码:RocketMQ源码(3)-Broker启动流程源码解析[一万字],Broker的启动过程中,在Defa ...

  4. rocketmq 消息指定_rocketmq-常见问题总结(消息的顺序、重复、消费模式)

    参考: http://www.cnblogs.com/wxd0108/p/6038543.html https://www.cnblogs.com/520playboy/p/6750023.html ...

  5. 消息系统kafka原理解析

    Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Clouder ...

  6. rocketmq 消息 自定义_RocketMQ的消息发送及消费

    RocketMQ消息支持的模式: 消息支持的模式分为三种:NormalProducer(普通同步),消息异步发送,OneWay. 消息同步发送: 普通消息的发送和接收在前面已经演示过了,在前面的案例中 ...

  7. RocketMQ学习(四)——RocketMQ消息发送

    RocketMQ 网络架构图 RocketMQ分布式消息队列的网络部署架构图如下图所示 于上图中几个角色的说明: (1) NameServer: RocketMQ集群的命名服务器(也可以说是注册中心) ...

  8. 从源码分析RocketMQ系列-RocketMQ消息设计详解

    1 消息存储   消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架构.PageCache与Mmap内存映射以及RocketMQ中两种不同的刷盘方式三 ...

  9. 分布式消息通信ActiveMQ原理 分析一

    本章知识点: 1. 持久化消息和非持久化消息的发送策略2. 消息的持久化方案及实践3. 消费端消费消息的原理 持久化消息与非持久化消息的发送策略 消息同步发送和异步发送 同步发送过程中,发送者发送一条 ...

最新文章

  1. 白领们注意啦:“过劳死”27个危险信号!
  2. SqlServer数据类型
  3. 合并排序的非递归实现(自底向上设计)
  4. Apache Thrift的使用
  5. go build 参数_从0开始Go语言,用Golang搭建网站
  6. C#中跨线程访问控件问题解决方案
  7. linux日志添加到文件,关于linux:将变量中的内容追加到日志文件中
  8. java 字符流 utf8,JAVA基础(字符流设置编码读写字符)
  9. 7-3 DAG图优化-A (15 分)(更新版)
  10. 《计算机科学导论》 数据库基础知识
  11. 5.9 Longformer解读
  12. 20220309讨论
  13. 电脑关机程序(源码)
  14. QT 处理TCP粘包问题
  15. Annotation释疑
  16. linux 的截屏软件下载,Linux 截屏软件 Shutter
  17. 微信小程序云开发之简单两步实现集成赞赏加群弹窗功能
  18. 计算机组成原理补码减法,补码加减法运算(计算机组成原理).ppt
  19. WebForm网站和MVC网站运行机制的区别
  20. cad断点快捷键_CAD中打断于点的快捷键

热门文章

  1. 一分钟看懂IoC 原理
  2. 【创新实训8】手势理解功能与实际应用整合
  3. 小程序Mpx框架入门
  4. 原生实现C#与Lua相互调用方法
  5. When Cloud Storage Meets RDMA
  6. 【水晶报表内功心法】--PUSH模式样板招式
  7. PHP中的uniqid在高并发下的重复
  8. win10家庭版无法安装mysql_win10安装mysql遇到的坑
  9. 适合学生的平价蓝牙耳机,四款平价好用蓝牙耳机推荐
  10. iOS开发:电池电量监测