文章目录

  • rocketMQ
    • 核心brocker
    • NameServer
    • 集群模式
    • 集群特点
    • 各集群间关系
      • Producer集群
    • 1.rocketMQ使用场景
    • 2.rocketmq 高可用机制
      • 通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘方式,主从之间配置成SYNC_MASTER的复制方式,这样即时有一台机器出故障,仍然能保证数据不丢,是个不错的选择
    • 持久化存储
      • 存储介质
      • 消息存储结构
      • 消息的存储和发送
    • message构成
    • 消息的主从复制
    • RocketMQ 负载均衡
    • 如何保证 RocketMQ 不丢失消息
    • 消息重试
    • 死信队列、消费幂等
  • RocketMQ 的消息出现重复
    • 消息幂等
  • RocketMQ 主从同步若干问题
    • 一个新的消费组订阅一个已存在的Topic主题时,消费组是从该Topic的哪条消息开始消费呢?

rocketMQ

核心brocker

含义Broker是具体提供业务的服务器,解释一:RocketMQ的核心逻辑是Broker。Broker是实际用于手法消息的功能单元。从RocketMQ使用者的角度来看,生产者通过接口将消息投递到Broker,消费者从Broker获取消息进行消费。RocketMQ提供了推拉结合的方式用于获取消息。解释二:单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的。Broker中分master和slave两种角色,每个master可以对应多个slave,但一个slave只能对应一个master,master和slave通过指定相同的Brokername,不同的BrokerId (master为0)成为一个组。master和slave之间的同步方式分为同步双写和异步复制,异步复制方式master和slave之间虽然会存在少量的延迟,但性能较同步双写方式要高出10%左右举例:邮局 。它是RocketMQ的核心,用于接收Producer发过来的         消息、以及处理Consumer的消费消息请求、消息的持久化存储、服务端过滤功能等另外,Broker中还存在一些非常重要的名词需要说明:内含a.Topic:区分消息的种类,一个发送者可以发送消息给一个或者多个Topic,一个消息的接受者可以订阅一个或者多个Topic消息。对于RokectMQ而言,Topic只是一个逻辑上的概念,真正的消息存储其实是在Topic中的Queue中。这要设计是为了消息的顺序消费,b.Message Queue:相当于是Topic的分区,用于并发发送和接受消息

NameServer

含义解释一:RocketMQ没有引入第三方服务依赖,消息队列内部的服务发现以及配置更新等,都借由Name Server来完成。从功能上来说,Name Server相当于一个轻量级简化版的Zookeeper,或者说提供了类似ZK的功能。Name Server的定位是维护RocketMQ全局相关配置,提供消息路由信息,除此之外并不包含过多复杂逻辑。因为其相对轻量级,一般一组Name Server集群可以服务多组Broker集群。Name Server Cluster是多个Name Server实例的统称,Name Server之间并无关联,互相也不同步信息。多个Name Server的存在是为了提供高可用服务,不同实例之间的数据信息同步则实际是在数据写入的时候保证的。一份配置或消息路由信息会写入所有Name Server实例中。解释二:相当于配置中心,维护Broker集群、Broker信息、Broker存活信息、主题与队列信息等。NameServer彼此之间不通信,每个Broker与集群内所有NameServer保持长连接通信机制1.Broker启动后需要完成一次将自己注册到NameServer的操作;随后每隔30秒时间定时向NameServer更新Topic路由信息2.Producer发送消息时,需要根据消息的Topic从本地缓存的获取路由信息。如果没有则更新路由信息,会从NameServer重新拉取,同时Producer会默认每隔30秒向NameServer拉取一次路由信息3.Consumer消费消息时,从NameServer获取的路由信息,并再完成客户端的负载均衡后,监听指定消息队列获取消息并进行消费

集群模式

单Master模式这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试多Master模式一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点是优点:配置简单,单个Master宕机或重启维护对应无影响,在磁盘配置为PAID10时,即时机器宕机不可恢复情况下,由于PAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高。缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。多Master多Slave模式(异步)每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式优缺点如下:优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备份不能自动切换为主机

集群特点

NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServerProducer与NameServer集群中的其中一个节点(随机选择)建立长连接,定时从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅信息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

各集群间关系

Producer集群

与nameserver的关系单个Producer和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。与nameserver之间没有心跳。与broker的关系单个Producer和与其关联的所有broker保持长连接,并维持心跳。默认情况下消息发送采用轮询方式,会均匀发到对应Topic的所有queue中。最佳实践1.一个应用尽可能只使用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。2.每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过 Topic,key 来查询返条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。3.消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。4.对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。5.某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。

