目录

rocketmq和kafka对比

基本角色

整体架构

核心参数

消息存储文件 commitLog

索引文件 IndexFile

队列 consumerqueue

零拷贝刷盘

consumer消费消息方式

消息过滤

顺序消息

延时消息


rocketmq和kafka对比

RocketMQ Kafka
设计定位

非日志的可靠消息传送

例如:订单,交易,消息推送等

系统间的数据流管道

例如:日志收集(主要),监控数据等

开发语言 Java Scala
社区活跃度
持久化 磁盘文件 磁盘文件
集群管理 name server zookeeper
选主方式 不支持自动切换 从 ISR 中自动选举 leader
主从切换 不支持自动切换

自动切换。N个副本,允许N-1个失效

数据可靠性 很好 很好
消息写入性能 很好 非常高
性能的稳定性 队列较多时性能稳定 队列/分区多时性能不稳定,明显下降
消息堆积能力 非常好 非常好
消息投递实时性 毫秒级。支持pull、push两种方式 毫秒级。由consumer轮询间隔决定
顺序消费

支持顺序消费

顺序消费情况下,消息失败时队列会暂停

支持顺序消费

一台broker宕机后,会产生消息乱序

延时消息 开源版支持18个级别 不支持
事务消息 支持 不支持
Broker端消息过滤 支持。通过tag过滤,类似子topic 不支持
消息查询

支持根据MessageId、MessageKey、时间查询

不支持
... ... ...

基本角色

角色名称 角色描述
NameServer

注册中心,用于管理Broker,类似于zookeeper集群的功能;

无状态,没有负载均衡的概念,集群中每个节点都包含整个Broker集群的全量信息,这样即使一个NameServer挂了,也对整体没有影响;

Broker

一个broker就是一个rocketmq进程。是rocketmq的核心,负责存储消息、转发消息等;

存储消息的元数据,包括主题、消费者组、消费进度偏移量等;

Broker启动过后,会定期地向NameServer发送心跳,以证明在线;

Producer

消息的发送者,每30秒从NameServer中获取Broker集群的信息;

Producer之间无状态,互相之间不感知;

Consumer

消息的消费者,每30秒从NameServer中获取Broker集群的信息;

然后Consumer会向Broker Master发送心跳确定它是否在线;

Consumer直接无状态,互相之间不感知;

整体架构

Broker 启动时会向 NameServer 注册 Broker 的信息,包括主题、消费之偏移量、队列、ip port等。每个 NameServer 都会保存整个Broker集群的信息。

Producer 和 Consumer 启动时会也向 NameServer 注册,且每隔30秒从 NameServer 获取 Broker 的消息。

每个 topic 对应一个 commitlog,里面存储真正的消息内容。

每个 topic 默认包含4个队列,每个队列对应一个持久化文件queuelog,它存储的是每条消息在commitlog中的位置等信息。

每个 topic 会创建一个 consumerOffset.json,用于保存每个consumer消费的偏移量。这个偏移量是consumer消费完消息之后,主动上报给broker的,避免消息重复推送。这个offset偏移量也对应了它在 consumerqueue 中的位置,消费消息的时候也会根据它来找到 consumerqueue 中的记录。

Producer 发送消息时,会根据消息的总量,取模上集群中的队列数量,轮训地去发送(非顺序消息的情况下)。

Consumer 内部有一个负载均衡的算法,大致机理是根据消费者数量取模上队列的数量,来选择对于的队列。

各个模块之间的通信基于 Netty,默认使用 epoll 方式。

核心参数

Broker参数

参数名称 参数描述
brokerClusterName 集群名称
brokerName broker名字,做集群时,slave的brokerName必须和master相同
brokerId broker为0时,表示master节点,其他都是slave
deleteWhen 例如:04。表示消息保存的时间超过期限后,在下次的凌晨4点删除
fileReservedTime 默认:48小时。表示消息保存48小时
brokeRole

集群中master和slave的消息同步方式;

有三个值:SYNC_MASTER, ASYNC_MASTER, SLAVE

flushDiskType

刷盘策略,将消息保存到磁盘中进行持久化

有两种方式:ASYNC_FLUSH (异步), SYNC_FLUSH (同步)

ASYNC_FLUSH:先将消息保存到缓冲区中,有可能丢失数据

SYNC_FLUSH:消息实时保存到磁盘,主从都保存完成才算成功,性能差

autoCreateTopicEnable 允许自动创建topic
enablePropertyFilter 支持消息过滤
storePathRootDir 消息持久化文件存储的根路径

Producer参数

参数名称 默认值 参数描述
group 发送者组
sendMessageTimeout 3000 发送消息的超时时间
retryTimesWhenSendFailed 2 发送失败的重试次数
maxMessageSize 4MB 消息的最大长度

