我们都知道RocketMQ的消息是持久化到文件的,具体的消息的刷盘策略是什么,是发送一条消息就直接持久化到文件中吗?作为一款高性能的消息中间件这样做肯定不行,至少性能上不允许这样操作,那么具体策略是啥我们具体分析下。

1、刷盘策略

RocketMQ提供了两种刷盘策略同步刷盘、异步刷盘

同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。

异步刷盘:,同步刷盘是指数据到达内存之后,返回producer说数据已经发送成功。,然后再写入commitlog日志。

复制方式

优点

缺点

适应场景

同步刷盘

保证了消息不丢失

吞吐率相对于异步刷盘要低

消息可靠性要求较高的场景

异步刷盘

系统的吞吐量提高

系统断电等异常时会有部分丢失

对应吞吐量要求较高的场景

下面我们从源码的角度分析其实现的逻辑

2、同步刷盘

CommitLog.putMessage()方法中的刷盘的核心方法handleDiskFlush()

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {

// Synchronization flush 同步刷盘

if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {

final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;

//客户端确认要等待刷盘成功

if (messageExt.isWaitStoreMsgOK()) {

//封装刷盘请求对象 nextoffset : 当前内存写的位置 + 本次要写入的字节数

GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());

//添加刷盘请求(后台定时任务进行刷盘,每隔10毫秒批量刷盘。10毫秒中如果有多个请求,则多个请求一块刷盘)

service.putRequest(request);

//等待刷盘请求结果(最长等待5秒钟,刷盘成功后马上可以获取结果。)

boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

if (!flushOK) {

log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()

+ " client address: " + messageExt.getBornHostString());

putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);

}

} else {

service.wakeup();

}

}else {// Asynchronous flush 异步刷盘

if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {

//唤醒FlushRealTimeService服务线程

flushCommitLogService.wakeup();

} else {

//唤醒CommitRealTimeService服务线程

commitLogService.wakeup();

}

}

}

查看同步刷盘的核心类GroupCommitService中的核心属性

private volatile List requestsWrite = new ArrayList(); private volatile List requestsRead = new ArrayList(); requestsWrite : 写队列,主要用于向该线程添加刷盘任务 requestsRead : 读队列,主要用于执行特定的刷盘任务,这是是GroupCommitService 设计的一个亮点,把读写分离,每处理完requestsRead中的任务,就交换这两个队列。

我们查看其run()方法

public void run() {

CommitLog.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {

try {

//等待通知,如果数据过来,提前结束等待执行onWaitEnd()方法交换读写swapRequests()

//刷盘请求的requestsWrite->requestsRead

this.waitForRunning(10);

//执行刷盘

this.doCommit();

} catch (Exception e) {

CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);

}

}

//省略代码...

}

waitForRunning方法中执行了swapRequests()方法

private void swapRequests() {

List tmp = this.requestsWrite;

this.requestsWrite = this.requestsRead;

this.requestsRead = tmp;

}

GroupCommitService接收到的刷盘请求通过putRequest()方法加入到requestsWrite集合中,swapRequests()方法将requestsWrite请求集合交换到requestsRead集合中供刷盘使用,我们重点查看doCommit()方法

private void doCommit() {

synchronized (this.requestsRead) {

if (!this.requestsRead.isEmpty()) {

//循环每一个刷盘请求

for (GroupCommitRequest req : this.requestsRead) {

// There may be a message in the next file, so a maximum of

// two times the flush

boolean flushOK = false;

for (int i = 0; i < 2 && !flushOK; i++) {

//判断是否已经刷盘过了,刷盘的位置和当前消息下次刷盘需要的位置比较

flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

if (!flushOK) {

//0代码立刻刷盘,不管缓存中消息有多少

CommitLog.this.mappedFileQueue.flush(0);

}

}

//返回刷盘的结果

req.wakeupCustomer(flushOK);

}

long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();

//设置刷盘的时间点

if (storeTimestamp > 0) {

CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);

}

//清空requestsRead对象

this.requestsRead.clear();

} else {

// Because of individual messages is set to not sync flush, it

// will come to this process

CommitLog.this.mappedFileQueue.flush(0);

}

}

}

mappedFileQueue.flush(0)立刻刷盘

public boolean flush(final int flushLeastPages) {

boolean result = true;

MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);