Consumer集群

与nameserver的关系单个Consumer和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。与nameserver之间没有心跳。与broker的关系单个Consumer和与其关联的所有broker保持长连接,并维持心跳,失去心跳后,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。最佳实践1.Consumer 数量要小于等于queue的总数量,由于Topic下的queue会被相对均匀的分配给Consumer,如果 Consumer 超过queue的数量,那多余的 Consumer 将没有queue可以消费消息。2.消费过程要做到幂等(即消费端去重),RocketMQ为了保证性能并不支持严格的消息去重。3.尽量使用批量方式消费,RocketMQ消费端采用pull方式拉取消息,通过consumeMessageBatchMaxSize参数可以增加单次拉取的消息数量,可以很大程度上提高消费吞吐量。另外,提高消费并行度也可以通过增加Consumer处理线程的方式,对应参数consumeThreadMin和consumeThreadMax。4.消息发送成功或者失败,要打印消息日志。

1.rocketMQ使用场景

异步处理

解耦

削峰填谷

数据同步

2.rocketmq 高可用机制

RocketMQ分布式集群是通过Master和Slave的配合达到高可用性
在Broker的配置文件中,参数brokerId的值为0表明这个Brocker是Master,大于0表明这个Brocker是Slave,同时BrokerRole参数也会说明这个Brocker是Master还是Slave

当Master不可用或者繁忙的时候,Consumer会被自动切换从Slave读。有了自动切换Consumer这种机制,当一个Master角色机出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序,这就达到了消费端的高可用性

当一个Brocker组的Master不可用后,其他的Master仍然可用,Procucer仍然可以发送消息,RocketMq目前还不支持把Slave自动转成Master,如果集器资源不足,需要把Slave转成Master,则要手动停止Slave角色的Brocker,更改配置文件,用新的配置文件启动Brocker

通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘方式,主从之间配置成SYNC_MASTER的复制方式,这样即时有一台机器出故障,仍然能保证数据不丢,是个不错的选择

异步刷盘+主从同步复制

持久化存储

分布式队列有高可靠性的要求,所以数据要进行持久化存储

  • 1,消息生产者发送消息到MQ。
  • 2,MQ收到消息,将消息进行持久化,即在存储系统中新增一条记录。
  • 3,返回ACK确认消息给生产者。
  • 4,然后MQ推送消息给对应的消费者,等待消费者返回ACK。
  • 5,如果消息消费者在指定时间内成功返回ACK,那么MQ认为消息消费成功,在存储系统中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新推送消息,重复执行4、5、6步骤。
  • 6,MQ删除消息

存储介质

关系型数据库DB
文件系统

消息存储结构

RocketMQ 消息的存储是由 ConsumeQueue 和 CommitLog 配合完成的,消息真正的物理存储文件是 CommitLog,ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。

CommitLog:存储消息的元数据
ConsumerQueue:存储消息在 CommitLog 的索引
IndexFile:为了消息查询提供了一种通过 key 或时间区间来查询消息的方法,这种通过 IndexFile 来查找消息的方法不影响发送与消费消息的主流程

消息的存储和发送

message构成

topic:主题名称

tag:消息tag,用于消息过滤对消息的整体分类,比如 topic为物流跟踪轨迹 ,轨迹包含 揽收 出库 入库 派送 签收,可以分别给这些相同topic不同类型的数据打标签分类解析处理

keys:message索引键,多个用空格隔开,rocketmq可以根据这些key快速检索到消息对消息关键字的提取方便查询,比如一条消息某个关键字是 运单号,之后我们可以使用这个运单号作为关键字进行查询

waitstoremsgok:消息发送时是否等消息存储完成后再返

delaytimelevel:消息延迟级别,用于定时消息或消息重

user property:自定义消息属性

消息的主从复制

同步复制方式等Master和Slave均写 成功后反馈给客户端写成功状态
异步复制方式是只要Master写成功 即可反馈给客户端写成功状态

RocketMQ 负载均衡

Producer负载均衡
Consumer负载均衡

如何保证 RocketMQ 不丢失消息

生产阶段
生产者(Producer) 通过网络发送消息给 Broker,当 Broker 收到之后,将会返回确认响应信息给 Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。

Broker 存储阶段
保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。

集群部署

为了保证可用性,Broker 通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到 slave 节点。

默认方式下,消息写入 master 成功,就可以返回确认响应给生产者,接着消息将会异步复制到 slave 节点。