Consumer参数

参数名称 默认值 参数描述
topic 消息主题
consumerGroup 消费者组。一条消息只能被同一个消费者组中的一个消费者消费。
selectorType TAG 消息过滤的方式:TAG、SQL92
selectorExpression * 具体的消息过滤逻辑
consumeMode CONCURRENTLY 消费模式:CONCURRENTLY、ORDERLY
messageModel CLUSTERING 消息模式:CLUSTERING(集群)、BROADCASTING(广播)
consumeThreadMax 64 消费消息的最大线程数
maxReconsumeTimes -1 消息重新消费的最大次数。-1相当于16

消息存储文件 commitLog

rocketmq 的消息存储在 commitLog 中,broker 默认会给 commitLog 申请 1G的磁盘空间。这是为了保证存储空间是有序的

如果 commitLog 已经满了,会继续创建第二个 commitLog,且它的文件名最后几位是上一个 commitLog 的最大偏移量 masOffset。查找的时候,如果第一个文件中没找到,那么会计算它的最大偏移量,获取下一个要查找的 commitLog 的名称。

每次申请1G的空间,虽然浪费了点磁盘空间,但是换取了快速的查找,非常值得!

为什么要申请连续存储空间呢?因为 rocketmq 中使用了零拷贝技术来读取文件,它可以极大地提提升读取文件的性能。而要用零拷贝技术,必须要连续的内存空间。

索引文件 IndexFile

索引文件是用来支持快速地查找消息的。

rocketmq 支持两种查询方式:根据key;根据time;

文件存储位置:store/index${fileName},fileName是以创建时的时间戳命名的。

单个 IndexFile 大小:40 + 500W*4 + 2000W *20 = 420000040 字节,约400MB。

一个 IndexFile 可以保存 2000W 个索引。

索引文件的结构由三部分组成 (磁盘空间是连续的):

索引文件头(IndexHeader 40字节) + 槽位(Slot 每个4字节) + 消息的索引内容(条目,每个20字节)

IndexFile 的底层实现为 hash索引, 它是在文件系统中实现类似 HashMap 的结构。

索引文件头:固定40个字节

槽位

槽位可以看作是一个长度500W的数组。

根据 Message Key 计算槽位的位置:

  1. 计算 Message Key 的 hash 值:keyHash = hashcode(key)
  2. 得到槽位值 Slot:slotPos = keyHash % 槽位数(500W)
  3. 计算 Slot 的实际磁盘位置:IndexHeaderSize(40 Byte) + slotPos * hashSlotSize(4 Byte)

槽位Slot中存储的是最新条目的数值。例如此时一个key算出来的槽位是4,它是第100个条目,那么槽位中存的就是100。

既然取hash,那必然可能出现hash冲突,hash冲突时,条目使用链表来连接。如果此时第二个 Message Key 算出来的槽位也是4,假设它是第200个条目,那么此时槽位中的内容就更新为200。

这种数组加链表的方式,跟HashMap很相似。

消息的索引内容 (条目)

每个条目占用20个字节,它包含4个部分:

(key的hash值) + (commitLog偏移量) + (时间戳) + (上一个相同keyHash的条目的数值)

  • KeyHash 是根据 Message Key 算出来的 hash 值;
  • CommitLogOffset 保存的就是此 Message Key 的消息在 commitLog 中的位置;
  • timestamp 用于按照时间来查询;
  • prevIndex 是指当两个不同的 Message Key 计算出来的槽位相同 (hash碰撞) 时,新的条目中存储上一个条目的数值。比如当第100条目和第200条目产生hash碰撞,则第200条目的prevIndex为100,第100条目的prevIndex为0,查询的时候先查新数据,如果不是我们想要的数据,一直查询到prevIndex为0为止。

slot相当于一个数组,而相同 slot 的条目,相当于是一个链表。最终通过数组+链表的形式,保存消息的索引,跟 HashMap 的实现原理相似。

由于每个 slot 和 条目是定长的,所以只要得到数值,就可以很方便地计算它的实际物理地址。

通过 Message Key 查询时,计算 slot 值,取出此slot对应的条目在commitLog上的消息即可 (可能有多个,这个时候只要在客户端再根据 Message Key 匹配一下就行了) 。

队列 consumerqueue

rocketmq 默认会给每个 topic 创建4个队列,每个队列有一个 queuelog。

queuelog 可以看成是一个数组,当中每个元素可以理解为数据库的一条记录,它包含三个字段:commitLogOffset, msgSize, tagsCode。

  • commitLogOffset:消息在 commitlog 中的偏移量(开始的位置)
  • msgSize:消息的大小,和 commitLogOffset 配合就可以获取整条消息的内容
  • tagsCode:tag 的 hashcode