if (mappedFile != null) {

long tmpTimeStamp = mappedFile.getStoreTimestamp();

//刷盘,返回刷写到磁盘指针

int offset = mappedFile.flush(flushLeastPages);

//计算当前的刷盘指针,之前的所有数据已经持久化到磁盘中

long where = mappedFile.getFileFromOffset() + offset;

result = where == this.flushedWhere;

this.flushedWhere = where;

if (0 == flushLeastPages) {

this.storeTimestamp = tmpTimeStamp;

}

}

return result;

}

mappedFile.flush(0);保证立刻刷盘后面异步刷盘时也会调用mappedFile.flush()方法

3、异步刷盘

if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {

//唤醒FlushRealTimeService服务线程

flushCommitLogService.wakeup();

} else {

//唤醒CommitRealTimeService服务线程

commitLogService.wakeup();

}

我们发现异步刷盘的时候有两种方式,一种是堆外内存池开启时启动CommitRealTimeService服务线程,另一个是默认执行的FlushRealTimeService服务线程进行刷盘操作,关于TransientStorePoolEnable在《RocketMQ内存映射》章节中的**“创建映射文件MappedFile”**中有介绍

​ 图3-1

1、FlushRealTimeService

查看其run()方法

public void run() {

CommitLog.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {

// 每次刷盘的间隔时间,默认 200ms

int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

// 每次commit最少的页数 默认4页

int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

// 如果上次刷新的时间+该值 小于当前时间,则改变flushPhysicQueueLeastPages =0 默认为200

int commitDataThoroughInterval =

CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

long begin = System.currentTimeMillis();

//距离上一次刷盘时间超过200ms则立刻刷盘,commit最少的页数置为0

if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {

this.lastCommitTimestamp = begin;

commitDataLeastPages = 0;

}

try {

//刷盘

boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);

long end = System.currentTimeMillis();

if (!result) {

this.lastCommitTimestamp = end; // result = false means some data committed.

//now wake up flush thread.

flushCommitLogService.wakeup();

}

if (end - begin > 500) {

log.info("Commit data to file costs {} ms", end - begin);

}

this.waitForRunning(interval);

} catch (Throwable e) {

CommitLog.log.error(this.getServiceName() + " service has exception. ", e);

}

}

boolean result = false;

for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {

result = CommitLog.this.mappedFileQueue.commit(0);

CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));

}

CommitLog.log.info(this.getServiceName() + " service end");

}

}

这种方式和同步刷盘一样就是mappedFileQueue.commit(commitDataLeastPages)参数有限制,数据达到一定量的时候才进行刷盘操作提高数据的刷盘性能。

2、CommitRealTimeService

查看其run()方法

public void run() {

CommitLog.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {

// 每次刷盘的间隔时间,默认 200ms

int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

// 每次commit最少的页数 默认4页

int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

// 如果上次刷新的时间+该值 小于当前时间,则改变flushPhysicQueueLeastPages =0 默认为200

int commitDataThoroughInterval =

CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

long begin = System.currentTimeMillis();

//距离上一次刷盘时间超过200ms则立刻刷盘,commit最少的页数置为0

if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {

this.lastCommitTimestamp = begin;

commitDataLeastPages = 0;

}

try {

//刷盘

boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);

long end = System.currentTimeMillis();

//返回的是false说明数据已经commit到了fileChannel中

if (!result) {

this.lastCommitTimestamp = end; // result = false means some data committed.

//now wake up flush thread.

flushCommitLogService.wakeup();

}

if (end - begin > 500) {

log.info("Commit data to file costs {} ms", end - begin);

}

this.waitForRunning(interval);

} catch (Throwable e) {

CommitLog.log.error(this.getServiceName() + " service has exception. ", e);

}

}

boolean result = false;

for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {

result = CommitLog.this.mappedFileQueue.commit(0);

CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));

}

CommitLog.log.info(this.getServiceName() + " service end");

}

}

我们发现其刷盘方法不一样mappedFileQueue.commit()调用MappedFile.commit()方法

public int commit(final int commitLeastPages) {

if (writeBuffer == null) {

//no need to commit data to file channel, so just regard wrotePosition as committedPosition.

return this.wrotePosition.get();

}

//如果提交的数据不满commitLeastPages则不执行本次的提交,待下一次提交

if (this.isAbleToCommit(commitLeastPages)) {

if (this.hold()) {

commit0(commitLeastPages);

this.release();

} else {

log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());

}

}

