1、BrokerController 中与主从相关的方法详解


本节先对 BrokerController 中与主从切换相关的方法。

1.1 startProcessorByHa

BrokerController#startProcessorByHa

private void startProcessorByHa(BrokerRole role) {

if (BrokerRole.SLAVE != role) {

if (this.transactionalMessageCheckService != null) {

this.transactionalMessageCheckService.start();

}

}

}

感觉该方法的取名较为随意,该方法的作用是开启事务状态回查处理器,即当节点为主节点时,开启对应的事务状态回查处理器,对PREPARE状态的消息发起事务状态回查请求。

1.2 shutdownProcessorByHa

BrokerController#shutdownProcessorByHa

private void shutdownProcessorByHa() {

if (this.transactionalMessageCheckService != null) {

this.transactionalMessageCheckService.shutdown(true);

}

}

关闭事务状态回查处理器,当节点从主节点变更为从节点后,该方法被调用。

1.3 handleSlaveSynchronize

BrokerController#handleSlaveSynchronize

private void handleSlaveSynchronize(BrokerRole role) {

if (role == BrokerRole.SLAVE) { // @1

if (null != slaveSyncFuture) {

slaveSyncFuture.cancel(false);

}

this.slaveSynchronize.setMasterAddr(null); //

slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.slaveSynchronize.syncAll();

} catch (Throwable e) {

log.error(“ScheduledTask SlaveSynchronize syncAll error.”, e);

}

}

}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);

} else { // @2

//handle the slave synchronise

if (null != slaveSyncFuture) {

slaveSyncFuture.cancel(false);

}

this.slaveSynchronize.setMasterAddr(null);

}

}

该方法的主要作用是处理从节点的元数据同步,即从节点向主节点主动同步 topic 的路由信息、消费进度、延迟队列处理队列、消费组订阅配置等信息。

代码@1:如果当前节点的角色为从节点:

  • 如果上次同步的 future 不为空,则首先先取消。

  • 然后设置 slaveSynchronize 的 master 地址为空。不知大家是否与笔者一样,有一个疑问,从节点的时候,如果将 master 地址设置为空,那如何同步元数据,那这个值会在什么时候设置呢?

  • 开启定时同步任务,每 10s 从主节点同步一次元数据。

代码@2:如果当前节点的角色为主节点,则取消定时同步任务并设置 master 的地址为空。

1.4 changeToSlave

BrokerController#changeToSlave

public void changeToSlave(int brokerId) {

log.info(“Begin to change to slave brokerName={} brokerId={}”, brokerConfig.getBrokerName(), brokerId);

//change the role

brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check // @1

messageStoreConfig.setBrokerRole(BrokerRole.SLAVE); // @2

//handle the scheduled service

try {

this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE); // @3

} catch (Throwable t) {

log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);

}

//handle the transactional service

try {

this.shutdownProcessorByHa(); // @4

} catch (Throwable t) {

log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);

}

//handle the slave synchronise

handleSlaveSynchronize(BrokerRole.SLAVE); // @5

try {

this.registerBrokerAll(true, true, brokerConfig.isForceRegister()); // @6

} catch (Throwable ignored) {

}

log.info(“Finish to change to slave brokerName={} brokerId={}”, brokerConfig.getBrokerName(), brokerId);

}

Broker 状态变更为从节点。其关键实现如下:

  • 设置 brokerId,如果broker的id为0,则设置为1,这里在使用的时候,注意规划好集群内节点的 brokerId。

  • 设置 broker 的状态为 BrokerRole.SLAVE。

  • 如果是从节点,则关闭定时调度线程(处理 RocketMQ 延迟队列),如果是主节点,则启动该线程。

  • 关闭事务状态回查处理器。

  • 从节点需要启动元数据同步处理器,即启动 SlaveSynchronize 定时从主服务器同步元数据。

  • 立即向集群内所有的 nameserver 告知 broker 信息状态的变更。

1.5 changeToMaster

BrokerController#changeToMaster

