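To understand how Nacos heartbeats work, you first need to understand how Nacos service registration works. It helps to read this article first: https://blog.csdn.net/LiaoHongHB/article/details/103993074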

When a service registers with Nacos, NacosServiceRegistry calls its register() method. That method delegates the actual registration logic to namingService.registerInstance():

```java
@Override
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
    String serviceId = registration.getServiceId();
    Instance instance = new Instance();
    instance.setIp(registration.getHost());
    instance.setPort(registration.getPort());
    instance.setWeight(nacosDiscoveryProperties.getWeight());
    instance.setClusterName(nacosDiscoveryProperties.getClusterName());
    instance.setMetadata(registration.getMetadata());
    try {
        namingService.registerInstance(serviceId, instance);
        log.info("nacos registry, {} {}:{} register finished", serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                registration.toString(), e);
    }
}
```

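NacosNamingService implements the NamingService interface. Its registerInstance() method does two things. The first, for ephemeral instances only, is to assemble a BeatInfo heartbeat packet and start sending heartbeats by handing it to beatReactor.addBeatInfo():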

```java
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
```
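The ephemeral branch above can be modelled in a few lines. In this sketch `SketchNamingClient`, `SketchInstance` and `SketchBeatInfo` are hypothetical stand-ins for the Nacos classes, the map key is made up for illustration, and the `group@@service` format of the grouped name is an assumption based on what NamingUtils.getGroupedName() returns in Nacos 1.x as far as I know:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for Nacos' Instance.
final class SketchInstance {
    final String ip;
    final int port;
    final boolean ephemeral;

    SketchInstance(String ip, int port, boolean ephemeral) {
        this.ip = ip;
        this.port = port;
        this.ephemeral = ephemeral;
    }
}

// Hypothetical, simplified stand-in for Nacos' BeatInfo.
final class SketchBeatInfo {
    final String serviceName; // grouped name, e.g. "DEFAULT_GROUP@@order-service"
    final String ip;
    final int port;
    volatile boolean scheduled; // starts false, like beatInfo.setScheduled(false)

    SketchBeatInfo(String serviceName, String ip, int port) {
        this.serviceName = serviceName;
        this.ip = ip;
        this.port = port;
    }
}

final class SketchNamingClient {
    // Assumption: "@@" separates the group name from the service name.
    static String groupedName(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    // Heartbeats waiting to be picked up by the beat scheduler (hypothetical key format).
    final Map<String, SketchBeatInfo> pendingBeats = new ConcurrentHashMap<>();
    // Instances "sent" to the server; stands in for serverProxy.registerService().
    final Map<String, SketchInstance> registered = new ConcurrentHashMap<>();

    void registerInstance(String serviceName, String groupName, SketchInstance instance) {
        String grouped = groupedName(serviceName, groupName);
        String key = grouped + "#" + instance.ip + ":" + instance.port;
        if (instance.ephemeral) {
            // Only ephemeral instances keep themselves alive with client heartbeats.
            pendingBeats.put(key, new SketchBeatInfo(grouped, instance.ip, instance.port));
        }
        // Registration itself happens for ephemeral and persistent instances alike.
        registered.put(key, instance);
    }
}
```

Registering one ephemeral and one persistent instance therefore yields two registrations but only one pending heartbeat.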

The NacosNamingService constructor calls init(), and init() creates a BeatReactor, which runs the heartbeat threads.

The NacosNamingService constructor and init() method:

```java
public NacosNamingService(Properties properties) {
    this.init(properties);
}

private void init(Properties properties) {
    this.serverList = properties.getProperty("serverAddr");
    this.initNamespace(properties);
    this.initEndpoint(properties);
    this.initWebRootContext();
    this.initCacheDir();
    this.initLogName(properties);
    this.eventDispatcher = new EventDispatcher();
    this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList);
    this.serverProxy.setProperties(properties);
    // Creates the heartbeat threads
    this.beatReactor = new BeatReactor(this.serverProxy, this.initClientBeatThreadCount(properties));
    this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, this.cacheDir,
            this.isLoadCacheAtStart(properties), this.initPollingThreadCount(properties));
}
```

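The BeatReactor constructor creates a ScheduledExecutorService and schedules a BeatReactor.BeatProcessor task on it. BeatProcessor, in turn, schedules a BeatTask for each heartbeat, and BeatTask calls sendBeat() with the BeatInfo as its argument.

BeatReactor constructor: creates the scheduled executor and runs BeatProcessor on it: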

```java
public BeatReactor(NamingProxy serverProxy, int threadCount) {
    this.clientBeatInterval = 5000L;
    this.dom2Beat = new ConcurrentHashMap();
    this.serverProxy = serverProxy;
    // Create a scheduled executor and run BeatProcessor on it
    this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        }
    });
    this.executorService.schedule(new BeatReactor.BeatProcessor(), 0L, TimeUnit.MILLISECONDS);
}
```
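BeatReactor and BeatProcessor together form a self-rescheduling loop: the task re-arms itself in a finally block, so every run uses the current value of clientBeatInterval, and the executor threads are daemons. A minimal sketch of that pattern using only java.util.concurrent (the class `SelfReschedulingLoop` and the thread name are hypothetical):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SelfReschedulingLoop {
    final ScheduledExecutorService executor;
    volatile long intervalMillis; // may change at runtime, like clientBeatInterval
    private final Runnable work;

    SelfReschedulingLoop(int threadCount, long intervalMillis, Runnable work) {
        this.intervalMillis = intervalMillis;
        this.work = work;
        this.executor = new ScheduledThreadPoolExecutor(threadCount, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            t.setName("sketch.beat.sender");
            return t;
        });
    }

    void start() {
        executor.schedule(this::tick, 0L, TimeUnit.MILLISECONDS);
    }

    private void tick() {
        try {
            work.run();
        } catch (RuntimeException e) {
            // Swallow so one failure does not stop the loop (the original logs it).
            System.err.println("tick failed: " + e);
        } finally {
            // Re-arm with the *current* interval, so interval updates take effect next time.
            if (!executor.isShutdown()) {
                executor.schedule(this::tick, intervalMillis, TimeUnit.MILLISECONDS);
            }
        }
    }
}
```

A plain scheduleWithFixedDelay() could not pick up a new interval without being cancelled and re-created, which is one plausible reason for rescheduling by hand.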

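BeatProcessor.run(): schedules a BeatTask for every BeatInfo that is not already scheduled, then reschedules itself after clientBeatInterval: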

```java
public void run() {
    try {
        Iterator var1 = BeatReactor.this.dom2Beat.entrySet().iterator();
        while (var1.hasNext()) {
            Entry<String, BeatInfo> entry = (Entry) var1.next();
            BeatInfo beatInfo = (BeatInfo) entry.getValue();
            if (!beatInfo.isScheduled()) {
                beatInfo.setScheduled(true);
                // Schedule a BeatTask for this heartbeat
                BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS);
            }
        }
    } catch (Exception var7) {
        LogUtils.NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", var7);
    } finally {
        BeatReactor.this.executorService.schedule(this, BeatReactor.this.clientBeatInterval, TimeUnit.MILLISECONDS);
    }
}
```

BeatTask.run(): calls sendBeat(), clears the scheduled flag, and adopts the interval returned by the server if it is positive:

```java
class BeatTask implements Runnable {
    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    public void run() {
        // Send the heartbeat
        long result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo);
        this.beatInfo.setScheduled(false);
        if (result > 0L) {
            BeatReactor.this.clientBeatInterval = result;
        }
    }
}
```
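The interplay between BeatProcessor and BeatTask boils down to a `scheduled` flag and an interval the server can override. A minimal sketch of that logic follows; `BeatLoopSketch` and its members are hypothetical names, the actual sending is abstracted into a function, and the executor is injected so the behaviour can be checked synchronously:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.ToLongFunction;

final class BeatLoopSketch {
    // Hypothetical stand-in for BeatInfo: a key plus the "scheduled" flag.
    static final class Beat {
        final String key;
        volatile boolean scheduled;

        Beat(String key) {
            this.key = key;
        }
    }

    final Map<String, Beat> beats = new ConcurrentHashMap<>();
    // Starts at 5 s like BeatReactor; the server may override it.
    volatile long clientBeatInterval = 5000L;
    private final Executor executor;
    // Sends one beat and returns the server's suggested interval, or 0 on failure.
    // Like sendBeat(), it is expected to catch its own exceptions.
    private final ToLongFunction<Beat> sender;

    BeatLoopSketch(Executor executor, ToLongFunction<Beat> sender) {
        this.executor = executor;
        this.sender = sender;
    }

    // One BeatProcessor pass. Only this pass sets the flag to true and only
    // the task sets it back to false, so a beat is never dispatched twice at once.
    void processOnce() {
        for (Beat beat : beats.values()) {
            if (!beat.scheduled) {
                beat.scheduled = true;
                executor.execute(() -> runBeatTask(beat));
            }
        }
    }

    // Body of one BeatTask.
    private void runBeatTask(Beat beat) {
        long result = sender.applyAsLong(beat);
        beat.scheduled = false; // eligible again on the next pass
        if (result > 0L) {
            clientBeatInterval = result; // adopt the interval returned by the server
        }
    }
}
```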

sendBeat() makes an HTTP PUT request to the /instance/beat endpoint, which InstanceController.beat() handles on the server to acknowledge the heartbeat:

```java
public long sendBeat(BeatInfo beatInfo) {
    try {
        LogUtils.NAMING_LOGGER.info("[BEAT] {} sending beat to server: {}", this.namespaceId, beatInfo.toString());
        Map<String, String> params = new HashMap(4);
        params.put("beat", JSON.toJSONString(beatInfo));
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", beatInfo.getServiceName());
        // Remote HTTP call
        String result = this.reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, (String) "PUT");
        JSONObject jsonObject = JSON.parseObject(result);
        if (jsonObject != null) {
            return jsonObject.getLong("clientBeatInterval").longValue();
        }
    } catch (Exception var5) {
        LogUtils.NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: " + JSON.toJSONString(beatInfo), var5);
    }
    return 0L;
}
```
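sendBeat() boils down to a PUT with three parameters, then reading clientBeatInterval from the JSON reply and falling back to 0. Below is a hedged sketch using the JDK's java.net.http.HttpRequest builder (Java 11+); it only builds the request and does not send it. The "/nacos/v1/ns" value for NACOS_URL_BASE, sending the parameters as a query string, and the regex used in place of a JSON library are all assumptions of the sketch, and `BeatRequestSketch` is a hypothetical name:

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class BeatRequestSketch {
    // Assumption: NACOS_URL_BASE resolves to "/nacos/v1/ns".
    static final String BEAT_PATH = "/nacos/v1/ns/instance/beat";

    static HttpRequest buildBeatRequest(String server, String namespaceId, String serviceName, String beatJson) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("beat", beatJson);
        params.put("namespaceId", namespaceId);
        params.put("serviceName", serviceName);
        // URL-encode each value so JSON and "@@" survive the query string.
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return HttpRequest.newBuilder(URI.create("http://" + server + BEAT_PATH + "?" + query))
                .method("PUT", HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Regex stand-in for a JSON parser: pulls out "clientBeatInterval".
    private static final Pattern INTERVAL = Pattern.compile("\"clientBeatInterval\"\\s*:\\s*(\\d+)");

    // Returns the server's interval, or 0 meaning "keep the current interval".
    static long parseClientBeatInterval(String responseBody) {
        if (responseBody == null) {
            return 0L;
        }
        Matcher m = INTERVAL.matcher(responseBody);
        return m.find() ? Long.parseLong(m.group(1)) : 0L;
    }
}
```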

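InstanceController.beat()

InstanceController.beat() calls service.processClientBeat(clientBeat). That method passes a ClientBeatProcessor to HealthCheckReactor.scheduleNow(), which runs it immediately on the health-check executor. ClientBeatProcessor finds the matching instance by IP and port, calls setLastBeat() to record the current time as the instance's last heartbeat, and, if the instance is not manually marked and was unhealthy, sets it back to healthy and pushes the change to subscribers: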

InstanceController.beat():

```java
service.processClientBeat(clientBeat);
```

service.processClientBeat():

```java
public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    // Run the ClientBeatProcessor task
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
```

HealthCheckReactor.scheduleNow():

```java
public static ScheduledFuture<?> scheduleNow(Runnable task) {
    return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
}
```

ClientBeatProcessor.run():

```java
public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = service.getClusterMap().get(clusterName);
    List<Instance> instances = cluster.allIPs(true);
    for (Instance instance : instances) {
        // Find the instance by ip + port
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // Record the time of this heartbeat
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    instance.setHealthy(true);
                    Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                            cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
```
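At its core, ClientBeatProcessor is a lookup by IP and port followed by two updates. A simplified model (hypothetical `ServerInstanceSketch` and `ClientBeatRefreshSketch` classes, with the clock passed in so it can be tested) might look like this; the returned flag marks the case where Nacos would call getPushService().serviceChanged():

```java
import java.util.List;

// Hypothetical, simplified server-side instance.
final class ServerInstanceSketch {
    final String ip;
    final int port;
    final boolean marked; // manually marked instances are not flipped by heartbeats
    volatile long lastBeat;
    volatile boolean healthy = true;

    ServerInstanceSketch(String ip, int port, boolean marked) {
        this.ip = ip;
        this.port = port;
        this.marked = marked;
    }
}

final class ClientBeatRefreshSketch {
    // Returns true if some instance went from unhealthy to healthy,
    // i.e. subscribers would need to be notified.
    static boolean refresh(List<ServerInstanceSketch> clusterInstances, String ip, int port, long nowMillis) {
        boolean becameHealthy = false;
        for (ServerInstanceSketch inst : clusterInstances) {
            if (inst.ip.equals(ip) && inst.port == port) {
                inst.lastBeat = nowMillis; // setLastBeat(...)
                if (!inst.marked && !inst.healthy) {
                    inst.healthy = true;
                    becameHealthy = true;
                }
            }
        }
        return becameHealthy;
    }
}
```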

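That completes the process by which Nacos sends heartbeats.

Next, let's look at how Nacos periodically uses these heartbeats to decide whether an instance is still alive.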

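As mentioned above, namingService.registerInstance() does two things, the first being to assemble the BeatInfo heartbeat packet and start sending heartbeats.

The second is to register the instance with Nacos, also over HTTP; the request is handled by InstanceController.register():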

```java
@PostMapping
public String register(HttpServletRequest request) throws Exception {
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
    return "ok";
}
```

This method calls serviceManager.registerInstance(), whose logic is as follows:

```java
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create the Service object if it does not exist yet
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // Add the instance to the in-memory service
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
```

It first creates the Service object (if absent) and keeps it in memory, then adds the instance to it. Creating the Service works like this:

```java
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    createServiceIfAbsent(namespaceId, serviceName, local, null);
}

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
```
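createServiceIfAbsent() is essentially "create once, then initialize", keyed by namespace and service name, and initialization is what starts the heartbeat check. A minimal sketch of that idea, assuming a two-level in-memory map (namespace -> service name -> service) and hypothetical names throughout. Unlike the get-then-put sequence in the code above, it uses ConcurrentHashMap.computeIfAbsent, which applies the creating function at most once per key; that swap is a choice made for the sketch:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ServiceRegistrySketch {
    // Hypothetical, simplified Service.
    static final class SketchService {
        final String namespaceId;
        final String name;
        final long lastModifiedMillis;

        SketchService(String namespaceId, String name, long lastModifiedMillis) {
            this.namespaceId = namespaceId;
            this.name = name;
            this.lastModifiedMillis = lastModifiedMillis;
        }
    }

    // namespaceId -> (serviceName -> service)
    final ConcurrentMap<String, ConcurrentMap<String, SketchService>> serviceMap = new ConcurrentHashMap<>();
    // Counts init() calls; in Nacos, init() schedules the ClientBeatCheckTask.
    final AtomicInteger initCount = new AtomicInteger();

    SketchService createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, name -> {
                    SketchService s = new SketchService(namespaceId, name, System.currentTimeMillis());
                    initCount.incrementAndGet(); // stands in for service.init()
                    return s;
                });
    }
}
```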

After the Service object is created, putServiceAndInit() is called:

```java
private void putServiceAndInit(Service service) throws NacosException {
    putService(service);
    service.init();
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
}
```

The key part is service.init():

```java
public void init() {
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
```

This method schedules a ClientBeatCheckTask through HealthCheckReactor.scheduleCheck(clientBeatCheckTask). Here is scheduleCheck():

```java
public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.putIfAbsent(task.taskKey(), EXECUTOR.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
```
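One detail of scheduleCheck() is worth knowing: Java evaluates method arguments before the call, so in `futureMap.putIfAbsent(key, EXECUTOR.scheduleWithFixedDelay(...))` the task is scheduled even when the key is already present; putIfAbsent only decides which future gets stored. Whether this matters in practice depends on whether scheduleCheck() can be reached twice for the same task key, which this article does not examine. The sketch below (hypothetical class, plain java.util.concurrent) contrasts that shape with a computeIfAbsent variant that schedules only when the key is absent:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class HealthCheckSchedulerSketch {
    final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });
    final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    // Same shape as scheduleCheck(): the task is scheduled BEFORE putIfAbsent runs,
    // so a second call with the same key still queues a second periodic task.
    void scheduleCheckEager(String taskKey, Runnable task) {
        futureMap.putIfAbsent(taskKey,
                executor.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }

    // Alternative: the scheduling call only runs when the key is absent.
    void scheduleCheckOnce(String taskKey, Runnable task) {
        futureMap.computeIfAbsent(taskKey,
                k -> executor.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }
}
```

With the 5 s initial delay, nothing runs during a quick check, so the executor's queue size shows how many periodic tasks were created: two for the eager version, one for the computeIfAbsent version.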

So scheduleCheck() starts a periodic task: ClientBeatCheckTask runs for the first time after 5 s and then again 5 s after each run finishes. Here is ClientBeatCheckTask.run():

```java
@Override
public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
                                UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
                deleteIP(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
```

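The task returns early unless this server node is responsible for the service (getDistroMapper().responsible()) and health checking is enabled. Otherwise, ClientBeatCheckTask does two things:

First, it iterates over all instances and checks whether the time since each instance's last heartbeat exceeds the heartbeat timeout (getInstanceHeartBeatTimeOut()). If it does, and the instance is not manually marked, the instance's healthy flag is set to false and subscribers are notified.

Second, if instance expiry is enabled (isExpireInstance()), it iterates over the instances again and deletes from memory every unmarked instance whose last heartbeat is older than the delete timeout (getIpDeleteTimeout()).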
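The two passes can be modelled as a pure function over a list of instances with an injected clock. The sketch assumes a 15 s heartbeat timeout and a 30 s delete timeout; to my knowledge these are the Nacos 1.x defaults behind getInstanceHeartBeatTimeOut() and getIpDeleteTimeout(), but treat them as assumptions. All class names are hypothetical, and the push and event side effects are omitted:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class BeatCheckSketch {
    // Hypothetical, simplified instance.
    static final class Inst {
        final String id;
        final boolean marked;
        long lastBeat;
        boolean healthy = true;

        Inst(String id, long lastBeat, boolean marked) {
            this.id = id;
            this.lastBeat = lastBeat;
            this.marked = marked;
        }
    }

    // Assumed defaults: unhealthy after 15 s of silence, removed after 30 s.
    static final long HEART_BEAT_TIMEOUT_MS = 15_000L;
    static final long IP_DELETE_TIMEOUT_MS = 30_000L;

    // One check pass at time "now"; returns the ids removed in this pass.
    static List<String> checkOnce(List<Inst> instances, long now, boolean expireInstance) {
        // Pass 1: mark silent, unmarked instances as unhealthy.
        for (Inst inst : instances) {
            if (now - inst.lastBeat > HEART_BEAT_TIMEOUT_MS && !inst.marked && inst.healthy) {
                inst.healthy = false; // Nacos would also push the change and publish an event
            }
        }
        List<String> removed = new ArrayList<>();
        if (!expireInstance) {
            return removed; // corresponds to isExpireInstance() == false
        }
        // Pass 2: delete unmarked instances silent for longer than the delete timeout.
        Iterator<Inst> it = instances.iterator();
        while (it.hasNext()) {
            Inst inst = it.next();
            if (!inst.marked && now - inst.lastBeat > IP_DELETE_TIMEOUT_MS) {
                it.remove();
                removed.add(inst.id);
            }
        }
        return removed;
    }
}
```

Under these assumed defaults, an instance that stops beating becomes unhealthy after a little over 15 s and is removed after a little over 30 s; because the check only runs every 5 s, each transition may be noticed up to about 5 s later.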

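Note that when InstanceController.beat() receives a heartbeat for an instance that does not exist, it creates the instance automatically by calling the same serviceManager.registerInstance() that InstanceController.register() uses. This is therefore another entry point that starts the periodic heartbeat check: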

```java
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
if (instance == null) {
    if (clientBeat == null) {
        result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
        return result;
    }
    instance = new Instance();
    instance.setPort(clientBeat.getPort());
    instance.setIp(clientBeat.getIp());
    instance.setWeight(clientBeat.getWeight());
    instance.setMetadata(clientBeat.getMetadata());
    instance.setClusterName(clusterName);
    instance.setServiceName(serviceName);
    instance.setInstanceId(instance.getInstanceId());
    instance.setEphemeral(clientBeat.isEphemeral());
    serviceManager.registerInstance(namespaceId, serviceName, instance);
}
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
    throw new NacosException(NacosException.SERVER_ERROR,
            "service not found: " + serviceName + "@" + namespaceId);
}
if (clientBeat == null) {
    clientBeat = new RsInfo();
    clientBeat.setIp(ip);
    clientBeat.setPort(port);
    clientBeat.setCluster(clusterName);
}
service.processClientBeat(clientBeat);
result.put(CommonParams.CODE, NamingResponseCode.OK);
result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
```
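The fallback path in beat() reduces to a small decision: an unknown instance with beat data is re-registered and then refreshed, while an unknown instance without beat data gets RESOURCE_NOT_FOUND. This covers, for example, a client that keeps beating after ClientBeatCheckTask has deleted its instance. A sketch with hypothetical names, keyed only by "ip:port" for simplicity (the real code also uses namespace, service and cluster):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class BeatHandlerSketch {
    enum Result { OK, RESOURCE_NOT_FOUND }

    // "ip:port" -> last beat time; a present key means the instance is registered.
    final Map<String, Long> instances = new ConcurrentHashMap<>();
    // Counts re-registrations triggered by beats (registerInstance in Nacos).
    final AtomicInteger autoRegistered = new AtomicInteger();

    // hasBeatBody mirrors "clientBeat != null" in the controller code.
    Result handleBeat(String ip, int port, boolean hasBeatBody, long nowMillis) {
        String key = ip + ":" + port;
        if (!instances.containsKey(key)) {
            if (!hasBeatBody) {
                // Unknown instance and no beat data to rebuild it from.
                return Result.RESOURCE_NOT_FOUND;
            }
            autoRegistered.incrementAndGet(); // re-register from the beat data
        }
        // processClientBeat: refresh the last-beat time.
        instances.put(key, nowMillis);
        return Result.OK;
    }
}
```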
