一、基础概念

1、消息模型

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,
Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。 ConsumerGroup 由多个Consumer 实例构成。

2、消息生产者

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。生产者中,会把同一类Producer组成一个集合,叫做生产者组,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

3、消息消费者

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
拉取式消费的应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控
制。一旦获取了批量消息,应用就会启动消费过程。
推动式消费模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。消费者同样会把同一类Consumer组成一个集合,叫做消费者组,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的
是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费
(Clustering)和广播消费(Broadcasting)。 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

4、主题

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。 同一个Topic下的数据,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue。 MessageQueue是生产者发送消息与消费者消费消息的最小单位。

5、代理服务器

消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Broker Server是RocketMQ真正的业务核心,包含了多个重要的子模块:
Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快
速查询。
而Broker Server要保证高可用需要搭建主从集群架构。RocketMQ中有两种Broker架构模式:
普通集群:
这种集群模式下会给每个节点分配一个固定的角色,master负责响应客户端的请求,并存储消息。
slave则只负责对master的消息进行同步保存,并响应部分客户端的读请求。消息同步方式分为同步同步和异步同步。这种集群模式下各个节点的角色无法进行切换,也就是说,master节点挂了,这一组Broker就不可用了。
Dledger高可用集群:
Dledger是RocketMQ自4.5版本引入的实现高可用集群的一项技术。这个模式下的集群会随机选出一个节点作为master,而当master节点挂了后,会从slave中自动选出一个节点升级成为master。
Dledger技术做的事情:1、接管Broker的CommitLog消息存储 2、从集群中选举出master节点 3、完成master节点往slave节点的消息同步。

6、NameServer

名称服务充当路由消息的提供者。Broker Server会在启动时向所有的Name Server注册自己的服务信息,并且后续通过心跳请求的方式保证这个服务信息的实时性。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
这种特性也就意味着NameServer中任意的节点挂了,只要有一台服务节点正常,整个路由服务就不会有影响。当然,这里不考虑节点的负载情况。

7、消息

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题Topic。 RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。 并且Message上有一个为消息设置的标志,Tag标签。用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

二、消息存储

1、什么时候存储消息

分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。
1. MQ收到一条消息后,需要向生产者返回一个ACK响应,并将消息存储起来。
2. MQ Push一条消息给消费者后,等待消费者的ACK响应,需要将消息标记为已消费。如果没有标记为消费,MQ会不断的尝试往消费者推送这条消息。
3. MQ需要定期删除一些过期的消息,这样才能保证服务一直可用。

2、消息存储介质

RocketMQ采用的是类似于Kafka的文件存储机制,即直接用磁盘文件来保存消息,而不需要借助
MySQL这一类索引工具。

3、消息存储结构

RocketMQ消息的存储分为三个部分:
CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog
由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。
ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前
MessageQueue被哪些消费者组消费到了哪一条CommitLog。
IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来
查找消息的方法不影响发送与消费消息的主流程。

4、刷盘机制

RocketMQ需要将消息存储到磁盘上,这样才能保证断电后消息不会丢失。同时这样才可以让存储的消 息量可以超出内存的限制。RocketMQ为了提高性能,会尽量保证磁盘的顺序写。消息在写入磁盘时,有两种写磁盘的方式,同步刷盘和异步刷盘
同步刷盘:
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功 的状态。
异步刷盘:
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大; 当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。 配置方式: 刷盘方式是通过Broker配置文件里的flflushDiskType 参数设置的,这个参数被配置成 SYNC_FLUSH、ASYNC_FLUSH中的 一个。

5、主从复制

如果Broker以一个集群的方式部署,会有一个master节点和多个slave节点,消息需要从Master复制到 Slave上。而消息复制的方式分为同步复制和异步复制。
同步复制:
同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态。 在同步复制下,如果Master节点故障,Slave上有全部的数据备份,这样容易恢复数据。但是同步复制会增大数据写入的延迟,降低系统的吞吐量。
异步复制:
异步复制是只要master写入消息成功,就反馈给客户端写入成功的状态。然后再异步的将消息复制给 Slave节点。 在异步复制下,系统拥有较低的延迟和较高的吞吐量。但是如果master节点故障,而有些数据没有完成复制,就会造成数据丢失。
配置方式:
消息复制方式是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成
ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。

6、负载均衡

Producer负载均衡
Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于

MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker上。同时生产者在发送消息时,可以指定一个MessageQueueSelector。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以保证消息局部有序。

Consumer负载均衡
Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式和广播模式。
1、集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。
RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message
queue。 而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。 每次分配时,都会将MessageQueue和消费者ID进行排序后,再用不同的分配算法进行分配。内置的分配的算法共有六种,分别对应AllocateMessageQueueStrategy下的六种实现类,可以在consumer中直接set来指定。默认情况下使用的是最简单的平均分配策略。
2、广播模式
广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。 而在实现上,就是在Consumer分配Queue时,所有Consumer都分到所有的Queue。

7、消息重试

首先对于广播模式的消息, 是不存在消息重试的机制的,即消息消费失败后,不会再重新进行发送,而只是继续消费新的消息。而对于普通的消息,当消费者消费消息失败后,你可以通过设置返回状态达到消息重试的结果。
如何让消息进行重试
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。
有三种配置方式:
返回Action.ReconsumeLater-推荐
返回null
抛出异常
重试次数:
如果消息重试16次后仍然失败,消息将不再投递。转为进入死信队列。 另外一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。 然后关于这个重试次数,RocketMQ可以进行定制。例如通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2小时。

8、死信队列

