rocketmq rebalance
接着上编文章继续 https://blog.csdn.net/changjh1/article/details/116599745
this.rebalanceService.start(); // 重平衡
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}
心跳的时候做了一些事
public void doRebalance() {// 遍历每个消费组获取 MQConsumeInner 对象 一个消费组就一个MQConsumeInner对象// consumerTable 心跳的时候注入for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {try {impl.doRebalance();} catch (Throwable e) {log.error("doRebalance exception", e);}}}}
public void doRebalance(final boolean isOrder) {// consumer启动的时候放入的?Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}this.truncateMessageQueueNotMyTopic();}
private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {// 重要// 心跳或者consumer启动的时候 更新nameSrv路由信息的时候 放入消费者队列 topicSubscribeInfoTable// topic->队列信息Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 所有消费者IDsList<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {// 策略 消费者=》队列 关系 负载算法 TODO// 一个消费者对应的队列 allocateResultallocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}// PullMessageService 放入拉取任务了boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);// TODO taskTable 重新放入任务this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}}
// PullMessageService 放入拉取任务了
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {boolean changed = false;Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();// 这个队列属于 这个topicif (mq.getTopic().equals(topic)) {// mqSet 最新队列列表if (!mqSet.contains(mq)) {pq.setDropped(true);// 保存队列的 offset 让其他消费者 获取offset开始消费,重新被其他消费者加载// 删除队列if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}} else if (pq.isPullExpired()) {switch (this.consumeType()) {case CONSUME_ACTIVELY:break;case CONSUME_PASSIVELY:pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",consumerGroup, mq);}break;default:break;}}}}List<PullRequest> pullRequestList = new ArrayList<PullRequest>();for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}this.removeDirtyOffset(mq); // 删除 offsetTable 队列-》offset信息 删除后重新计算// mq 快照ProcessQueue pq = new ProcessQueue();long nextOffset = this.computePullFromWhere(mq); // 拉取MessageQueue 上次结束的偏移量 重新开始if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset); // 设置 从呢开始干活pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}// 设置pullRequestQueue 任务量this.dispatchPullRequest(pullRequestList);return changed;}
// 设置pullRequestQueue 任务量
this.dispatchPullRequest(pullRequestList); // 重平衡 封装PullRequest放入阻塞队列 拉取消息
@Overridepublic void dispatchPullRequest(List<PullRequest> pullRequestList) {for (PullRequest pullRequest : pullRequestList) {this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);}}
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
public void executePullRequestImmediately(final PullRequest pullRequest) {try {this.pullRequestQueue.put(pullRequest); //放入} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);}}
rocketmq rebalance相关推荐
- 深入理解RocketMQ Rebalance机制
本文深入的分析了RocketMQ的Rebalance机制,主要包括以下内容: Rebalance必要的元数据信息的维护 Broker协调通知机制: 消费者/启动/运行时/停止时Rebalance触发时 ...
- 深入理解RocketMQ延迟消息
延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码.第三步介绍延迟消息与消息重 ...
- java每隔 消费队列数据_消费者Rebalance机制
本文深入的分析了RocketMQ的Rebalance机制,主要包括以下内容:Rebalance必要的元数据信息的维护 Broker协调通知机制: 消费者/启动/运行时/停止时Rebalance触发时机 ...
- RocketMQ的Rebalance详解
负载均衡 RocketMQ的消费负载是通过queue来对消息进行分片,然后consumer消费自己对应的queue来实现.我们以如下场景为例: topicA有两个队列:queue1和queue2. 消 ...
- RocketMQ(五)-消费者启动机制、Rebalance机制
消费者启动机制 DefaultMQPullConsumer 核心属性 核心方法 DefaultMQPushConsumer 消费者启动流程 DefaultMQPullConsumerImpl启动流程 ...
- RocketMQ源码解析之rebalance
阅读须知 文章中使用/* */注释的方法会做深入分析 正文 rebalance 是 RocketMQ 消费过程中一个非常重要的流程,可以先从字面简单的理解下这个流程要做的事情.在分析 Consumer ...
- RocketMQ源码(十九)之消费者Rebalance
文章目录 版本 简介 Broker端 ConsumerManager ConsumerOffsetManager SubscriptionGroupManager 消费端 RebalanceServi ...
- RocketMQ(八)——Rebalance机制介绍
Rebalance机制 前提:集群消费模式 介绍: Rebalance指的是:将下一个Topic的多个Queue在同一个Consumer Group中的多个Consumer间进行重新分配的过程 该机制 ...
- RocketMQ的Rebalance机制
Rebalance机制本意是为了提升消息的并行处理能力.例如,一个Topic下5个队列,在只有1个消费者的情况下,那么这个消费者将负责处理这5个队列的消息.如果此时我们增加一个消费者,那么可以给其中一 ...
最新文章
- 乱思。。。。。。。、、、、、
- 33、springboot整合springcloud
- 在可编辑div中插入文字或图片的问题解决思路
- 奔跑吧,OpenStack现场分享:超融合架构如何抹平物理硬件差异?
- 被LTRIM(RTRIM())害死了,差点
- .NET开源MSSQL、Redis监控产品Opserver之Redis配置
- 一张图告诉你什么是系统架构师
- Spring MVC 4快速入门Maven原型得到了改进–更多Java 8功能
- C/C++笔试题(基础题)
- 解决0RA-04031故障
- c语言编写算术编码,编程实现算术编码算法.doc
- scratch编程滑雪者游戏教程
- 使用bat脚本创建快捷方式
- JavaScript 对象 — 重学 JavaScript
- 用Python分析经纬度数据
- 大连八中学2021年高考成绩查询,2021年大连各高中高考成绩排名及放榜最新消息...
- hihoCoder #1692 : 第K小分数
- Debian系下载deb安装包及依赖包
- 开始Windows Embedded Compact 7的第一个项目――虚拟机上的CEPC
- 从三大行业看大数据应用的三重境界:数据、分析、成果
热门文章
- 不同模拟器使用不同IP代理方法
- 用unity制作一个智能机器人程序
- 5G测试白卡需要写入哪些数据才能正常使用?
- 古月居机器视觉开发——ROS+opencv的图像处理方法(三)
- Vue 引用网络图片 403拒绝访问
- Angular5学习笔记(一)
- angular路由的重定向,pathMatch='prefix'到底是干啥(学习angular5x路由笔记)
- 代码首次提交到gitee上报错问题解决
- 怎么解决sockjs.js?9be2:1609 GET http://192.168.0.133:8080/sockjs-node/info?t=1630545142551 net::ERR_NETW
- 转载:强化学习中Bellman最优性方程背后的数学原理?