接着上编文章继续 https://blog.csdn.net/changjh1/article/details/116599745

this.rebalanceService.start(); // 重平衡
    public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}

心跳的时候做了一些事

    public void doRebalance() {// 遍历每个消费组获取 MQConsumeInner 对象 一个消费组就一个MQConsumeInner对象// consumerTable 心跳的时候注入for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {try {impl.doRebalance();} catch (Throwable e) {log.error("doRebalance exception", e);}}}}
    public void doRebalance(final boolean isOrder) {// consumer启动的时候放入的?Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}this.truncateMessageQueueNotMyTopic();}
 private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {// 重要// 心跳或者consumer启动的时候  更新nameSrv路由信息的时候 放入消费者队列  topicSubscribeInfoTable// topic->队列信息Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 所有消费者IDsList<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {// 策略 消费者=》队列 关系 负载算法 TODO// 一个消费者对应的队列 allocateResultallocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}// PullMessageService 放入拉取任务了boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);// TODO taskTable 重新放入任务this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}}
// PullMessageService 放入拉取任务了
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {boolean changed = false;Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();// 这个队列属于 这个topicif (mq.getTopic().equals(topic)) {// mqSet 最新队列列表if (!mqSet.contains(mq)) {pq.setDropped(true);// 保存队列的 offset 让其他消费者 获取offset开始消费,重新被其他消费者加载// 删除队列if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}} else if (pq.isPullExpired()) {switch (this.consumeType()) {case CONSUME_ACTIVELY:break;case CONSUME_PASSIVELY:pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",consumerGroup, mq);}break;default:break;}}}}List<PullRequest> pullRequestList = new ArrayList<PullRequest>();for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}this.removeDirtyOffset(mq); //  删除 offsetTable 队列-》offset信息 删除后重新计算// mq 快照ProcessQueue pq = new ProcessQueue();long nextOffset = this.computePullFromWhere(mq); // 拉取MessageQueue 上次结束的偏移量 重新开始if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset); // 设置 从呢开始干活pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}// 设置pullRequestQueue 任务量this.dispatchPullRequest(pullRequestList);return changed;}
// 设置pullRequestQueue 任务量
this.dispatchPullRequest(pullRequestList); // 重平衡 封装PullRequest放入阻塞队列 拉取消息
    @Overridepublic void dispatchPullRequest(List<PullRequest> pullRequestList) {for (PullRequest pullRequest : pullRequestList) {this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);}}
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
    public void executePullRequestImmediately(final PullRequest pullRequest) {try {this.pullRequestQueue.put(pullRequest); //放入} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);}}

rocketmq rebalance相关推荐

  1. 深入理解RocketMQ Rebalance机制

    本文深入的分析了RocketMQ的Rebalance机制,主要包括以下内容: Rebalance必要的元数据信息的维护 Broker协调通知机制: 消费者/启动/运行时/停止时Rebalance触发时 ...

  2. 深入理解RocketMQ延迟消息

    延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码.第三步介绍延迟消息与消息重 ...

  3. java每隔 消费队列数据_消费者Rebalance机制

    本文深入的分析了RocketMQ的Rebalance机制,主要包括以下内容:Rebalance必要的元数据信息的维护 Broker协调通知机制: 消费者/启动/运行时/停止时Rebalance触发时机 ...

  4. RocketMQ的Rebalance详解

    负载均衡 RocketMQ的消费负载是通过queue来对消息进行分片,然后consumer消费自己对应的queue来实现.我们以如下场景为例: topicA有两个队列:queue1和queue2. 消 ...

  5. RocketMQ(五)-消费者启动机制、Rebalance机制

    消费者启动机制 DefaultMQPullConsumer 核心属性 核心方法 DefaultMQPushConsumer 消费者启动流程 DefaultMQPullConsumerImpl启动流程 ...

  6. RocketMQ源码解析之rebalance

    阅读须知 文章中使用/* */注释的方法会做深入分析 正文 rebalance 是 RocketMQ 消费过程中一个非常重要的流程,可以先从字面简单的理解下这个流程要做的事情.在分析 Consumer ...

  7. RocketMQ源码(十九)之消费者Rebalance

    文章目录 版本 简介 Broker端 ConsumerManager ConsumerOffsetManager SubscriptionGroupManager 消费端 RebalanceServi ...

  8. RocketMQ(八)——Rebalance机制介绍

    Rebalance机制 前提:集群消费模式 介绍: Rebalance指的是:将下一个Topic的多个Queue在同一个Consumer Group中的多个Consumer间进行重新分配的过程 该机制 ...

  9. RocketMQ的Rebalance机制

    Rebalance机制本意是为了提升消息的并行处理能力.例如,一个Topic下5个队列,在只有1个消费者的情况下,那么这个消费者将负责处理这5个队列的消息.如果此时我们增加一个消费者,那么可以给其中一 ...

最新文章

  1. 乱思。。。。。。。、、、、、
  2. 33、springboot整合springcloud
  3. 在可编辑div中插入文字或图片的问题解决思路
  4. 奔跑吧,OpenStack现场分享:超融合架构如何抹平物理硬件差异?
  5. 被LTRIM(RTRIM())害死了,差点
  6. .NET开源MSSQL、Redis监控产品Opserver之Redis配置
  7. 一张图告诉你什么是系统架构师
  8. Spring MVC 4快速入门Maven原型得到了改进–更多Java 8功能
  9. C/C++笔试题(基础题)
  10. 解决0RA-04031故障
  11. c语言编写算术编码,编程实现算术编码算法.doc
  12. scratch编程滑雪者游戏教程
  13. 使用bat脚本创建快捷方式
  14. JavaScript 对象 — 重学 JavaScript
  15. 用Python分析经纬度数据
  16. 大连八中学2021年高考成绩查询,2021年大连各高中高考成绩排名及放榜最新消息...
  17. hihoCoder #1692 : 第K小分数
  18. Debian系下载deb安装包及依赖包
  19. 开始Windows Embedded Compact 7的第一个项目――虚拟机上的CEPC
  20. 从三大行业看大数据应用的三重境界:数据、分析、成果

热门文章

  1. 不同模拟器使用不同IP代理方法
  2. 用unity制作一个智能机器人程序
  3. 5G测试白卡需要写入哪些数据才能正常使用?
  4. 古月居机器视觉开发——ROS+opencv的图像处理方法(三)
  5. Vue 引用网络图片 403拒绝访问
  6. Angular5学习笔记(一)
  7. angular路由的重定向,pathMatch='prefix'到底是干啥(学习angular5x路由笔记)
  8. 代码首次提交到gitee上报错问题解决
  9. 怎么解决sockjs.js?9be2:1609 GET http://192.168.0.133:8080/sockjs-node/info?t=1630545142551 net::ERR_NETW
  10. 转载:强化学习中Bellman最优性方程背后的数学原理?