在了解了RocketMQ的发送与接收后,也好奇RocketMQ内部是如何处理好生产端、消费端的负载均衡的,下面通过分析源码、查阅相关文档资料以及结合自己的理解,做了下归纳总结。

RocketMQ的消息负载均衡都是下放到Client端来实现的,具体可细分为2块:发送负载(Producer端)消费负载(Consumer端)

1、发送负载

1.1 路由信息

消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。

1.2 选择队列

1.2.1 默认方式(sendLatencyFaultEnable 开关关闭)

生产者端发送消息时,会根据Topic信息(每条消息都必须指定有Topic信息),从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。随机递增式的轮询,每个生产者都通过ThreadLocal维护自己的一套下标index,初始化时产生随机数生成下标,后续每次都递增加1后对队列个数取模,从而获取对应下标的messageQueue。

1.2.2 Broker故障延迟方式(sendLatencyFaultEnable 开关打开)

在随机递增取模的基础上,结合消息失败延迟策略,过滤掉暂时认为不可用的Broker的消息队列。

消息失败延迟策略的算法在MQFaultStrategy上实现(MQFaultStrategy也被称为失败延迟策略实现的门面类),其中2个重要的参数 latencyMax、notAvailableDuration(单位都是毫秒)。

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

这2个参数如何结合实现延迟的呢?

latencyMax,在发送消息后,根据本次消息的发送耗时 currentLatency,从latencyMax数组最后一个值往前找,直到第一个比currentLatency小的值,其对应的下标为currentIdx,则可设置Broker的不可用时长为notAvailableDuration[currentIdx],调用门面类updateFaultItem方法进行更新,以此达到退避的效果。

举个例子,如果请求的latency为3300L,则currentLatency=5,对应的不可用时长为notAvailableDuration[5]=180000L,也即本次记录broker需要退避的时长180秒。

该延迟机制(latencyFaultTolerance)也是消费者端实现高可用的核心所在。

2、消费负载

这里主要讲消费端的集群消费模式下的处理(另一种模式是广播模式)。

2.1 消息获取模型概述

目前客户端与服务端(Broker)之间有两种模式:推模式、拉模式。

这里的推模式是基于拉模式进行了封装,也即通过长轮询的方式来达到兼具Pull与Push的优点。在服务端收到客户端的请求后,会进行查询,如果队列里没有数据,此时服务端先挂起,不着急返回,等待一定时间(默认5s)后,会再进一步继续查询,当一直未查询到结果并超过重试次数后返回空结果(比较适合在客户端连接数量可控的场景中)。

PS,RocketMQ的前身,第一代的Notify主要使用了推模型,解决了事务消息。第二代的MetaQ则主要使用了拉模型,解决了顺序消息和海量堆积的问题。所以一个优秀的项目其实都是在不断进化演变中的。

2.2 消费者队列如何负载

消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列

在RocketMQ中,消息队列的负载均衡是由客户端启动MQClientInstance实例部分时,触发负载均衡服务线程(具体由RebalanceService线程实现),默认每20s执行一次。

底层实现均衡的逻辑是在RebalanceImpl类的rebalanceByTopic()方法中。代码如下:

/*** 消费负载均衡核心方法** @param topic 待重均衡主题* @param isOrder*/
private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}/** 集群模式 */case CLUSTERING: {/** 1、获取该topic下的所有mq消费队列 */Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);/** 2、获取该topic、消费者分组下的所有消费者id */List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);/** 3、获取消息队列分配策略 */AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;/** 4、开始给当前消费者分配消费队列 */List<MessageQueue> allocateResult = null;try {allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}/** 5、重均衡后,更新快照队列信息 */boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}
}

具体过程解释(针对集群模式):

1、获取该topic下的所有mq消息队列;

2、获取该topic、消费者分组下的所有消费者id;

3、校验步骤1/2中任意一个结果,如果结果为空则跳过不做处理;否则进入步骤4;

4、获取消息队列分配策略;

目前RocketMQ提供了6种分配算法,默认使用消息队列的平均分配算法(AllocateMessageQueueAveragely),也推荐使用这种。

平均算法举例说明:假设有8个队列,q1,q2,……,q8,有3个消费者c1,c2,c3,则在平均分配算法下,各消费者的分配队列如下:

c1:q1,q2,q3

c2:q4,q5,q6

c3:q7,q8

(也因此可以看出,当消费者数量大于队列数量时,则会存在消费者无法分配到队列的情况)

RocketMQ提供的6种分配算法

5、重均衡后,更新快照队列信息(ProcessQueueTable)

此时调用RebalanceImpl#updateProcessQueueTableInRebalance()进行处理

假设本次通过上面几个步骤分配后得到的队列集合(mqSet)为mq1,mq2,mq3,mq4,在更新ProcessQueueTable中,会拿已分配到的队列与当前的消费队列快照(Queue consumption snapshot)比对。

变量解释说明:

processQueueTable:当前消费者负载的消息队列缓存表,结构是 ConcurrentMap<MessageQueue, ProcessQueue>