public void changeToMaster(BrokerRole role) {

if (role == BrokerRole.SLAVE) {

return;

}

log.info(“Begin to change to master brokerName={}”, brokerConfig.getBrokerName());

//handle the slave synchronise

handleSlaveSynchronize(role); // @1

//handle the scheduled service

try {

this.messageStore.handleScheduleMessageService(role); // @2

} catch (Throwable t) {

log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);

}

//ha

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

ndle the transactional service

try {

this.startProcessorByHa(BrokerRole.SYNC_MASTER); // @3

} catch (Throwable t) {

log.error("[MONITOR] startProcessorByHa failed when changing to master", t);

}

//if the operations above are totally successful, we change to master

brokerConfig.setBrokerId(0); //TO DO check // @4

messageStoreConfig.setBrokerRole(role);

try {

this.registerBrokerAll(true, true, brokerConfig.isForceRegister()); // @5

} catch (Throwable ignored) {

}

log.info(“Finish to change to master brokerName={}”, brokerConfig.getBrokerName());

}

该方法是 Broker 角色从从节点变更为主节点的处理逻辑,其实现要点如下:

  • 关闭元数据同步器,因为主节点无需同步。

  • 开启定时任务处理线程。

  • 开启事务状态回查处理线程。

  • 设置 brokerId 为 0。

  • 向 nameserver 立即发送心跳包以便告知 broker 服务器当前最新的状态。

主从节点状态变更的核心方法就介绍到这里了,接下来看看如何触发主从切换。

2、如何触发主从切换


从前面的文章我们可以得知,RocketMQ DLedger 是基于 raft 协议实现的,在该协议中就实现了主节点的选举与主节点失效后集群会自动进行重新选举,经过协商投票产生新的主节点,从而实现高可用。

BrokerController#initialize

if (messageStoreConfig.isEnableDLegerCommitLog()) {

DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);

((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);

}

上述代码片段截取自 BrokerController 的 initialize 方法,我们可以得知在 Broker 启动时,如果开启了 多副本机制,即 enableDLedgerCommitLog 参数设置为 true,会为 集群节点选主器添加 roleChangeHandler 事件处理器,即节点发送变更后的事件处理器。

接下来我们将重点探讨 DLedgerRoleChangeHandler 。

2.1 类图

DLedgerRoleChangeHandler 继承自 RoleChangeHandler,即节点状态发生变更后的事件处理器。上述的属性都很简单,在这里就重点介绍一下 ExecutorService executorService,事件处理线程池,但只会开启一个线程,故事件将一个一个按顺序执行。

接下来我们来重点看一下 handle 方法的执行。

2.2 handle 主从状态切换处理逻辑

DLedgerRoleChangeHandler#handle