// All dirty data has been committed to FileChannel.

if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {

this.transientStorePool.returnBuffer(writeBuffer);

this.writeBuffer = null;

}

return this.committedPosition.get();

}

查看其核心刷盘方法

protected void commit0(final int commitLeastPages) {

int writePos = this.wrotePosition.get();

int lastCommittedPosition = this.committedPosition.get();

if (writePos - this.committedPosition.get() > 0) {

try {

//创建writeBuffer的共享缓存区

ByteBuffer byteBuffer = writeBuffer.slice();

//将指针回退到上一次提交的位置

byteBuffer.position(lastCommittedPosition);

//设置limit为writePos

byteBuffer.limit(writePos);

this.fileChannel.position(lastCommittedPosition);

//将committedPosition指针到wrotePosition的数据复制(写入)到fileChannel中

this.fileChannel.write(byteBuffer);

//更新committedPosition指针为writePos

this.committedPosition.set(writePos);

} catch (Throwable e) {

log.error("Error occurred when commit data to FileChannel.", e);

}

}

}

commit0()只是将缓存数据加入到fileChannel中,我们在CommitRealTimeService.run()方法中看到唤醒flushCommitLogService线程需要将fileChannel中的数据flush到磁盘中,我们发现两种方式都需要走flushCommitLogService.run()方法最后都执行MappedFile.flush(int)

public int flush(final int flushLeastPages) {

if (this.isAbleToFlush(flushLeastPages)) {

if (this.hold()) {

int value = getReadPosition();

try {

//We only append data to fileChannel or mappedByteBuffer, never both.

if (writeBuffer != null || this.fileChannel.position() != 0) {

this.fileChannel.force(false);

} else {

this.mappedByteBuffer.force();

}

} catch (Throwable e) {

log.error("Error occurred when force data to disk.", e);

}

//设置刷盘后的指针

this.flushedPosition.set(value);

this.release();

} else {

log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());

this.flushedPosition.set(getReadPosition());

}

}

return this.getFlushedPosition();

}

两种缓存方式走的刷盘逻辑也不同,可以查看**“图3-1”**两种方式的处理流程图

我们还发现一个方法isAbleToFlush()判断是否需要刷盘

private boolean isAbleToFlush(final int flushLeastPages) {

int flush = this.flushedPosition.get();

int write = getReadPosition();

if (this.isFull()) {

return true;

}

if (flushLeastPages > 0) {

return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;

}

return write > flush;

}

同步刷盘时flushLeastPages=0立刻刷盘

异步刷盘时flushLeastPages=4 ,默认是4,需要刷盘的数据达到PageCache的页数4倍时才会刷盘,或者距上一次刷盘时间>=200ms则设置flushLeastPages=0立刻刷盘

同步刷盘时无论消息的大小都立刻刷盘,线程阻塞等待刷盘结果

异步刷盘有两种方式但是其逻辑都是需要刷盘的数据OS_PAGE_SIZE的4倍即(1024 * 4)*4=16k或者距上一次刷盘时间>=200ms时才刷盘,提高数据的刷盘性能

