引言

当新的节点加入集群或者集群中有节点下线了,集群之间可以通过健康检查发现。健康检查的频率是怎么样的?节点的状态又是如何变动的?状态的变动又会触发什么动作。带着这些问题本文捋一捋。

一、内容提要

内容提要

健康检查

  • Nacos节点会向集群其他节点发送健康检查心跳,每一轮频率为2秒

  • 当健康检查异常时设置为不信任「SUSPICIOUS」状态,超过失败最大次数3次设置为下线「DOWN」状态

  • 健康检查成功设置该节点为科通信「UP」状态

  • 无论成功还是失败当节点状态变更时均发布MembersChangeEvent事件

成员变更事件

  • 当集群节点成员变更时,MemberChangeListener会收到该事件

  • 例如回调ClusterRpcClientProxy#onEvent触发refresh

  • 刷新本节点与集群中其他节点的RPC状态,关闭无效的或者增加新的RPC连接

二、健康检查

代码翻到ServerMemberManager#onApplicationEvent,在Nacos启动的时候会启动一个定时任务,第一次延迟5秒执行,该定时任务即负责节点之间的心跳。

@Override
public void onApplicationEvent(WebServerInitializedEvent event) {getSelf().setState(NodeState.UP);if (!EnvUtil.getStandaloneMode()) { // 注解@1GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);}EnvUtil.setPort(event.getWebServer().getPort());EnvUtil.setLocalAddress(this.localAddress);Loggers.CLUSTER.info("This node is ready to provide external services");
}

注解@1 非单机模式延迟5秒执行,执行的infoReportTask为MemberInfoReportTask。

public abstract class Task implements Runnable {protected volatile boolean shutdown = false;@Overridepublic void run() { // 注解@2if (shutdown) {return;}try {executeBody();} catch (Throwable t) {Loggers.CORE.error("this task execute has error : {}", ExceptionUtil.getStackTrace(t));} finally {if (!shutdown) {after();}}}}

注解@2 看下这个Task执行逻辑,先执行 executeBody(),执行结束后执行after()。

class MemberInfoReportTask extends Task {private final GenericType<RestResult<String>> reference = new GenericType<RestResult<String>>() {};private int cursor = 0;@Overrideprotected void executeBody() {// ----------注解@1 start---------------// 获取集群中除了自身以外的其他节点列表List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();if (members.isEmpty()) {return;}// 定义一个游标this.cursor = (this.cursor + 1) % members.size();// 获取每个节信息Member target = members.get(cursor);//-----------注解@1 end-----------------Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());// 注解@2final String url = HttpUtils.buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT,"/cluster/report");try {// 注解@3asyncRestTemplate.post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version),Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() { @Overridepublic void onReceive(RestResult<String> result) { // 注解@4// 注解@5 返回版本不一致if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()|| result.getCode() == HttpStatus.NOT_FOUND.value()) {// ...Member memberNew = target.copy();if (memberNew.getAbilities() != null&& memberNew.getAbilities().getRemoteAbility() != null && memberNew.getAbilities().getRemoteAbility().isSupportRemoteConnection()) {memberNew.getAbilities().getRemoteAbility().setSupportRemoteConnection(false);update(memberNew); // 更新节点属性}return;}// 注解@6if (result.ok()) {MemberUtil.onSuccess(ServerMemberManager.this, target);} else {// 注解@7 处理失败上报MemberUtil.onFail(ServerMemberManager.this, target);}}@Overridepublic void onError(Throwable throwable) {// 注解@8 处理失败上报MemberUtil.onFail(ServerMemberManager.this, target, throwable);}@Overridepublic void onCancel() {}});} catch (Throwable ex) {// ...}}@Overrideprotected void after() {GlobalExecutor.scheduleByCommon(this, 2_000L); // 注解@9}
}

注解@1 获取集群中除了自身以外的其他节点列表,通过游标循环每个节点。

注解@2 构造每个节点的上报url请求路径为「/cluster/report」

注解@3 发起Post健康检查请求,请求内容为自身信息Member

注解@4 处理健康检查返回结果,有以下三种类型

注解@5 版本过低错误,这个可能在集群中版本不一致出现

注解@6 处理成功上报,更新该节点member的状态为UP表示科通信,设置失败次数为0,并发布成员变更事件

public static void onSuccess(final ServerMemberManager manager, final Member member) {final NodeState old = member.getState();manager.getMemberAddressInfos().add(member.getAddress());member.setState(NodeState.UP); // 状态为UP可通信状态member.setFailAccessCnt(0); // 失败次数为0if (!Objects.equals(old, member.getState())) {manager.notifyMemberChange(); // 发布成员变更事件}
}

注解@7&注解@8 均为处理失败的上报,例如:集群中一个节点被kill -9 杀掉后。在nacos-cluster.log日志文件中会打印如下日志,并发布成员变更事件

2021-07-0x 16:30:24,994 ERROR failed to report new info to target node : x.x.x.x:8848, error : caused: Connection refused;:2021-07-0x 16:30:30,995 ERROR failed to report new info to target node : x.x.x.x:8848, error : caused: Connection refused;
public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {manager.getMemberAddressInfos().remove(member.getAddress());final NodeState old = member.getState();// 设置该节点为「不信任」member.setState(NodeState.SUSPICIOUS);// 失败次数递增+1member.setFailAccessCnt(member.getFailAccessCnt() + 1);// 默认最大失败重试次数为3int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);// If the number of consecutive failures to access the target node reaches// a maximum, or the link request is rejected, the state is directly down// 超过重试次数设置节点状态为「下线」if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils.containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {member.setState(NodeState.DOWN);}if (!Objects.equals(old, member.getState())) {manager.notifyMemberChange(); // 发布成员变更事件}
}

被kill -9 杀掉的节点显示状态为下线DOWN

注解@9 执行完executeBody后延迟2秒继续执行executeBody,也就是检查健康检查的心跳频率为2秒,一轮全部节点检查结束后延迟2秒接着下一轮

无论检查成功还是失败,当节点状态变更时,发布成员变更事件。

if (!Objects.equals(old, member.getState())) {manager.notifyMemberChange();
}void notifyMemberChange() {NotifyCenter.publishEvent(MembersChangeEvent.builder().members(allMembers()).build());
}

小结: Nacos节点会向集群其他节点发送健康检查心跳,每一轮频率为2秒;当健康检查异常时设置为不信任「SUSPICIOUS」状态,超过失败最大次数3次设置为下线「DOWN」状态;健康检查成功设置该节点为科通信「UP」状态;无论成功还是失败当节点状态变更时均发布MembersChangeEvent事件。

三、成员变更事件

当集群中有节点下线或者新节点上线都会通过心跳健康检查探测对节点状态进行改变。 而状态的变更均会触发成员变更事件MembersChangeEvent。 那订阅到这个事件干啥呢?

ClusterRpcClientProxy继承了MemberChangeListener,当有MembersChangeEvent事件时会回调其onEvent方法。

@Override
public void onEvent(MembersChangeEvent event) {try {List<Member> members = serverMemberManager.allMembersWithoutSelf();refresh(members);} catch (NacosException e) {// ...}
}

那接着看refresh方法。

private void refresh(List<Member> members) throws NacosException {for (Member member : members) {if (MemberUtil.isSupportedLongCon(member)) {// 注解@10createRpcClientAndStart(member, ConnectionType.GRPC);}}Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries();Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();List<String> newMemberKeys = members.stream().filter(a -> MemberUtil.isSupportedLongCon(a)).map(a -> memberClientKey(a)).collect(Collectors.toList());// 关闭旧的grpc连接while (iterator.hasNext()) {Map.Entry<String, RpcClient> next1 = iterator.next();if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey());RpcClientFactory.getClient(next1.getKey()).shutdown();iterator.remove();}}}

注解@10 为集群中每个节点member创建rcp client,在client启动时会先目标节点发送HealthCheckRequest,如果非健康节点将会被移除。见RpcClient类部分代码。

boolean isHealthy = healthCheck();
// 非健康节点
if (!isHealthy) {if (currentConnection == null) {continue;}LoggerUtils.printIfInfoEnabled(LOGGER,"[{}]Server healthy check fail,currentConnection={}", name,currentConnection.getConnectionId());// 标记客户端状态为unhealthyrpcClientStatus.set(RpcClientStatus.UNHEALTHY);// 重置ReconnectContext移除serverInforeconnectContext = new ReconnectContext(null, false);

这个意味着如果集群中有节点下线,与下线节点的rpc将会失效;同样如果集群中有新节点加入将会建立新的rpc通道。

小结: 当集群节点成员变更时,MemberChangeListener会收到该事件。例如回调ClusterRpcClientProxy#onEvent触发refresh。刷新本节点与集群中其他节点的RPC状态,关闭无效的或者增加新的RPC连接。

Nacos8# 集群中节点之间健康检查相关推荐

  1. HADOOP集群中节点之间的配置文件可否不一样?

    环境: 台式机的hostname是:Desktop 笔记本的hostname是:Laptop #---------------------------------------------------- ...

  2. 容器编排技术 -- Kubernetes 重新配置活动集群中节点的 Kubelet

    容器编排技术 -- Kubernetes 重新配置活动集群中节点的 Kubelet 1 Before you begin 2 重新配置集群活动节点上的 Kubelet 2.1 基本工作流程概览 2.2 ...

  3. Hadoop学习笔记—13.分布式集群中节点的动态添加与下架

    Hadoop学习笔记-13.分布式集群中节点的动态添加与下架 开篇:在本笔记系列的第一篇中,我们介绍了如何搭建伪分布与分布模式的Hadoop集群.现在,我们来了解一下在一个Hadoop分布式集群中,如 ...

  4. pycharm中的settings没有latest version_k8s集群中pod镜像版本检查

    概述 version-checker用于观察k8s集群中运行的镜像的当前版本以及上游可用的最新版本.这些检查作为Prometheus指标公开,可以在gara fana上查看. 该工具目前处于实验阶段. ...

  5. 为什么zookeeper集群中节点配置个数是奇数个?

    Zookeeper的大部分操作都是通过选举产生的.比如,标记一个写是否成功是要在超过一半节点发送写请求成功时才认为有效.同样,Zookeeper选择领导者节点也是在超过一半节点同意时才有效.最后,Zo ...

  6. 如何调试Kubernetes集群中的网络延迟问题

    本文深入研究和解决了 Kubernetes 平台上的服务零星延迟问题,就在不久前我也遇到了类似的问题,看似是玄学事件,刚开始归结于网络链路抖动,一段时间后依然存在,虽然影响都是 P99.99 以后的数 ...

  7. 调试Kubernetes集群中的网络停顿问题

    调试Kubernetes集群中的网络停顿问题 在过去几年,Kubernetes在GitHub已经成为标准的部署模式.目前在GitHub,我们在Kubernetes上运行着海量的面向内部团队以及面向C端 ...

  8. OpenShift - 5分钟自动替换 OpenShift 集群故障节点(附视频)

    <OpenShift / RHEL / DevSecOps 汇总目录> 文本已在OpenShift 4.10环境中进行验证. 文章目录 场景说明 配置 Machine 健康检查 验证自动修 ...

  9. Redis集群添加节点

    Redis集群添加节点 1:首先把需要添加的节点启动 cd /usr/local/cluster/ mkdir 7006 cp /usr/local/cluster/redis.conf  /usr/ ...

最新文章

  1. 安卓程序添加指纹解锁功能
  2. python中编完类后到实例编写_[零基础学python]编写类之一创建实例
  3. python:实现Django简单的网页设计
  4. 完全二叉树的JAVA实现(以及非递归遍历方法)
  5. 用delphi操作mapinfo
  6. python使曲线变得平滑_如何在Python中平滑数据?
  7. SAP UI5 bindProperty
  8. Visual Studio Code 1.0正式发布
  9. Axure原型分类及存在意义
  10. 2018年中小学计算机培训,2018年中小学网管员培训心得体会
  11. 【清华大学陈渝】第一章 操作系统概述
  12. 【数据结构】人名查询哈希表设计(线性探测法)
  13. 怎么以最新汇率牌价计算XX美元相当于多少人民币
  14. c语言满屏爱心,微信聊天可以发满屏动态爱心了 个性又浪漫!
  15. 《工程伦理与学术道德》第二章习题
  16. OPEN3D学习笔记(四)——Global registration
  17. python scrapy 爬取妹子图的照片
  18. 市值超660亿,居然之家成功借壳上市
  19. Excel的检验数据的标准(数据验证-数据的有效性)
  20. 当了一次人肉SMT贴片机后的感悟

热门文章

  1. Linux vi 保存/退出命令
  2. html清除图片上下间距,css - 三种方法解决LI和内部Img的上下间距问题
  3. 数据库事务的四大特性(ACID)
  4. 06、HTML文件和第一个网页
  5. 大吉大利 今晚吃鸡之跑毒篇
  6. win7,如何把文件夹和文件,视图改成平铺,默认是列表
  7. iframe打开base64格式的PDF显示空白
  8. VirtualBox虚拟网络环境配置【两台虚拟机互通】
  9. React关于 this.props.children 总结
  10. Fastjson 1.2.68版本Autotype绕过技巧