本次解析的版本基于Nacos 1.1.0

功能说明

ServerListManager是nacos naming server中list核心管理类。在启动时会执行一次本地信息到其他服务器,while(true)发生改变的service队列内容进行本地更新,同时启动对com.alibaba.nacos.naming.domains.meta.前缀的key的监听;在运行过程中,执行controller接受的相关请求的功能,完成本地状态和远程服务器同步。

继承体系

首先看下整体结构,有一个大致印象

具体刨析

主要总启动时执行和运行时执行的功能来分析。

启动分析

启动时的执行依赖于注解@PostConstruct的init()方法

    @PostConstructpublic void init() {GlobalExecutor.registerServerListUpdater(new ServerListUpdater());GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 5000);}

第一行代码解析:代码执行的功能是注册serverList的更新器,仍然是由GlobalExecutor来注册调度服务

public static void registerServerListUpdater(Runnable runnable) {executorService.scheduleAtFixedRate(runnable, 0, NACOS_SERVER_LIST_REFRESH_INTERVAL, TimeUnit.MILLISECONDS);}

其调度周期是5ms,即private static final long NACOS_SERVER_LIST_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(5);

其具体的执行线程由ServerListUpdater实例来执行,来看下入口代码:

   public class ServerListUpdater implements Runnable {@Overridepublic void run() {try {List<Server> refreshedServers = refreshServerList();List<Server> oldServers = servers;if (CollectionUtils.isEmpty(refreshedServers)) {Loggers.RAFT.warn("refresh server list failed, ignore it.");return;}boolean changed = false;List<Server> newServers = (List<Server>) CollectionUtils.subtract(refreshedServers, oldServers);if (CollectionUtils.isNotEmpty(newServers)) {servers.addAll(newServers);changed = true;Loggers.RAFT.info("server list is updated, new: {} servers: {}", newServers.size(), newServers);}List<Server> deadServers = (List<Server>) CollectionUtils.subtract(oldServers, refreshedServers);if (CollectionUtils.isNotEmpty(deadServers)) {servers.removeAll(deadServers);changed = true;Loggers.RAFT.info("server list is updated, dead: {}, servers: {}", deadServers.size(), deadServers);}if (changed) {notifyListeners();}} catch (Exception e) {Loggers.RAFT.info("error while updating server list.", e);}}}

代码比较长逐步进行分析:首先refreshServerList()刷新来获取最新的服务列表,refreshServerList()具体逻辑是,首先本段是否单机启动模式,如果是直接将本地server和端口封装后返回;如果不是单机模式启动,会首先执行serverList = readClusterConf()方法,该方法会依据配置文件的路径,如下,读取配置文件

private static String getClusterConfFilePath() {return NACOS_HOME + File.separator + "conf" + File.separator + "cluster.conf";
}

如果从配置文件获取到的为空的;则尝试从serverList = SystemUtils.getIPsBySystemEnv(UtilsAndCommons.SELF_SERVICE_CLUSTER_ENV) ,即从启动的环境变量中进行获取。最后组装成server的list进行返回,从中可以看出,server cluster的配置依赖于3处,1,启动模式; 2,配置文件 ; 3,环境变量(启动参数)。在集群模式下,配置文件是最主要的方式,因为是定时扫描所以,可以看出更新配置文件后不需要重新启动即可加入集群。

到此,就完成了refreshServerList()刷新最新的服务列表。

下面执行刷新到的serverList和上次的old servers的比较。

1, 获取新的serverList与old servers的差集,也即获得新加入的new servers,如果不为空,则加入到servers并置位changed = true。

2,获取old servers 与 新的serverList的差集,也即获得已经删掉的或者死去的server,并并从servers中删除并置位changed = true。

如果,最终servers发生了改变,则调用notifyListeners()发起通知。其具体调用逻辑如下:

private void notifyListeners() {GlobalExecutor.notifyServerListChange(new Runnable() {@Overridepublic void run() {for (ServerChangeListener listener : listeners) {listener.onChangeServerList(servers);listener.onChangeHealthyServerList(healthyServers);}}});}
public static void notifyServerListChange(Runnable runnable) {notifyServerListExecutor.submit(runnable);}

可见,执行器仍然是GlobalExecutor,其方法是notifyServerListChange提交任务,是一个匿名线程来进行执行。

该方法会遍历List<ServerChangeListener>实例listeners,并分别将servers,healthyServers为参数触发onChangeServerList,onChangeHealthyServerList方法执行响应的逻辑。