java什么是消息刷盘_RocketMQ刷盘策略相关推荐

  1. 海信IP108H_S905L2_免拆_U盘卡刷固件包

    海信IP108H_S905L2_免拆_U盘卡刷固件包 图片参照固件包 特点: 1.适用于卡刷: 2.开放原厂固件屏蔽的市场安装和u盘安装apk: 3.修改dns,三网通用: 4.大量精简内置的没用的软 ...

  2. 九联UNT413A-S905L3A-免拆U盘卡刷固件包-当贝纯净桌面-内有教程

    九联UNT413A-S905L3A-免拆U盘卡刷固件包-当贝纯净桌面-内有教程 特点: 1.适用于对应型号的电视盒子刷机: 2.开放原厂固件屏蔽的市场安装和u盘安装apk: 3.修改dns,三网通用: ...

  3. Ventoy制作启动盘、刷机盘(多个系统的刷机或启动盘)

    Ventoy功能介绍 因为工作原因,经常需要刷机,为不同架构的机器刷不同种类的系统,刻盘.刷机耗时耗力,比如使用Ultraiso工具刻盘,每个架构的系统都得刻一个刷机盘(启动盘),在网上查了些资料,发 ...

  4. lc用U盘更新固件_魔百盒九联代工M301H海思Hi3798MV310芯片红外蓝牙语音遥控免拆卡刷固件及刷机教程202009版...

    前言:本固件适用于运营商定制的魔百和九联代工M301H-海思Hi3798MV310芯片-红外蓝牙语音遥控免拆固件,定制华为logo和动画,支持红外蓝牙语音,开启无线设置!仅供机油交流. 免责申明:本安 ...

  5. 小米路由器mini刷机过程/U盘刷系统

    一.小米路由器mini救砖.重刷系统 U盘刷机教程 小米路由器mini跟小米手机一样也是基于安卓系统的,因此也就存在刷机变砖的可能,或者你只是想重刷系统什么的.不同于手机刷机,路由器要怎么刷机呢?借助 ...

  6. hitool备份3798固件方法_创维E900s海思3798芯片当贝桌面不拆机通刷固件及刷机教程201910版...

    前言:本固件教程针对创维E900S系列海思3798芯片,直刷卡刷包固件无需拆机:测试了部分省份等地方E900S系列刷机正常,其他地方未测试:不支持高安版.CA版本,未测试悦Me系列,理论上支持:本固件 ...

  7. UT斯达康MC8638S-高安-S905-河北联通-破解刷机线刷固件包

    UT斯达康MC8638S-高安-S905-河北联通-破解刷机线刷固件包 固件特点: 1.修改dns,三网通用: 2.开放原厂固件屏蔽的市场安装和u盘安装apk: 3.无开机广告,无系统更新,不在被强制 ...

  8. zbrush常用笔刷_zbrush笔刷大合集

    文件大小 26G 基本包含了常用不常用的各类笔刷模型 已经分类整理好目录方便查找 含预览图 共包含4个大合集 大合集一目录 Zbrush笔刷001 一组岩石墙壁笔刷合集 Zbrush笔刷002 96款 ...

  9. 江苏移动MGV3000-YS(S)/YS(M)-S905L3卡刷和线刷固件包

    江苏移动MGV3000-YS(S)/YS(M)-S905L3卡刷和线刷固件包 固件特点: 1.修改dns,三网通用: 2.开放原厂固件屏蔽的市场安装和u盘安装apk: 3.无开机广告,无系统更新,不在 ...

最新文章

  1. 基于深度学习的可疑活动视频分析
  2. 多线程基础(五)NSThread线程通信
  3. Atitit.数据操作dsl 的设计 ---linq 方案
  4. 在中国,程序员是青春饭吗?
  5. c# select标签绑定枚举,并以Description做Text显示
  6. 深入探究Spark -- Cluster Manger部署(最常用为YARN实现Task Scheduler)
  7. How to install plugin for Eclipse from .zip
  8. java向上转型不能调用子类独有的方法
  9. D5000工作站服务器型号,【戴尔Precision T5820参数】戴尔Precision T5820系列工作站参数-ZOL中关村在线...
  10. 程序员最想得到的十大证件
  11. android listview 增加单选 复选,ListView里面加入CheckBox如何实现单选?
  12. springmvc的控制器是不是单例模式,如果是,有什么问题,怎么解决?
  13. aclocal-1.16:未找到命令
  14. css案例_shortcut快捷导航样式
  15. HBuilderX真机模拟uni-app项目 + 上架应用市场
  16. java技能要求_Java工程师需要具备的25个技能
  17. Linux shell脚本执行后出现语法错误: 未预期的文件结尾
  18. pytorch中DataLoader的num_workers
  19. png 微软ppt 透明度_超实用!PPT微软官方教程
  20. 原来当年发葫芦娃种子的才是真正的老司机

热门文章

  1. Golang 高性能高可用消息队列框架go-nsq使用
  2. 【已更新至GitHub】数字图像处理常用图片【免费】【不失效】
  3. 羧基修饰青色乳胶微球100nm
  4. 三肽-29/胶原三肽/Tripeptide-29
  5. 拯救者安装Ubuntu18.04后屏幕亮度调解决
  6. C51单片机的省电模式
  7. 动网论坛防止群发的解决方法
  8. java可以编写siri_揭秘Siri,苹果发布论文阐释语音助手设计想法
  9. mysql rpm_MySQL官网下载安装.rpm包
  10. 案例篇13—css3实现可爱的熊猫和竹子(附源码)