结合生产阶段与存储阶段,若需要严格保证消息不丢失,broker 需要采用如下配置:

master 节点配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER

slave 节点配置
brokerRole=slave
flushDiskType = SYNC_FLUSH

消费阶段
消费者从 broker 拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态给 Broker。

如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。

消息重试

在使用顺序消息时,务必保证应用能够即时监控并处理消费失败的情况,避免阻塞现象的发生
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性
消费失败后,重试配置方式
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的视线中明确进行配置
消费失败后,不重试配置方式
集群消费方式下,消息失败后期望消息不充实,需要捕获消费逻辑中可能抛出的异常,最终返回Action.CommitMessage,此后这条消息将不会在重试

返回 Action.ReconsumerLater(推荐)

消息队列RocketMQ 允许Consumer启动的时候设置最大重试次数

死信队列、消费幂等

消费者在正常情况下无法正确地消费该消费
有效期与正常消息相同,均为 3 天,3 天后会被自动删除。

RocketMQ 的消息出现重复

发送时消息重复(网络闪断或者客户端宕机)
投递时消息重复(客户端给服务端反馈应答的时候网络闪断)
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)

消息幂等

根据业务上的唯一 Key 对消息做幂等处理
以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置

RocketMQ 主从同步若干问题

RocketMQ 的主从同步机制如下:

A. 首先启动Master并在指定端口监听;
B. 客户端启动,主动连接Master,建立TCP连接;
C. 客户端以每隔5s的间隔时间向服务端拉取消息,如果是第一次拉取的话,先获取本地commitlog文件中最大的偏移量,以该偏移量向服务端拉取消息;
D. 服务端解析请求,并返回一批数据给客户端;
E. 客户端收到一批消息后,将消息写入本地commitlog文件中,然后向Master汇报拉取进度,并更新下一次待拉取偏移量;
F. 然后重复第3步;

RocketMQ主从同步一个重要的特征:主从同步不具备主从切换功能,即当主节点宕机后,从不会接管消息发送,但可以提供消息读取。

主,从服务器都在运行过程中,消息消费者是从主拉取消息还是从从拉取?
RocketMQ主从同步架构中,如果主服务器宕机,从服务器会接管消息消费,此时消息消费进度如何保持,当主服务器恢复后,消息消费者是从主拉取消息还是从从服务器拉取,主从服务器之间的消息消费进度如何同步?

RocketMQ主从读写分离机制
RocketMQ的主从同步,在默认情况下RocketMQ会优先选择从主服务器进行拉取消息,并不是通常意义的上的读写分离,那什么时候会从拉取呢?

温馨提示:本节同样不会详细整个流程,只会点出其关键点,如果想详细了解消息拉取、消息消费等核心流程,建议大家查阅笔者所著的《RocketMQ技术内幕》。

在RocketMQ中判断是从主拉取,还是从从拉取的核心代码如下:

DefaultMessageStore#getMessage

long diff = maxOffsetPy - maxPhyOffsetPulling; // @1
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); // @2
getResult.setSuggestPullingFromSlave(diff > memory); // @3
代码@1:首先介绍一下几个局部变量的含义:

maxOffsetPy
当前最大的物理偏移量。返回的偏移量为已存入到操作系统的PageCache中的内容。
maxPhyOffsetPulling
本次消息拉取最大物理偏移量,按照消息顺序拉取的基本原则,可以基本预测下次开始拉取的物理偏移量将大于该值,并且就在其附近。
diff
maxOffsetPy与maxPhyOffsetPulling之间的间隔,getMessage通常用于消息消费时,即这个间隔可以理解为目前未处理的消息总大小。
代码@2:获取RocketMQ消息存储在PageCache中的总大小,如果当RocketMQ容量超过该阔值,将会将被置换出内存,如果要访问不在PageCache中的消息,则需要从磁盘读取。

StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
返回当前系统的总物理内存。参数
accessMessageInMemoryMaxRatio
设置消息存储在内存中的阀值,默认为40。
结合代码@2这两个参数的含义,算出RocketMQ消息能映射到内存中最大值为40% * (机器物理内存)。

代码@3:设置下次拉起是否从从拉取标记,触发下次从从服务器拉取的条件为:当前所有可用消息数据(所有commitlog)文件的大小已经超过了其阔值,默认为物理内存的40%。

那GetResult的suggestPullingFromSlave属性在哪里使用呢?

PullMessageProcessor#processRequest