队列的比对情况(3种)以及对应执行的操作如下:

1)当前快照队列集合存在,新分配队列集合不存在(假设为上图processQueueTable标注的红色部分,e1,e2)

执行剔除e1,e2的操作,将状态标识字段 droped 置为 true,这样,该 ProcessQueue 中的消息将不会再被消费。

2)当前快照队列集合存在,新分配队列集合也存在(假设为上图processQueueTable标注的绿色部分,e3,e4)

Pull模式直接忽略不做调整;Push模式下判断processQueueTable中的该2个ProcessQueue是否已过期,已过期则移除。

3)当前快照队列集合不存在,新分配队列集合存在(假设为上图processQueueTable标注的白色部分,e5,e6);

本次新增的消息队列,添加入processQueueTable中。

至此,完成了消费端的负载均衡。

RocketMQ的负载均衡相关推荐

  1. RocketMQ Consumer 负载均衡算法源码学习 -- AllocateMessageQueueConsistentHash

    RocketMQ 提供了一致性hash 算法来做Consumer 和 MessageQueue的负载均衡. 源码中一致性hash 环的实现是很优秀的,我们一步一步分析. 一个Hash环包含多个节点, ...

  2. rocketmq消费负载均衡--push消费为例

    本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析,共分 ...

  3. RocketMQ 消息负载均衡策略解析——图解、源码级解析

  4. RocketMQ:消费端的消息消息队列负载均衡与重新发布机制源码解析

    文章目录 前言 流程解析 总结 前言 在上一篇博客中我们了解到,PullMessageService线程主要是负责从pullRequestQueue中获得拉取消息请求并进行请求处理的. PullMes ...

  5. RocketMQ消息存储、刷盘、负载均衡

    消息存储 消息存储是RocketMQ中最为复杂和最为重要的一部分. 消息存储总体架构 消息存储架构图: minOffset:当前队列的最小消息偏移量,如果消费时指定从最早消费,就是从该偏移量消费. m ...

  6. 从入门到入土(八)RocketMQ的Consumer是如何做的负载均衡的

    精彩推荐 一百期Java面试题汇总 SpringBoot内容聚合 IntelliJ IDEA内容聚合 Mybatis内容聚合 接上一篇:RocketMQ入门到入土(七 )为什么同一个消费组设置不同ta ...

  7. RocketMQ消费者端消息列队六种负载均衡算法分析

    在RocketMQ启动的时候会启动负载均衡线程,过程如下: //DefaultMQPullConsumerImpl.start()mQClientFactory.start();//上面点进去 -&g ...

  8. 【RocketMQ】消息的高可用与负载均衡

    消息生产的高可用机制 在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样当一 ...

  9. java rocketmq消费_rocketmq消费负载均衡--push消费详解

    前言 本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析 ...

最新文章

  1. jdbc增删改查_JDBC第二期
  2. [云炬ThinkPython阅读笔记]1.4 算术运算符
  3. 浅谈 Kubernetes Scheduling-Framework 插件的实现
  4. [css] 怎样修改chrome记住密码后自动填充表单的黄色背景?
  5. 在centos上安装html,Centos-7安装pdf2htmlEX
  6. 第二十一天 认识一维数组part3
  7. 程序员都必须了解的18个Python模式程序片段
  8. 跨区域报考计算机考试可以吗,考生注意!2020年医师资格机考跨题型不可以回看(附上机操作系统)...
  9. 不要把Linux和Windows比较
  10. 毕业设计 基于大数据的社交平台数据爬虫舆情分析可视化系统
  11. zktime 协议_zktime5.0考勤管理系统使用说明书(1.0版).pdf
  12. wps怎么把两张图片组合_wps如何把图片和图形进行组合?图片和图形进行组合的方法...
  13. python爬取豆瓣读书简单_Python利用lxml模块爬取豆瓣读书排行榜的方法与分析
  14. 高品质摄影作图台式计算机推荐,能拍出高品质作品的强大系统 摄影师段岳衡专访...
  15. UR机器人通信端口和协议
  16. 国内数藏造富只是个例,散户见好就收
  17. 扫描识别工具Dynamic Web TWAIN使用教程:如何自定义Web TWAIN对象
  18. JS 通过日期判断当前日期所在周的周一到周日的日期
  19. 《小白兔到大黑牛》第十一篇yum命令的总结
  20. FLUENT多孔介质数值模拟设置【转载】

热门文章

  1. 如何提高办公效率?不如试试智能化OA办公系统
  2. 【Mysql】免费的mysql图形化软件推荐
  3. 找不到dlopen failed: library /data/data/com.example.gpstest1/lib/libgnustl_shared.so not found
  4. 正整数前n项平方和与立方和的推导
  5. FFmpeg浅尝辄止(四)——音频的解码和编码
  6. Centos7下Nginx代理和二级域名配置
  7. 数智管理新动能,深度解读《2022中国指标中台市场研究报告》
  8. C语言拯救者 (零基础入门C语言--1)
  9. Latex实现框内强制换行
  10. pandas踩坑:nested renamer is not supported python