consumerqueue 可以看做是 commitlog 的索引文件。查找的时候就像数组那样根据索引下标查找元素,然后根据 commitLogOffset 和 msgSize,从 commitlog 中获取消息内容。

consumerqueue 中存储的内容是定长的,每条内容20个字节。commitLogOffset 8字节,msgSize 4字节,tagsCode 8字节。

Consumer 消费消息的时候,先从 consumerOffset.json 中获取待消费的消息 offset,然后从 consumequeue 中得到 commitLogOffset 和 msgSize,再去 commitlog 中读取消息内容。

通过 tag 过滤也是通过 consumerqueue 来实现的。消费的时候,先对 tag 做 hash运算,看跟 consumerqueue 中存储的 tagsCode 是否匹配,来进行过滤。

零拷贝刷盘

以文件下载为例,服务端要做的事情是:将磁盘中的文件不作修改地从已连接的 socket 发出去。

操作系统底层 I/O 过程如下图:

从图中可以看到,整个流程包含了4次拷贝,以及2次用户空间和内核空间的切换,比较耗费性能和CPU资源。零拷贝就是为了解决这种低效性。

零拷贝的 IO 流程:

用户空间映射页缓存,通知页缓存直接将数据共享到 Socket 缓冲区,这样就避免了2次用户空间和内核空间的切换,并且少了两次拷贝。这样可以很大程度地提升磁盘 IO 的性能,可以大致接近内存的速度。

要用到零拷贝技术,内存空间就必须是连续的,所以 rocketmq 在创建 commitlog 时直接申请 1G 的连续磁盘空间。

consumer消费消息方式

RocketMQ 是基于发布订阅模型的消息中间件。所谓的发布订阅就是说,consumer 订阅了 broker 上的某个 topic,当 producer 发布消息到 broker 上的该 topic 时,consumer 就能收到该条消息。

Pull (MQPullConsumer)

consumer 主动去 broker 中拉取消息,取消息的逻辑需要用户自己来写,而且需要管理 offset,比较复杂,使用 rocketmq 时一般不用这种方式。

Push (MQPushConsumer)

此模式下 broker 收到消息后会主动推送给 consumer。

对于PushConsumer,由用户注册 MessageListener 来消费消息。

RocketMQ 的 push 方式,在底层是通过pull实现的,并没有实现真正的 push。

consumer 开启向 broker 开启长轮询来批量拉取消息。它会不停地问 broker 有没有消息,有的话就拉过来,看起来就像是 broker 有了消息后 push 给 consumer 的。

消息过滤

TAG方式:Consumer 消费消息的时候,先从 consumerOffset.json 中获取待消费的消息在 consumequeue 中的 offset,根据 consumequeue 中保存的 tagsCode(tag对应的hashCode)进行过滤。然后根据 consumequeue 中的 commitLogOffset 和 msgSize,从 commitlog 中获取真正的消息内容,在客户端根据 tag 的真实内容再过滤一次,防止 hash 冲突导致它不是真正要消费的消息。

还有一种是 SQL92 方式。

rocketmq 还支持传入自定义的 java 类,把它的文件流传到服务器上,服务器上把文件流解析出来构造成一个类,这个类会被内部的引擎去执行。这样的话,我们按照需求,自己来写方法过滤消息。

顺序消息

顺序消息需要 Producer 和 Consumer 都保证顺序。Producer 需要保证消息被路由到正确的队列,Consumer 需要保证每个队列的数据只有一个线程消息。

rocketmq 默认给一个 topic 创建4个队列,要保证消息有序,只要将消息发送到同一个队列中去。所以取一个公共字段 (比如ID),对队列数量取模,然后发送到同一个队列中。

延时消息

常用场景:订单超时时取消订单。当下了一个订单后,就发送一个延时消息,比如30分钟。30分钟后处理次消息,里面去查看订单是否付款了,如果没付款就自动取消订单。

rocketmq 相比于其它消息中间件,其中一个优势就是支持延时消息。

开源版一共有18个延迟时间间隔:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。

要发送延迟消息,只要在发送消息时,设置消息延迟级别 (delayTimeLevel) 即可。

  • 设置消息延迟级别等于0时,则该消息为非延迟消息。
  • 设置消息延迟级别大于等于1且小于等于18时,消息延迟特定时间,如:设置消息延迟级别等于1,则延迟1s;设置消息延迟级别等于3,则延迟10s,以此类推。
  • 设置消息延迟级别大于18时,则该消息延迟级别为18

延迟消息有额外的主题:SCHEDULE_TOPIC_XXXX