if (getMessageResult.isSuggestPullingFromSlave()) { // @1
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) { // @2
case ASYNC_MASTER:
case SYNC_MASTER:
break;
case SLAVE:
if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
break;
}

if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) { // @3
// consume too slow ,redirect to another machine
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
}
// consume ok
else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
}
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
代码@1:如果从commitlog文件查找消息时,发现消息堆积太多,默认超过物理内存的40%后,会建议从从服务器读取。

代码@2:如果当前服务器的角色为从服务器:并且slaveReadEnable=true,则忽略代码@1设置的值,下次拉取切换为从主拉取。

代码@3:如果slaveReadEnable=true(从允许读),并且建议从从服务器读取,则从消息消费组建议当消息消费缓慢时建议的拉取brokerId,由订阅组配置属性whichBrokerWhenConsumeSlowly决定;如果消息消费速度正常,则使用订阅组建议的brokerId拉取消息进行消费,默认为主服务器。如果不允许从可读,则固定使用从主拉取。

温馨提示:请注意broker服务参数slaveReadEnable,与订阅组配置信息:whichBrokerWhenConsumeSlowly、brokerId的值,在生产环境中,可以通过updateSubGroup命令动态改变订阅组的配置信息。

如果订阅组的配置保持默认值的话,拉取消息请求发送到从服务器后,下一次消息拉取,无论是否开启slaveReadEnable,下一次拉取,还是会发往主服务器。

上面的步骤,在消息拉取命令的返回字段中,会将下次建议拉取Broker返回给客户端,根据其值从指定的broker拉取。

消息拉取实现PullAPIWrapper在处理拉取结果时会将服务端建议的brokerId更新到broker拉取缓存表中。

消息消费进度同步机制
从上面内容可知,主从同步引入的主要目的就是消息堆积的内容默认超过物理内存的40%,则消息读取则由从服务器来接管,实现消息的读写分离,避免主服务IO抖动严重。那问题来了,主服务器宕机后,从服务器接管消息消费后,那消息消费进度存储在哪里?当主服务器恢复正常后,消息是从主服务器拉取还是从从服务器拉取?主服务器如何得知最新的消息消费进度呢?

RocketMQ消息消费进度管理(集群模式):
集群模式下消息消费进度存储文件位于服务端${ROCKETMQ_HOME}/store/config/consumerOffset.json。消息消费者从服务器拉取一批消息后提交到消费组特定的线程池中处理消息,当消息消费成功后会向Broker发送ACK消息,告知消费端已成功消费到哪条消息,Broker收到消息消费进度反馈后,首先存储在内存中,然后定时持久化到consumeOffset.json文件中。备注:关于消息消费进度管理更多的实现细节,建议查阅笔者所著的《RocketMQ技术内幕》。

我们先看一下客户端向服务端反馈消息消费进度时如何选择Broker。
因为主服务的brokerId为0,默认情况下当主服务器存活的时候,优先会选择主服务器,只有当主服务器宕机的情况下,才会选择从服务器。

既然集群模式下消息消费进度存储在Broker端,当主服务器正常时,消息消费进度文件存储在主服务器,那提出如下两个问题:
1)消息消费端在主服务器存活的情况下,会优先向主服务器反馈消息消费进度,那从服务器是如何同步消息消费进度的。
2)当主服务器宕机后则消息消费端会向从服务器反馈消息消费进度,此时消息消费进度如何存储,当主服务器恢复正常后,主服务器如何得知最新的消息消费进度。

为了解开上述两个疑问,我们优先来看一下Broker服务器在收到提交消息消费进度反馈命令后的处理逻辑:

客户端定时向Broker端发送更新消息消费进度的请求,其入口为:RemoteBrokerOffsetStore#updateConsumeOffsetToBroker,该方法中一个非常关键的点是:选择broker的逻辑,如下所示:
在这里插入图片描述
如果主服务器存活,则选择主服务器,如果主服务器宕机,则选择从服务器。也就是说,不管消息是从主服务器拉取的还是从从服务器拉取的,提交消息消费进度请求,优先选择主服务器。服务端就是接收其偏移量,更新到服务端的内存中,然后定时持久化到${ROCKETMQ_HOME}/store/config/consumerOffset.json。

经过上面的分析,我们来讨论一下这个场景:

消息消费者首先从主服务器拉取消息,并向其提交消息消费进度,如果当主服务器宕机后,从服务器会接管消息拉取服务,此时消息消费进度存储在从服务器,主从服务器的消息消费进度会出现不一致?那当主服务器恢复正常后,两者之间的消息消费进度如何同步?