ServerChangeListener 接口定义如下,仅有这2个方法。

/*** Nacos cluster member change event listener** @author nkorange* @since 1.0.0*/
public interface ServerChangeListener {/*** If member list changed, this method is invoked.** @param servers servers after change*/void onChangeServerList(List<Server> servers);/*** If reachable member list changed, this method is invoked.** @param healthyServer reachable servers after change*/void onChangeHealthyServerList(List<Server> healthyServer);
}

在nacos中有2个实现:

第二行代码解析:代码执行的功能是注册server status的上报器,仍然是由GlobalExecutor来注册调度服务

public static void registerServerStatusReporter(Runnable runnable, long delay) {SERVER_STATUS_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);}

这个是仅执行1次调度,调度的延迟是5000ms。

由ServerStatusReporter的线程来执行具体的逻辑,执行入口如下:

 private class ServerStatusReporter implements Runnable {@Overridepublic void run() {try {if (RunningConfig.getServerPort() <= 0) {return;}for (String key : distroConfig.keySet()) {for (Server server : distroConfig.get(key)) {server.setAlive(System.currentTimeMillis() - server.getLastRefTime() < switchDomain.getDistroServerExpiredMillis());}}int weight = Runtime.getRuntime().availableProcessors() / 2;if (weight <= 0) {weight = 1;}long curTime = System.currentTimeMillis();String status = LOCALHOST_SITE + "#" + NetUtils.localServer() + "#" + curTime + "#" + weight + "\r\n";//send status to itselfonReceiveServerStatus(status);List<Server> allServers = getServers();if (!contains(NetUtils.localServer())) {Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}", NetUtils.localServer(), allServers);return;}if (allServers.size() > 0 && !NetUtils.localServer().contains(UtilsAndCommons.LOCAL_HOST_IP)) {for (com.alibaba.nacos.naming.cluster.servers.Server server : allServers) {if (server.getKey().equals(NetUtils.localServer())) {continue;}Message msg = new Message();msg.setData(status);synchronizer.send(server.getKey(), msg);}}} catch (Exception e) {Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);} finally {GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());}}}

代码比较多,但是逻辑比较简单,首先判断本地server端口是否有效,再根据当前时间与上次时间戳的差值是否与指定阈值distroServerExpiredMillis 的进行大小比较,来判定server的有效性并进行赋值,其中distroServerExpiredMillis 值是30ms

/*** The server is regarded as expired if its two reporting interval is lagger than this variable.*/private long distroServerExpiredMillis = TimeUnit.SECONDS.toMillis(30);

继续往下执行,根据int weight = Runtime.getRuntime().availableProcessors() / 2;确定服务器的权重,即,根据服务器的性能判定改server的权重,这还是比较巧妙地方法。

继续执行,按照一定格式组装status,并首先发送给自己(本服务器)

long curTime = System.currentTimeMillis();
String status = LOCALHOST_SITE + "#" + NetUtils.localServer() + "#" + curTime + "#" + weight + "\r\n";
//send status to itself
onReceiveServerStatus(status);

其中,private final static String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE; public static final String UNKNOWN_SITE = "unknown";即LOCALHOST_SITE=“unkown”。

onReceiveServerStatus(status)逻辑相对复杂,会放在一篇单独的文章中进行讲解

最后,将status封装成Message通过synchronizer.send(server.getKey(), msg);发送到其他的server。

这里synchronizer是private Synchronizer synchronizer = new ServerStatusSynchronizer();即ServerStatusSynchronizer的send方法。进一步看下,send的实现逻辑。

public void send(final String serverIP, Message msg) {if (StringUtils.isEmpty(serverIP)) {return;}final Map<String, String> params = new HashMap<String, String>(2);params.put("serverStatus", msg.getData());String url = "http://" + serverIP + ":" + RunningConfig.getServerPort()+ RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator/server/status";if (serverIP.contains(UtilsAndCommons.IP_PORT_SPLITER)) {url = "http://" + serverIP + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT+ "/operator/server/status";}try {HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler() {@Overridepublic Integer onCompleted(Response response) throws Exception {if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}",serverIP);return 1;}return 0;}});} catch (Exception e) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}", serverIP, e);}}

可见最终是调用的asyncHttpGet方法发送出去的,也就是异步http的get方法,其发送path是RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator/server/status",其中RunningConfig.getContextPath()默认是/nacos,即/nacos/v1/ns/operator/server/status。从path的第4节path可以看出,该请求会被OperatorController类接收。

Nacos系列3---源码刨析naming服务的server列表核心管理类ServerListManager相关推荐

  1. Nacos系列5---源码刨析naming服务的核心开关类SwitchDomain刨析

    本次解析的版本基于Nacos 1.1.0 功能说明 SwitchDomain是整个naming的核心开关配置类,几乎所有关键的开关配置信息都在该类中定义. 继承体系 首先看下整体结构,有一个大致印象 ...

  2. spring源码刨析总结

    spring源码刨析笔记 1.概述 spring就是 spring Framework Ioc Inversion of Control(控制反转/反转控制) DI Dependancy Inject ...

  3. Metis异常检测算法率值检测和量值检测源码刨析

    Metis异常检测算法率值检测和量值检测源码刨析 1. 测试代码 2. 率值检测 2.1 rate_predict方法(detect.py) 2.2 predict方法(statistic.py) 2 ...

  4. springMvc源码刨析笔记

    springMvc源码刨析笔记 MVC 全名是 Model View Controller,是 模型(model)-视图(view)-控制器(controller) 的缩写, 是⼀种⽤于设计创建 We ...

  5. zookeeper笔记+源码刨析

    会不断更新!冲冲冲!跳转连接 https://blog.csdn.net/qq_35349982/category_10317485.html zookeeper 1.介绍 Zookeeper 分布式 ...

  6. MapReduce源码刨析

    MapReduce编程刨析: Map map函数是对一些独立元素组成的概念列表(如单词计数中每行数据形成的列表)的每一个元素进行指定的操作(如把每行数据拆分成不同单词,并把每个单词计数为1),用户可以 ...

  7. mybatis源码刨析总结

    拉勾 mybatis 初始化 1.创建git仓库 1.新建一个目录 然后点击右键 git base here 创建git (会弹出一个窗口) 2.初始化 再窗口输入 git init 3.指定仓库 g ...

  8. dubbo笔记+源码刨析

    会不断更新!冲冲冲!跳转连接 https://blog.csdn.net/qq_35349982/category_10317485.html dubbo笔记 1.概念 RPC全称为remote pr ...

  9. JsonRpc源码--处理http请求源码刨析

    从jsonRpc接入http请求直至开始业务逻辑处理总体层级如下: JsonServiceExporter->handleRequest-> handle -> handleRequ ...

最新文章

  1. java使用类似ini文件IniProperties的类
  2. 05 MapReduce应用案例02
  3. Struts2原理图
  4. 如何对一个对象进行深拷贝
  5. 【报告分享】2021年95后医美人群洞察报告:颜值经济,95后“美力”来袭.pdf(附下载链接)...
  6. 基于SSH的电子政务系统(附论文)
  7. 手机内存垃圾不会清理?学会删除这几个文件夹,瞬间腾出几个G
  8. 【Linux修炼】开篇
  9. 基于Cesium的火箭发射演示
  10. sendgrid html text,在Node.js中的SendGrid的“发件人”字段中添加名称
  11. nginx 域名解析
  12. 水库安全监测自动化系统解决方案
  13. 已解决:H5移动端网页实现录音功能,js实现录音功能,包括安卓webview接口也可以使用
  14. ETL(大数据)测试实战篇(二)
  15. 接触角测量案例分享及问题解答(二)
  16. bp神经网络预测模型优点,什么是BP神经网络模型?
  17. Siemens PLC S7-1500 AES 加,解密算法
  18. 将matlab文件写成csv格式
  19. BIM室内装饰应用到底有多牛?Revit插件告诉你!
  20. 你知道吗,自信是你成功的第一秘诀

热门文章

  1. 智慧城市总体解决方案和建设思路
  2. 有关路径规划入门的学习记录
  3. 高效程序员秘籍(1):使用AutoHotKey快速切换窗口
  4. 基于 Webpack4 搭建 Vue 开发环境
  5. MySQL引擎——TokuDB与RocksDB
  6. 转:在VS2008中调用Matlab的m文件
  7. C语言 编写倒计时程序
  8. 王佩丰excel2010基础教程学习笔记(第一讲到第五讲)
  9. Excel如何把事务型数据转化为可以直接关联性分析数据
  10. 【Java编程】遗传算法简介与简单二进制编码计算