内部遍历所有延迟级别,为每个延迟级别创建定时任务,如果发现延迟的时间到了,就发送消息到真实的主题下。

又创建了一个定时任务,用于持久化每个队列消费的偏移量。持久化的频率由 flushDelayOffsetInterval 属性进行配置,默认为10秒。

rocketmq原理解析相关推荐

  1. RocketMQ原理解析-producer 4.发送分布式事物消息

    2019独角兽企业重金招聘Python工程师标准>>> RocketMQ原理解析-producer 4.发送分布式事物消息 博客分类: MQ 为什么消息要具备事务能力 还是比较清晰的 ...

  2. RocketMQ原理解析-Consumer

    consumer 1.启动 有别于其他消息中间件由broker做负载均衡并主动向consumer投递消息,RocketMq是基于拉模式拉取消息,consumer做负载均衡并通过长轮询向broker拉消 ...

  3. RocketMQ实战与原理解析

    网站 更多书籍点击进入>> CiCi岛 下载 电子版仅供预览及学习交流使用,下载后请24小时内删除,支持正版,喜欢的请购买正版书籍 电子书下载(皮皮云盘-点击"普通下载" ...

  4. RocketMQ原理刨析

    RocketMQ原理 本文类容基本上和RocketMQ design类似,并无过多的改动.主要内容包括: RocketMQ概述,主要是概念上的一些内容 RocketMQ的特点以及消息发送.消费模型 R ...

  5. RocketMQ原理及架构

    RocketMQ原理及架构 RocketMQ 核心组件图 RocketMQ是开源的消息中间件,它主要由NameServer,Producer,Broker,Consumer四部分构成. NameSer ...

  6. 分布式架构原理解析常见问题解决

    大家觉得写还可以,可以点赞.收藏.关注一下吧! 也可以到我的个人博客参观一下,估计近几年都会一直更新!和我做个朋友吧!https://motongxue.cn 分布式架构原理解析常见问题解决 1. 分 ...

  7. Spark Shuffle原理解析

    Spark Shuffle原理解析 一:到底什么是Shuffle? Shuffle中文翻译为"洗牌",需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节 ...

  8. 秋色园QBlog技术原理解析:性能优化篇:用户和文章计数器方案(十七)

    2019独角兽企业重金招聘Python工程师标准>>> 上节概要: 上节 秋色园QBlog技术原理解析:性能优化篇:access的并发极限及分库分散并发方案(十六)  中, 介绍了 ...

  9. Tomcat 架构原理解析到架构设计借鉴

    ‍ 点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 Tomcat 架构原理解析到架构设计借鉴 Tomcat 发展这 ...

最新文章

  1. 语音识别技术迎风发展,未来五年规模将近300亿
  2. GDB 使用手册(谷歌翻译)
  3. shareinstall之Android SDK集成
  4. HDU3923-Invoker-polya n次二面体
  5. pythonpandas设置索引_python – pandas:使用(row,col)索引设置值
  6. 记一次 javax.xml.soap.SOAPException:
  7. python中输入17=x会引起错误_python新手常犯的17个错误
  8. JAVA编程相关:eclipse如何导入已有工程
  9. wordpress linux伪静态,nginx下wordpress伪静态设置
  10. 求一个数的二进制逆序之后所对应的数
  11. GRE tunnel 2
  12. 分享活动报名收费的微信小程序制作功能介绍_瑜伽健身房培训报名小程序开发介绍
  13. 计算机开机只显示,电脑开机黑屏只显示光标怎么办?来看看几种原因分析及解决方法!...
  14. 繁簡替互換(SQL)
  15. 怎么判断一篇微信公众号文章阅读量是不是刷上来的?
  16. LAN8720A芯片
  17. 【C语言网】C语言基础题集训练详解(一)
  18. 开源软件和开源社区的反思
  19. 数字孪生风机设备,智慧风电 3D 可视化智能运维
  20. 基于单片机控制的交通灯系统设计

热门文章

  1. 持续引领产业发展,华为云桌面连续6年位居国内市占率第一
  2. 整理下 android 保活 防被杀 守护进程
  3. 第三方网站实现钉钉(DingTalk)扫码登陆(Vue+SpringBoot)
  4. android 切换声道,如何将您的Android手机切换为单声道(以便您可以戴一副耳塞) | MOS86...
  5. 网页中常用的web安全字体
  6. 什么是机器学习PAI
  7. 使用Pelican在Github(国外线路访问)和Coding(国内线路访问)同步托管博客
  8. ios 抓娃娃开发_我爱抓娃娃appiOS版下载_我爱抓娃娃iOS版下载_18183软件下载
  9. 领导提升团队执行力9个方法
  10. 容器安全检测工具之一:docker bench