3.2.1 从服务定时同步主服务器进度
在这里插入图片描述
如果Broker角色为从服务器,会通过定时任务调用syncAll,从主服务器定时同步topic路由信息、消息消费进度、延迟队列处理进度、消费组订阅信息。

那问题来了,如果主服务器启动后,从服务器马上从主服务器同步消息消息进度,那岂不是又要重新消费?

其实在绝大部分情况下,就算从服务从主服务器同步了很久之前的消费进度,只要消息者没有重新启动,就不需要重新消费,在这种情况下,RocketMQ提供了两种机制来确保不丢失消息消费进度。

第一种,消息消费者在内存中存在最新的消息消费进度,继续以该进度去服务器拉取消息后,消息处理完后,会定时向Broker服务器反馈消息消费进度,在上面也提到过,在反馈消息消费进度时,会优先选择主服务器,此时主服务器的消息消费进度就立马更新了,从服务器此时只需定时同步主服务器的消息消费进度即可。

第二种是,消息消费者在向主服务器拉取消息时,如果是是主服务器,在处理消息拉取时,也会更新消息消费进度。

3.2.2 主服务器消息拉取时更新消息消费进度
主服务器在处理消息拉取命令时,会触发消息消费进度的更新,其代码入口为:PullMessageProcessor#processRequest

boolean storeOffsetEnable = brokerAllowSuspend; // @1
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; // @2
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
代码@1:首先介绍几个局部变量的含义:

brokerAllowSuspend:broker是否允许挂起,在消息拉取时,该值默认为true。
hasCommitOffsetFlag:消息消费者在内存中是否缓存了消息消费进度,如果缓存了,该标记设置为true。
如果Broker的角色为主服务器,并且上面两个变量都为true,则首先使用commitOffset更新消息消费进度。

1、主,从服务器都在运行过程中,消息消费者是从主拉取消息还是从从拉取?
答:默认情况下,RocketMQ消息消费者从主服务器拉取,当主服务器积压的消息超过了物理内存的40%,则建议从从服务器拉取。但如果slaveReadEnable为false,表示从服务器不可读,从服务器也不会接管消息拉取。

2、当消息消费者向从服务器拉取消息后,会一直从从服务器拉取?
答:不是的。分如下情况:
1)如果从服务器的slaveReadEnable设置为false,则下次拉取,从主服务器拉取。
2)如果从服务器允许读取并且从服务器积压的消息未超过其物理内存的40%,下次拉取使用的Broker为订阅组的brokerId指定的Broker服务器,该值默认为0,代表主服务器。
3)如果从服务器允许读取并且从服务器积压的消息超过了其物理内存的40%,下次拉取使用的Broker为订阅组的whichBrokerWhenConsumeSlowly指定的Broker服务器,该值默认为1,代表从服务器。

3、主从服务消息消费进是如何同步的?
答:消息消费进度的同步时单向的,从服务器开启一个定时任务,定时从主服务器同步消息消费进度;无论消息消费者是从主服务器拉的消息还是从从服务器拉取的消息,在向Broker反馈消息消费进度时,优先向主服务器汇报;消息消费者向主服务器拉取消息时,如果消息消费者内存中存在消息消费进度时,主会尝试跟新消息消费进度。

读写分离的正确使用姿势:
1、主从Broker服务器的slaveReadEnable设置为true。
2、通过updateSubGroup命令更新消息组whichBrokerWhenConsumeSlowly、brokerId,特别是其brokerId不要设置为0,不然从从服务器拉取一次后,下一次拉取就会从主去拉取。

一个新的消费组订阅一个已存在的Topic主题时,消费组是从该Topic的哪条消息开始消费呢?

首先翻阅DefaultMQPushConsumer的API时,setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)API映入眼帘,从字面意思来看是设置消费者从哪里开始消费,正是解开该问题的”钥匙“。ConsumeFromWhere枚举类
CONSUME_FROM_MAX_OFFSET
从消费队列最大的偏移量开始消费。
CONSUME_FROM_FIRST_OFFSET
从消费队列最小偏移量开始消费。
CONSUME_FROM_TIMESTAMP
从指定的时间戳开始消费,默认为消费者启动之前的30分钟处开始消费。可以通过DefaultMQPushConsumer#setConsumeTimestamp。