public void handle(long term, MemberState.Role role) {

Runnable runnable = new Runnable() {

public void run() {

long start = System.currentTimeMillis();

try {

boolean succ = true;

log.info(“Begin handling broker role change term={} role={} currStoreRole={}”, term, role, messageStore.getMessageStoreConfig().getBrokerRole());

switch (role) {

case CANDIDATE: // @1

if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {

brokerController.changeToSlave(dLedgerCommitLog.getId());

}

break;

case FOLLOWER: // @2

brokerController.changeToSlave(dLedgerCommitLog.getId());

break;

源码分析 RocketMQ DLedger 多副本即主从切换实现原理,java开发数据库面试题相关推荐

  1. 源码分析 RocketMQ DLedger 多副本存储实现,泛微网络java面试题

    前言 俗话说"生于忧患,死于安乐",其实大部分中年危机,就是在安乐中产生的. 有的人或许会反驳,"照你这么说,我还必须奋斗了,不奋斗就要死,难道选择安逸的生活就不对吗?我 ...

  2. 源码分析RocketMQ ACL实现机制

    有关RocketMQ ACL的使用请查看上一篇<RocketMQ ACL使用指南>,本文从源码的角度,分析一下RocketMQ ACL的实现原理. 备注:RocketMQ在4.4.0时引入 ...

  3. 从源码分析RocketMQ系列-消息拉取PullMessageProcessor详解

    导语   在之前的分析中分析了关于SendMessageProcessor,并且提供了对应的源码分析分析对于消息持久化的问题,下面来看另外一个PullMessageProcessor,在RocketM ...

  4. 从源码分析RocketMQ系列-RocketMQ消息持久化源码详解

    导语   在上篇分析中,提到了一个概念处理器,并且在进入到最终NettyIO的时候看到了一个Pair的对象,这个对象存储了两个对象,一个是执行器,一个是处理器,在进入Runable对象的时候看到封装到 ...

  5. 从源码分析RocketMQ系列-Remoting通信架构源码详解

    导语   这篇博客要从官方给出的一张图开始说起,之前的分析我们都是简单的分析了一下消息传递的流程,以及消息传递流程过程中出现的一些类的封装,并且提出,所有的封装操作都是为了更加高效的服务于NameSe ...

  6. 【转】ABP源码分析三十五:ABP中动态WebAPI原理解析

    动态WebAPI应该算是ABP中最Magic的功能之一了吧.开发人员无须定义继承自ApiController的类,只须重用Application Service中的类就可以对外提供WebAPI的功能, ...

  7. 源码分析 There is no getter for property named '*' in 'class java.lang.String

    2019独角兽企业重金招聘Python工程师标准>>> There is no getter for property named '*' in 'class java.lang.S ...

  8. 从源码分析RocketMQ系列-MQClientInstance类详解

    导语   在之前的分析中,看到有一个类MQClientInstance,这个无论是在Producer端还是在Consumer端都是很重要的一个类,很多的功能都是从这个类发起的,这边分享中就来详细的看看 ...

  9. 从源码分析RocketMQ系列-Producer的SendResult来自哪里?

    导语   对于消息中间件大家都应该不陌生,现在比较主流的消息中间件有Kafka.RabbitMQ.RocketMQ.ActiveMQ等等.前段时间花了很长时间分析了关于RocketMQ源码,之前也分享 ...

最新文章

  1. 好骚气的树状数组的解释
  2. halcon 旋转_HALCON高级篇:3D相机标定(2/3)
  3. 十七、“秦时山洞汉时水,水长山高不止息。”(2021.5.17)
  4. python实验三答案_20194123 实验三《Python程序设计》实验报告
  5. 万字长文!java读取json文件数据给对象
  6. Ubuntu下安装配置JDK1.7
  7. linux那些事之 page translation(硬件篇)
  8. 嵌套点击事件只执行第一次
  9. layer.open中使用时间控件laydate失败不显示的解决方案
  10. Android ASM插桩探索及实战
  11. Spring整合FreeMarker生成静态页面(静态模板)
  12. 基于Android手机近距离感应器的俯卧撑计数器
  13. 导师对计算机学生论文的评语,导师对论文的学术评语
  14. 基于Verilog HDL的数字秒表
  15. Ubuntu Core 将支持物联网 Matter
  16. 谈谈eve-ng仿真模器
  17. 华为云弹性公网IP,如何解决现代企业的网络IP烦恼
  18. 微信公众平台群发规则说明
  19. Windows 10 1909(19H2) 精简纯净版PE系统
  20. Python中ArcPy实现对大量长时间序列栅格遥感影像批量逐像元求取像素平均值

热门文章

  1. Outlook2013邮件中的链接打不开提示组织策略阻止
  2. 阿里云免费服务器申请与基本必要配置
  3. 如何将PDF中的黑色字体改成红色
  4. python限制输入类型_《计算机二级Python语言程序设计考试》第3章:基本数据类型...
  5. 电子合同系统,1秒钟识破各种合同调包计
  6. Javaagent使用指南
  7. 木色世界android版本,我的木筏世界手游下载-我的木筏世界安卓版下载v1.0-魅卓网...
  8. Python 打包工具 pyinstaller
  9. 更换MAC笔记本电池
  10. ScrollView小拓展