当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。
死信队列的特征:
一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
死信队列中的消息不会再被消费者正常消费。
死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fifileReservedTime属性。超过
这个最长时间的消息都会被删除,而不管消息是否消费过。
通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。

9、消息幂等

1、幂等的概念
在MQ系统中,对于消息幂等有三种实现语义:
at most once 最多一次:每条消息最多只会被消费一次
at least once 至少一次:每条消息至少会被消费一次
exactly once 刚刚好一次:每条消息都只会确定的消费一次
这三种语义都有他适用的业务场景。
其中,at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。 而at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。
而这个exactly once是MQ中最理想也是最难保证的一种语义,需要有非常精细的设计才行。RocketMQ 只能保证at least once,保证不了exactly once。所以,使用RocketMQ时,需要由业务系统自行保证消息的幂等性。
2、消息幂等的必要性
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务
端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会
收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
3、处理方式
从上面的分析中,我们知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。
而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。

RocketMQ学习笔记(2)相关推荐

  1. RocketMQ学习笔记(7)----RocketMQ的整体架构

    1. RocketMQ主要的9个模块,如图: 2. 模块介绍 1. rocketmq-common:通用的常量枚举,基类方法或者数据结构,按描述的目标来分包,通俗易懂.报名有admin,consume ...

  2. JavaEE 企业级分布式高级架构师(二十)RocketMQ学习笔记(2)

    RocketMQ学习笔记 进阶篇 消息样例 普通消息 消息发送 发送同步消息 发送异步消息 单向发送消息 三种发送方式的对比 消费消息 顺序消息 如何保证顺序 顺序的实现 MessageListene ...

  3. RocketMQ学习笔记(8)----RocketMQ的Producer API简介

    在RocketMQ中提供了三种发送消息的模式: 1.NormalProducer(普通) 2.OrderProducer(顺序) 3.TransactionProducer(事务) 下面来介绍一下pr ...

  4. RocketMQ学习笔记

    RocketMQ学习笔记 文章目录 RocketMQ学习笔记 前言 为什么要使用消息队列? 解耦 异步 削峰 使用了消息队列会有什么缺点? 消息队列如何选型? 如何保证消息队列是高可用的? 如何保证消 ...

  5. RocketMQ学习笔记(持续更新)

    文章目录 前言 一.RocketMQ是什么? 1. 基本概念 2. 功能特性 3. 为什么选择RocketMQ? 二.架构设计 1.技术架构 2.部署架构 部署特点 三.安装启动 常用命令 四.简单实 ...

  6. RocketMQ学习笔记:基础知识和安装启动

    这是本人学习的总结,主要学习资料如下 马士兵教育 rocketMq官方文档 目录 1.架构 2.基本概念 3.安装和启动 3.1.命令行启动 3.1.1.启动Server 3.1.2.启动Broker ...

  7. RocketMQ学习笔记(二)

    第六章 RocketMQ生产者核心配置和核心知识讲解 第1集 消息队列RocketMQ4.X生产者核心配置讲解 简介:消息队列RocketMQ4.X核心配置讲解 生产者常见核心配置 compressM ...

  8. RocketMq学习笔记001---Kafka,ActiveMQ、RabbitMQ、RocketMQ消息中间件的对比

    分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦.现在开源的消息中间件有很多,前段时间我们自家的产品 RocketMQ (MetaQ的内核) 也顺利开源,得到大家的关注. 那么, ...

  9. RocketMQ学习笔记四之【DefaultMQPullConsumer使用与流程简单分析】

    我们首先看下DefaultMQPullConsumer使用例子: package com.swk.springboot.rocketmq;import java.util.HashMap; impor ...

最新文章

  1. 服务器越来越慢的原因及解决办法
  2. Java三大主流开源工作流引擎技术分析
  3. Windows cmd(DOS)命令窗口中echo命令ANSI转义显示彩色字或背景
  4. [BZOJ1007] [HNOI2008] 水平可见直线 (凸包)
  5. 快速排序的实现及优化
  6. Magic Leap有新动态!成立由斯蒂芬森领头的内容团队
  7. 嗨,你选择自学成才还是参加培训?
  8. Linux安装rpc监控系统资源
  9. Python的Django框架中forms表单类的使用方法详解
  10. 西门子smartclient怎么用_西门子200SMART PLC软件各功能怎么用?编程必备!
  11. 8647服务器装系统,今天重新安装了系统,麻烦请红夜鬼先生进来帮我看一下
  12. 文献阅读——Revisiting Semi-Supervised Learning with Graph Embeddings
  13. STM32F427/STM32F437高性能MCU微控制器介绍
  14. 继明当时明月在:万历十五年
  15. python脚本爬取今日百度热点新闻
  16. JavaScript案例之电影院电子选票
  17. 图解 | JavaScript的作用域和作用域链
  18. 【关于如何输出字符串指针指向字符串地址】
  19. ThreeJS 着色器打造震撼海洋动画
  20. Matlab .m脚本文件

热门文章

  1. 《使用机器视觉从照片中确定西瓜质量》论文笔记
  2. UG NX 12 草图设计
  3. m4a怎么转换成mp3格式?
  4. 三位水仙花数python代码
  5. javascript高级程序设计阅读收获(1.1)——javascript简短的历史回顾
  6. 网易2018校园招聘:重排数列 [python]
  7. react在线编辑Excel表格
  8. android svg路径动画,五、Android SVG动画
  9. Graph Convolutional Matrix Completion,GC-MC
  10. 岛屿最大面积 leetcode Java_LeetCode:岛屿的最大面积