rocketMQ知识点总结相关推荐

  1. RocketMQ知识点整理

    RocketMQ知识点整理 一.消息队列 二.RocketMQ简介 RocketMQ-组件 RocketMQ架构: 三. RocketMQ理解性问题整理 1.使用消息中间件之前需要先了解"同 ...

  2. 超详细的RocketMq知识点讲解以及实战

    大家好我是魔笑,下面是对RocketMq知识点的一些讲解,以及代码实战,如果有讲的不对的地方,请多指教.,前期介绍了,RockeMq部署架构,以及角色以及相关术语进行讲解.后面主干是从上产消息,存储消 ...

  3. 看完保送阿里的RocketMQ知识点(超详细)

    你知道的越多,你不知道的越多 点赞再看,养成习惯 本文GitHub https://github.com/JavaFamily 已收录,有一线大厂面试点脑图.个人联系方式,欢迎Star和指教 前言 消 ...

  4. RocketMQ 知识点整理

    文章目录 什么是RMQ?架构介绍 消息持久化 异步刷盘 RocketMQ为什么速度快 延迟消息 事务消息 过滤消息 有序消息 批量消息 消息轨迹(msg trace) 消费重试 死信队列 广播消息 o ...

  5. 分布式系统灰度发布实践

    文章目录 0.分布式系统灰度要实现的功能清单 1.携带灰度因子 1.1.Http请求中 1.2.JVM中 1.3.调用链中 2.前端资源灰度 2.1.方案一:基于verynginx 2.2.方案二:基 ...

  6. MQ 之 RocketMQ 核心知识点

    前言 紧接上文,这一章将记录RocketMQ的一些核心知识点,也是面试上经常被问及的地方,是非常重要的. 当然了,这里全是理论知识,会比较枯燥,下一张会记录RocketMQ的应用,就会好很多了,不过要 ...

  7. RocketMQ 1.学习资料 2.面试题 3.知识点

    前言 这篇文章分三方面来介绍RocketMQ 1.学习资料 2.面试题 3.知识点 学习资料 官方 官方文档 rocketmq.apache.org/docs/simple- 官方github git ...

  8. 10分钟掌握RocketMQ的核心知识

    前言 Apache RocketMQ 是阿里开源的一款高性能.高吞吐量的分布式消息中间件. RocketMQ主要由 Producer.Broker.Consumer 三部分组成,其中Producer ...

  9. 「查缺补漏」巩固你的 RocketMQ 知识体系

    本期带来的内容是 RocketMQ 核心知识点.建议收藏起来慢慢看~ Windows安装部署 下载地址:https://www.apache.org/dyn/closer.cgi?path=rocke ...

最新文章

  1. legend3---lavarel安装debugbar工具条
  2. 字符串中最后一个词组的长度 Length of Last Word
  3. .NET 二维码生成(ThoughtWorks.QRCode)
  4. NAR | 陈加余/陈亮合作建立R-loop全基因组分布与调控的专家数据库
  5. python编程(动态加载)
  6. destoon php os,destoon运行流程二次开发必看
  7. Read Asia Embedded fell
  8. 我是如何学习Android源码的
  9. Ubuntu20.04 安装在U盘上
  10. ssm(Spring+SpringMVC+MyBatis)台球室内乒乓球室体育器械租赁收费系统
  11. 合宙Air720UH链接阿里云流程
  12. 解决mysql sum求和返回null问题或IFNULL应用
  13. 用java实现简单绘图
  14. 解决掌阅PC端不能复制的脚本
  15. JavaScript异步编程(1)- ECMAScript 6的Promise对象
  16. 遗传算法解决TSP(34个省会城市)问题
  17. 智能控制和计算机控制的区别,BA楼宇控制方式是什么 智能照明控制与BA楼宇控制方式的区别在哪...
  18. 粉丝福利-2019云栖大会学习资料
  19. 不懂技术,如何轻松制作微信H5页面?
  20. 关于 getWriter() has already been called for this response 的错误解决办法

热门文章

  1. Navicat新建查询系统找不到指定路径
  2. 网络超时检测-select()函数
  3. 四、redis原理之set底层数据结构
  4. 利用Java进行zip文件压缩与解压缩
  5. sklearn 回归 算法 最小二乘法
  6. 计算机专业四川省录取分数线,四川省计算机信息职业技术学院2020年招生录取分数线...
  7. 以下关于中国公用计算机互联网,中国公用计算机互联网国际联网管理办法
  8. python scrapy 环境搭建_Python Scrapy 爬虫(一):环境搭建
  9. 嵌入式软件系统开发历程
  10. 安卓手机出现闪屏怎么处理