Contents

1. Introduction to the Distro protocol

2. Design principles

3. The DistroProtocol class

3.1. DistroComponentHolder

3.2. DistroTaskEngineHolder

3.3. Starting the Distro protocol

4. ClientManager

5. The Distro verify task in detail

5.1. The getVerifyData method

5.1.1. DistroDataStorageImpl (v1)

5.1.2. DistroClientDataProcessor (v2)

5.2. DistroVerifyExecuteTask

5.3. Processing verify data

5.3.1. How v1 verifies data

5.3.2. How v2 verifies data

5.3.3. v2 verification: ConnectionBasedClient

5.3.4. v2 verification: ephemeral IpPortBasedClient

5.4. Removing ConnectionBasedClient and ephemeral IpPortBasedClient

5.4.1. Removing a ConnectionBasedClient

5.4.2. Removing an ephemeral IpPortBasedClient

6. Summary


1. Introduction to the Distro protocol

The Distro protocol is an AP-type distributed protocol developed by the Nacos community. It is designed for ephemeral instances and guarantees that the ephemeral-instance subsystem keeps working even when some Nacos nodes go down. Its data lives in memory; each node performs a full data sync at startup and runs periodic data verification afterwards.

2. Design principles

  1. Every Nacos node is a peer: each can handle write requests and then syncs the new data to the other nodes.
  2. Each node is responsible for only a portion of the data, and periodically sends checksums of its portion to the other nodes to keep the data consistent.
  3. Each node serves read requests independently, responding immediately from its local data.

3. The DistroProtocol class

The concrete implementation of the Distro protocol is the DistroProtocol class. Let's look at its most important fields.

    // server member manager
    private final ServerMemberManager memberManager;
    // holder for Distro components, used to register/look up the individual components
    private final DistroComponentHolder distroComponentHolder;
    // holder for the Distro task engines
    private final DistroTaskEngineHolder distroTaskEngineHolder;

In cluster mode, memberManager mainly provides server addresses and related information needed for remote calls.

Next, let's look at the other two fields.

3.1. DistroComponentHolder

@Component
public class DistroComponentHolder {
    // transport agents for each type of DistroData
    private final Map<String, DistroTransportAgent> transportAgentMap = new HashMap<>();
    // data storages for each type of DistroData
    private final Map<String, DistroDataStorage> dataStorageMap = new HashMap<>();
    // failed-task handlers for each Distro data type
    private final Map<String, DistroFailedTaskHandler> failedTaskHandlerMap = new HashMap<>();
    // data processors for each type of DistroData
    private final Map<String, DistroDataProcessor> dataProcessorMap = new HashMap<>();
    // ... registration methods omitted
}

Now that we know these fields, let's see where the components are registered.

There are two registration paths: DistroClientComponentRegistry and DistroHttpRegistry. Their registration methods look like this:

// DistroClientComponentRegistry
@PostConstruct
public void doRegister() {
    // create the Distro data processor
    DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol,
            upgradeJudgement);
    // create the Distro transport agent
    DistroTransportAgent transportAgent = new DistroClientTransportAgent(clusterRpcClientProxy,
            serverMemberManager);
    // create the failure handler, which mainly enqueues retry tasks for failed syncs
    DistroClientTaskFailedHandler taskFailedHandler = new DistroClientTaskFailedHandler(taskEngineHolder);
    // register the data storage for the Nacos:Naming:v2:ClientData type
    componentHolder.registerDataStorage(DistroClientDataProcessor.TYPE, dataProcessor);
    // register the DistroData processor for the Nacos:Naming:v2:ClientData type
    componentHolder.registerDataProcessor(dataProcessor);
    // register the transport agent for the Nacos:Naming:v2:ClientData type
    componentHolder.registerTransportAgent(DistroClientDataProcessor.TYPE, transportAgent);
    // register the failed-task handler for the Nacos:Naming:v2:ClientData type
    componentHolder.registerFailedTaskHandler(DistroClientDataProcessor.TYPE, taskFailedHandler);
}

// DistroHttpRegistry
@PostConstruct
public void doRegister() {
    // register the Distro data storage
    componentHolder.registerDataStorage(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
            new DistroDataStorageImpl(dataStore, distroMapper));
    // register the http transport agent
    componentHolder.registerTransportAgent(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpAgent(memberManager));
    // create the failure handler, which mainly enqueues retry tasks for failed syncs
    componentHolder.registerFailedTaskHandler(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
            new DistroHttpCombinedKeyTaskFailedHandler(taskEngineHolder));
    // register the delay-task processor for Distro's http sync
    taskEngineHolder.registerNacosTaskProcessor(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
            new DistroHttpDelayTaskProcessor(globalConfig, taskEngineHolder));
    // register the data processor
    componentHolder.registerDataProcessor(consistencyService);
}

These two registries correspond to Nacos v1 and v2 respectively; what each registered component does will be explained in the feature walkthroughs below.

3.2. DistroTaskEngineHolder

@Component
public class DistroTaskEngineHolder {

    private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();

    private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();

    public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
        // set the default task processor for the delay-task engine
        DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
        delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
    }

    public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
        return delayTaskExecuteEngine;
    }

    public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
        return executeWorkersManager;
    }

    public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
        this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
    }
}

This class holds a delay-task execute engine and an immediate execute engine; the constructor sets a default task processor on the delay engine. Since DistroHttpRegistry registers its own task processor, the v1 path does not use the default one. (The execute engines were analyzed in detail in the first article, so they are not covered again here.)

3.3. Starting the Distro protocol

The protocol starts from the startDistroTask method, which DistroProtocol's constructor invokes:

private void startDistroTask() {
    // in standalone mode there is nothing to sync: mark initialization as done and return
    if (EnvUtil.getStandaloneMode()) {
        isInitialized = true;
        return;
    }
    // start the verify task
    startVerifyTask();
    // start the load task that pulls a full snapshot of Distro data
    startLoadTask();
}

Before diving into the verify task, let's first look at ClientManager, which the previous article did not cover.

4. ClientManager

Let's start with the UML diagram.

A quick overview:

  • ConnectionBasedClientManager: manages ConnectionBasedClient, the client representing a long connection between a client and the server;
  • EphemeralIpPortClientManager: manages the IpPortBasedClient of ephemeral instances;
  • PersistentIpPortClientManager: manages the IpPortBasedClient of persistent instances;

ConnectionManager's registry method publishes a client-connected event as a notification. Because ConnectionBasedClientManager extends the ClientConnectionEventListener listener, its clientConnected method then runs, creating a Client object and caching it in ConnectionBasedClientManager's clients map, as shown below.

When a ConnectionBasedClientManager is created, its constructor also starts a scheduled task that checks whether clients have expired (only clients synced over via the Distro protocol are checked), as shown below.

The other two client managers work much the same way, except that they create their Client objects when an instance is registered, whereas ConnectionBasedClientManager creates them through the connection listener.

5. The Distro verify task in detail

private void startVerifyTask() {
    // schedule the DistroVerifyTimedTask periodically
    GlobalExecutor.schedulePartitionDataTimedSync(
            new DistroVerifyTimedTask(memberManager, distroComponentHolder,
                    distroTaskEngineHolder.getExecuteWorkersManager()),
            DistroConfig.getInstance().getVerifyIntervalMillis());
}

// DistroVerifyTimedTask implements Runnable
@Override
public void run() {
    try {
        List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("server list is: {}", targetServer);
        }
        // every data type must be verified against the other nodes
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            // verify the data held by this data storage
            verifyForDataStorage(each, targetServer);
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
    }
}

private void verifyForDataStorage(String type, List<Member> targetServer) {
    // look up the data storage for this type
    DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
    // skip storages that have not finished initializing
    // (the startup full pull sets finishInitial to true when it completes)
    if (!dataStorage.isFinishInitial()) {
        Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
                dataStorage.getClass().getSimpleName());
        return;
    }
    // collect the DistroData to verify
    List<DistroData> verifyData = dataStorage.getVerifyData();
    if (null == verifyData || verifyData.isEmpty()) {
        return;
    }
    for (Member member : targetServer) {
        // find the transport agent for this type: http or rpc
        DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
        if (null == agent) {
            continue;
        }
        // enqueue a DistroVerifyExecuteTask into the immediate execute engine
        executeTaskExecuteEngine.addTask(member.getAddress() + type,
                new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
    }
}

// DistroVerifyExecuteTask
@Override
public void run() {
    // iterate over the verify data
    for (DistroData each : verifyData) {
        try {
            // check whether the transport supports callbacks (http does not)
            if (transportAgent.supportCallbackTransport()) {
                // send verify data with a callback
                doSyncVerifyDataWithCallback(each);
            } else {
                // send verify data without a callback
                doSyncVerifyData(each);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e);
        }
    }
}

To summarize what the code above does:

  1. A scheduled task is started to verify data;
  2. Both the v1 and v2 data storages are iterated to produce the data to verify;
  3. The md5 checksum data is sent to the other nodes in the cluster for verification.
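For the v1 path, steps 2 and 3 boil down to building a key → md5 map of the locally owned data and shipping it to each peer. Below is a minimal sketch of that step, with plain java.util types standing in for Nacos' dataStore, distroMapper, and Datum (the key names and the ownership predicate are illustrative only):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Sketch of building v1 verify data: for every key this node is responsible
// for, record the md5 checksum of its value. The responsible predicate stands
// in for Nacos' distroMapper.responsible.
public class VerifyDataSketch {

    static String md5Hex(String value) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            StringBuilder sb = new StringBuilder();
            for (byte b : md.digest(value.getBytes(StandardCharsets.UTF_8))) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    static Map<String, String> buildKeyChecksums(Map<String, String> dataStore,
                                                 Predicate<String> responsible) {
        Map<String, String> keyChecksums = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : dataStore.entrySet()) {
            // only keys owned by this node are verified against the other nodes
            if (!responsible.test(entry.getKey())) {
                continue;
            }
            keyChecksums.put(entry.getKey(), md5Hex(entry.getValue()));
        }
        return keyChecksums;
    }

    public static void main(String[] args) {
        Map<String, String> dataStore = new LinkedHashMap<>();
        dataStore.put("ours##svcA", "{\"instances\":[\"10.0.0.5:8080\"]}");
        dataStore.put("theirs##svcB", "{\"instances\":[]}");
        // pretend this node owns only keys starting with "ours"
        Map<String, String> checksums = buildKeyChecksums(dataStore, key -> key.startsWith("ours"));
        System.out.println(checksums.keySet()); // [ours##svcA]
    }
}
```

Only the checksums travel over the wire; the receiver compares them against its own md5 values and fetches the real payload for any key that differs, which is exactly what section 5.3.1 walks through.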

5.1. The getVerifyData method

5.1.1. DistroDataStorageImpl (v1)

Let's start with the v1 implementation, provided by DistroDataStorageImpl.

@Override
public List<DistroData> getVerifyData() {
    // If upgrade to 2.0.X, do not verify for v1.
    // once the cluster has upgraded to 2.0, skip verification here, since all data is v2 from then on
    // TODO if one node adds v1 data and the upgrade happens before it gets verified, could that cause a problem?
    if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
        return Collections.emptyList();
    }
    Map<String, String> keyChecksums = new HashMap<>(64);
    for (String key : dataStore.keys()) {
        // skip keys this server is not responsible for
        if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
            continue;
        }
        Datum datum = dataStore.get(key);
        if (datum == null) {
            continue;
        }
        // record the md5 checksum of the value
        keyChecksums.put(key, datum.value.getChecksum());
    }
    if (keyChecksums.isEmpty()) {
        return Collections.emptyList();
    }
    DistroKey distroKey = new DistroKey(KeyBuilder.RESOURCE_KEY_CHECKSUM, KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
    DistroData data = new DistroData(distroKey, ApplicationUtils.getBean(Serializer.class).serialize(keyChecksums));
    data.setType(DataOperation.VERIFY);
    return Collections.singletonList(data);
}

The dataStore above is the object that actually stores the ephemeral instance data (covered in detail in the upcoming instance-registration article). The method also calls responsible, which looks like this:

public boolean responsible(String responsibleTag) {
    final List<String> servers = healthyList;
    // if Distro is disabled or we are in standalone (non-cluster) mode, return true:
    // this node must handle the service
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }
    // an empty healthy-node list means the distro config is not ready yet, so return false
    if (CollectionUtils.isEmpty(servers)) {
        // means distro config is not ready yet
        return false;
    }
    // see https://github.com/alibaba/nacos/issues/5902
    int index = servers.indexOf(EnvUtil.getLocalAddress());
    int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
    // if this node is not in the healthy server list (i.e. the local node is unhealthy),
    // all data needs to be handled and verified, so return true
    if (lastIndex < 0 || index < 0) {
        return true;
    }
    // hash the tag and take it modulo the healthy-node count to find the owning position
    int target = distroHash(responsibleTag) % servers.size();
    // if the target position falls outside [index, lastIndex], the service does not belong
    // to this node and does not need to be handled here
    return target >= index && target <= lastIndex;
}

The responsible method is what makes each server handle only a portion of the data; during verification it filters the keys down to just the ones this node itself owns.
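The ownership rule that responsible implements can be sketched outside of Nacos like this. The list handling and the hash-mod mapping mirror the method above; distroHash here is a simplified stand-in, and the server addresses are illustrative:

```java
import java.util.Arrays;
import java.util.List;

// Sketch (not the Nacos implementation) of Distro's partition rule: hash the
// service tag, take it modulo the healthy-server count, and check whether the
// resulting slot belongs to the local node.
public class DistroPartitionSketch {

    // simplified stand-in for Nacos' distroHash: a non-negative hash of the tag
    static int distroHash(String responsibleTag) {
        return Math.abs(responsibleTag.hashCode() % Integer.MAX_VALUE);
    }

    // returns true when localAddress owns responsibleTag
    static boolean responsible(List<String> healthyServers, String localAddress, String responsibleTag) {
        if (healthyServers.isEmpty()) {
            return false; // distro config not ready yet
        }
        int index = healthyServers.indexOf(localAddress);
        int lastIndex = healthyServers.lastIndexOf(localAddress);
        if (index < 0 || lastIndex < 0) {
            return true; // local node not in the healthy list: handle everything
        }
        int target = distroHash(responsibleTag) % healthyServers.size();
        return target >= index && target <= lastIndex;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        int owners = 0;
        for (String node : servers) {
            if (responsible(servers, node, "DEFAULT_GROUP@@demo-service")) {
                owners++;
            }
        }
        System.out.println(owners); // prints 1: each tag maps to exactly one owner
    }
}
```

Because every node computes the same hash over the same healthy list, they all agree, without any coordination, on which single node owns a given service.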

As the analysis shows, the verify data v1 produces is md5 checksums. Now let's look at v2.

5.1.2. DistroClientDataProcessor (v2)

@Override
public List<DistroData> getVerifyData() {
    List<DistroData> result = new LinkedList<>();
    // get all clientIds and iterate over the clients
    for (String each : clientManager.allClientId()) {
        Client client = clientManager.getClient(each);
        if (null == client || !client.isEphemeral()) {
            continue;
        }
        // the client here may also be a ConnectionBasedClient; it can never be a persistent IpPortBasedClient
        if (clientManager.isResponsibleClient(client)) {
            // TODO add revision for client.
            DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            DistroData data = new DistroData(distroKey,
                    ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
            data.setType(DataOperation.VERIFY);
            result.add(data);
        }
    }
    return result;
}

The code above is fairly straightforward, except for isResponsibleClient, which deserves a closer look:

// EphemeralIpPortClientManager
@Override
public boolean isResponsibleClient(Client client) {
    if (client instanceof IpPortBasedClient) {
        return distroMapper.responsible(((IpPortBasedClient) client).getResponsibleId());
    }
    return false;
}

// ConnectionBasedClientManager
@Override
public boolean isResponsibleClient(Client client) {
    return (client instanceof ConnectionBasedClient) && ((ConnectionBasedClient) client).isNative();
}

Of these two methods, the first handles ephemeral instances and, just as in v1, calls distroMapper.responsible to decide whether this node owns the tag. The second verifies ConnectionBasedClient objects across servers, something v1 did not have: it checks whether the ConnectionBasedClient was synced over from another node. isNative == true means it was not synced over, so this node is responsible for it.

With isResponsibleClient explained, the rest of the code just serializes the verify payload, much like v1; the difference is that v2's verify payload is the clientId, whereas v1's is the md5 checksum of the data stored in dataStore. That completes the analysis of getVerifyData.

5.2. DistroVerifyExecuteTask

Now that we have the verify data, the next step is sending it to the other servers. The actual sending is done by the transportAgent, which has two implementations: v1 verify data goes through DistroHttpAgent (http), and v2 through DistroClientTransportAgent (rpc).

@Override
public void run() {
    // send each piece of verify data
    for (DistroData each : verifyData) {
        try {
            // check whether the transport supports callbacks (http does not)
            if (transportAgent.supportCallbackTransport()) {
                // send verify data with a callback
                doSyncVerifyDataWithCallback(each);
            } else {
                // send verify data without a callback
                doSyncVerifyData(each);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e);
        }
    }
}

// the v1 path
@Override
public boolean syncVerifyData(DistroData verifyData, String targetServer) {
    if (!memberManager.hasMember(targetServer)) {
        return true;
    }
    // build and send the http request
    NamingProxy.syncCheckSums(verifyData.getContent(), targetServer);
    return true;
}

// the v2 path
@Override
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
    // check whether the target server still exists
    if (isNoExistTarget(targetServer)) {
        callback.onSuccess();
        return;
    }
    DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
    Member member = memberManager.find(targetServer);
    try {
        // create the callback wrapper
        DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
                verifyData.getDistroKey().getResourceKey(), callback, member);
        // send the rpc request
        clusterRpcClientProxy.asyncRequest(member, request, wrapper);
    } catch (NacosException nacosException) {
        callback.onFailed(nacosException);
    }
}

public void asyncRequest(Member member, Request request, RequestCallBack callBack) throws NacosException {
    RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
    if (client != null) {
        client.asyncRequest(request, callBack);
    } else {
        throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
    }
}

That is how the verify data is sent: v2 goes over rpc, i.e. via the rpcClient analyzed in the previous article. Next, let's look at how the verify data is received and processed.

5.3. Processing verify data

5.3.1. How v1 verifies data

// DistroController
@PutMapping("/checksum")
public ResponseEntity syncChecksum(@RequestParam String source, @RequestBody Map<String, String> dataMap) {
    DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(source), dataMap);
    distroProtocol.onVerify(distroHttpData, source);
    return ResponseEntity.ok("ok");
}

v1 sends the verify data over http; when the server receives it, it calls DistroProtocol's onVerify method:

public boolean onVerify(DistroData distroData, String sourceAddress) {
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(),
                distroData.getDistroKey());
    }
    String resourceType = distroData.getDistroKey().getResourceType();
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
        return false;
    }
    // dispatch to the data processor registered for this type
    return dataProcessor.processVerifyData(distroData, sourceAddress);
}

In v1, the data processor resolved here is DistroConsistencyServiceImpl:

@Override
public boolean processVerifyData(DistroData distroData, String sourceAddress) {
    // If upgrade to 2.0.X, do not verify for v1.
    if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
        return true;
    }
    DistroHttpData distroHttpData = (DistroHttpData) distroData;
    // deserialize the verify data
    Map<String, String> verifyData = (Map<String, String>) distroHttpData.getDeserializedContent();
    onReceiveChecksums(verifyData, sourceAddress);
    return true;
}

public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
    // check whether a sync for this server is already in progress
    if (syncChecksumTasks.containsKey(server)) {
        // Already in process of this server:
        Loggers.DISTRO.warn("sync checksum task already in process with {}", server);
        return;
    }
    syncChecksumTasks.put(server, "1");
    try {
        List<String> toUpdateKeys = new ArrayList<>();
        List<String> toRemoveKeys = new ArrayList<>();
        // iterate over the received checksums
        for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
            // receiving data this node itself owns should not happen, so simply abort
            // (the hash mapping can shift this way transiently when the server list changes)
            if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
                // this key should not be sent from remote server:
                Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
                // abort the procedure:
                return;
            }
            // if this node lacks the key, its value is null, or the local md5 differs from
            // the received one, add the key to the update list
            if (!dataStore.contains(entry.getKey()) || dataStore.get(entry.getKey()).value == null
                    || !dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
                toUpdateKeys.add(entry.getKey());
            }
        }
        for (String key : dataStore.keys()) {
            // skip keys not owned by the requesting server
            // (mapSrv tells which server a key belongs to; the local dataStore holds data owned by every server)
            if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
                continue;
            }
            // the key belongs to the requesting server but is absent from its checksums,
            // meaning it was deleted at the source, so add it to the remove list
            if (!checksumMap.containsKey(key)) {
                toRemoveKeys.add(key);
            }
        }
        Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);
        // delete the local entries marked for removal
        for (String key : toRemoveKeys) {
            onRemove(key);
        }
        if (toUpdateKeys.isEmpty()) {
            return;
        }
        try {
            DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, server);
            distroKey.getActualResourceTypes().addAll(toUpdateKeys);
            // only md5 values were received, so query the source for the real data behind these keys
            DistroData remoteData = distroProtocol.queryFromRemote(distroKey);
            if (null != remoteData) {
                // apply the data
                processData(remoteData.getContent());
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("get data from " + server + " failed!", e);
        }
    } finally {
        // Remove this 'in process' flag:
        syncChecksumTasks.remove(server);
    }
}

The important parts of the code above are annotated; in short it does two things:

  1. Compare the local data with the received checksums; any key that differs goes into the update list, and the real data behind those keys is then fetched from the source (only md5 values were received) and applied locally.
  2. Take the local keys owned by the sending server and compare them with the received checksums; any key missing from them has been deleted at the source, so it is deleted locally as well.
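The comparison step above can be sketched as follows; plain maps of key → md5 and an owner function stand in for Nacos' dataStore and distroMapper.mapSrv, and the key and server names are illustrative:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch of the v1 checksum diff: given the checksums received from a peer
// and the local key -> md5 map, compute which keys must be re-fetched
// (toUpdate) and which were deleted at the source (toRemove).
public class ChecksumDiffSketch {

    public static List<String> toUpdateKeys(Map<String, String> received, Map<String, String> local) {
        List<String> toUpdate = new ArrayList<>();
        for (Map.Entry<String, String> e : received.entrySet()) {
            String localChecksum = local.get(e.getKey());
            // missing locally, or checksum differs: fetch the real data afterwards
            if (localChecksum == null || !localChecksum.equals(e.getValue())) {
                toUpdate.add(e.getKey());
            }
        }
        return toUpdate;
    }

    public static List<String> toRemoveKeys(Map<String, String> received, Map<String, String> local,
                                            Function<String, String> ownerOf, String sourceServer) {
        List<String> toRemove = new ArrayList<>();
        for (String key : local.keySet()) {
            // only keys owned by the sending server are judged against its checksums
            if (ownerOf.apply(key).equals(sourceServer) && !received.containsKey(key)) {
                toRemove.add(key); // deleted at the source
            }
        }
        return toRemove;
    }

    public static void main(String[] args) {
        Map<String, String> local = new HashMap<>();
        local.put("svcA", "md5-1");
        local.put("svcB", "md5-2");
        local.put("svcC", "md5-3");
        Map<String, String> received = Map.of("svcA", "md5-1", "svcB", "md5-9");
        Function<String, String> ownerOf = key -> "peer:8848"; // every key owned by the peer in this sketch
        System.out.println(toUpdateKeys(received, local));                       // [svcB]: checksum changed
        System.out.println(toRemoveKeys(received, local, ownerOf, "peer:8848")); // [svcC]: deleted at source
    }
}
```

Note that sending only checksums keeps the periodic verify traffic small; full payloads cross the wire only for the keys that actually diverged.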

Here is processData, the method that applies the updates locally:

private boolean processData(byte[] data) throws Exception {
    if (data.length > 0) {
        // deserialize the data to add or update
        Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
        // first pass over the entries
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            // add or update the entry
            dataStore.put(entry.getKey(), entry.getValue());
            // a key with a listener means the service already exists; a key without one
            // means the service does not exist at all and must be created
            if (!listeners.containsKey(entry.getKey())) {
                // pretty sure the service not exist:
                // no listener yet, so check whether instances default to ephemeral
                if (switchDomain.isDefaultInstanceEphemeral()) {
                    // create empty service
                    Loggers.DISTRO.info("creating service {}", entry.getKey());
                    Service service = new Service();
                    String serviceName = KeyBuilder.getServiceName(entry.getKey());
                    String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                    service.setName(serviceName);
                    service.setNamespaceId(namespaceId);
                    service.setGroupName(Constants.DEFAULT_GROUP);
                    // now validate the service. if failed, exception will be thrown
                    service.setLastModifiedMillis(System.currentTimeMillis());
                    service.recalculateChecksum();
                    // The Listener corresponding to the key value must not be empty
                    RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
                    if (Objects.isNull(listener)) {
                        return false;
                    }
                    // initialize the service: create it if it did not exist before, otherwise update it
                    // (there is no instance information here yet, so there is nothing to update),
                    // and register the instance listeners and so on
                    listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                }
            }
        }
        // second pass: notify the listeners
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            if (!listeners.containsKey(entry.getKey())) {
                // Should not happen:
                Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                continue;
            }
            try {
                // every entry now has listeners, including the services just created above,
                // since creating them also registered their listeners
                for (RecordListener listener : listeners.get(entry.getKey())) {
                    // run each listener's onChange so that the Distro data is written through
                    // to core/Service, keeping the two consistent
                    listener.onChange(entry.getKey(), entry.getValue().value);
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                continue;
            }
            // Update data store if listener executed successfully:
            dataStore.put(entry.getKey(), entry.getValue());
        }
    }
    return true;
}

processData mainly does the following:

  1. Deserialize the payload to get the data that needs updating.
  2. Loop over the entries and put each one into the dataStore. If a key has no listener yet, the data is new, so an empty service is created: created when it did not exist before, updated otherwise (though there is no instance information to update at this point), and its instance listeners are registered.
  3. Loop over the entries again and, for each key, run the registered listeners (including those registered for the services newly created in the previous step), updating the data in core/Service so it matches the Distro data.
  4. Put the data into the Distro data store again.

The above touches Nacos v1's instance-registration logic, which will be covered properly in the instance-registration article, so it is only sketched here. Through this flow, both the Distro data and the core/Service data are verified and kept in sync across servers.

5.3.2. How v2 verifies data

v2 is somewhat more complex than v1. The entry point is the DistroDataRequestHandler class (see the rpc article for how the call reaches it). Its code is as follows:

public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
    try {
        switch (request.getDataOperation()) {
            case VERIFY:
                return handleVerify(request.getDistroData(), meta);
            case SNAPSHOT:
                return handleSnapshot();
            case ADD:
            case CHANGE:
            case DELETE:
                return handleSyncData(request.getDistroData());
            case QUERY:
                return handleQueryData(request.getDistroData());
            default:
                return new DistroDataResponse();
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
        DistroDataResponse result = new DistroDataResponse();
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("handle distro request with exception");
        return result;
    }
}

private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
    DistroDataResponse result = new DistroDataResponse();
    if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
        result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
    }
    return result;
}

public boolean onVerify(DistroData distroData, String sourceAddress) {
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(),
                distroData.getDistroKey());
    }
    String resourceType = distroData.getDistroKey().getResourceType();
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
        return false;
    }
    // dispatch to the data processor registered for this type
    return dataProcessor.processVerifyData(distroData, sourceAddress);
}

// DistroClientDataProcessor
@Override
public boolean processVerifyData(DistroData distroData, String sourceAddress) {
    DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class)
            .deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
    // verify the client identified by this clientId
    if (clientManager.verifyClient(verifyData.getClientId())) {
        return true;
    }
    Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
    return false;
}

@Override
public boolean verifyClient(String clientId) {
    return getClientManagerById(clientId).verifyClient(clientId);
}

private ClientManager getClientManagerById(String clientId) {
    if (isConnectionBasedClient(clientId)) {
        return connectionBasedClientManager;
    }
    return clientId.endsWith(SUFFIX) ? persistentIpPortClientManager : ephemeralIpPortClientManager;
}

private boolean isConnectionBasedClient(String clientId) {
    // ID_DELIMITER is a '#'
    return !clientId.contains(IpPortBasedClient.ID_DELIMITER);
}

Since v2's verify payload is the clientId, a delegation pattern is used here to route by clientId: a ConnectionBasedClient's clientId contains no '#', while an ephemeral IpPortBasedClient's clientId ends with '#true'.
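That routing decision can be sketched like this. The '#' delimiter and the '#true'/'#false' suffixes follow the convention described above; the manager names mirror Nacos', and everything else is simplified for illustration:

```java
// Sketch of the clientId-based routing: the shape of the id alone decides
// which client manager handles the verify request.
public class ClientIdRoutingSketch {

    static final String ID_DELIMITER = "#";
    static final String EPHEMERAL_SUFFIX = "true"; // ephemeral IpPortBasedClient ids end with "#true"

    enum ManagerKind { CONNECTION_BASED, EPHEMERAL_IP_PORT, PERSISTENT_IP_PORT }

    static ManagerKind route(String clientId) {
        // no '#' in the id: the client came in over a long connection (gRPC)
        if (!clientId.contains(ID_DELIMITER)) {
            return ManagerKind.CONNECTION_BASED;
        }
        // '#true' marks an ephemeral IpPortBasedClient, '#false' a persistent one
        return clientId.endsWith(ID_DELIMITER + EPHEMERAL_SUFFIX)
                ? ManagerKind.EPHEMERAL_IP_PORT
                : ManagerKind.PERSISTENT_IP_PORT;
    }

    public static void main(String[] args) {
        System.out.println(route("1628412350888_127.0.0.1_53495")); // CONNECTION_BASED
        System.out.println(route("127.0.0.1:8080#true"));           // EPHEMERAL_IP_PORT
        System.out.println(route("127.0.0.1:8080#false"));          // PERSISTENT_IP_PORT
    }
}
```

Encoding the client kind directly in the id keeps the verify payload down to a single string while still letting the receiver dispatch without extra metadata.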

Because v2 communicates over rpc, the server additionally maintains a connection-backed client object (ConnectionBasedClient) for each client, and these also need to be synced by Distro. Let's see how a ConnectionBasedClient gets synced.

5.3.3. v2 verification: ConnectionBasedClient

// ConnectionBasedClientManager
@Override
public boolean verifyClient(String clientId) {
    ConnectionBasedClient client = clients.get(clientId);
    // if the client exists, refresh its last valid-connection time
    if (null != client) {
        client.setLastRenewTime();
        return true;
    }
    // otherwise simply return false
    return false;
}

As you can see, when the client does not exist this simply returns false, and at first glance nothing syncs the client data over. The mechanism is fairly well hidden, so let's go back to the rpc call.

@Override
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
    // check whether the target server still exists
    if (isNoExistTarget(targetServer)) {
        callback.onSuccess();
        return;
    }
    DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
    Member member = memberManager.find(targetServer);
    try {
        // create the callback wrapper
        DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
                verifyData.getDistroKey().getResourceKey(), callback, member);
        // send the rpc request
        clusterRpcClientProxy.asyncRequest(member, request, wrapper);
    } catch (NacosException nacosException) {
        callback.onFailed(nacosException);
    }
}

Notice that the request carries a callback; here is its implementation:

// DistroVerifyCallbackWrapper
@Override
public void onResponse(Response response) {
    if (checkResponse(response)) {
        NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
        distroCallback.onSuccess();
    } else {
        Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
        // publish a verify-failed event
        NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));
        NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
        distroCallback.onFailed(null);
    }
}

When verification fails, a verify-failed event is published, with the ConnectionBasedClient's clientId set on the event. Now let's look at the listener for that event, DistroClientDataProcessor:

@Override
public void onEvent(Event event) {
    if (EnvUtil.getStandaloneMode()) {
        return;
    }
    if (!upgradeJudgement.isUseGrpcFeatures()) {
        return;
    }
    if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
        syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
    } else {
        syncToAllServer((ClientEvent) event);
    }
}

private void syncToVerifyFailedServer(ClientEvent.ClientVerifyFailedEvent event) {
    // check whether the client exists locally
    Client client = clientManager.getClient(event.getClientId());
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    // if it exists, start syncing it to the server that failed the verification
    DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
    // Verify failed data should be sync directly.
    distroProtocol.syncToTarget(distroKey, DataOperation.ADD, event.getTargetServer(), 0L);
}

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
            targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    // enqueue into the delay-task execute engine
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
    }
}

// in v2 this is handled by the default processor
@Override
public boolean process(NacosTask task) {
    // ignore non-delay tasks
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    // create a concrete task according to the operation type
    switch (distroDelayTask.getAction()) {
        case DELETE:
            DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
            return true;
        case CHANGE:
        case ADD:
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
            // enqueue into the immediate execute engine
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        default:
            return false;
    }
}

The code above uses the task execute engines to sync the client asynchronously.

Next, let's see how the sync itself works.

// AbstractDistroExecuteTask, the abstract base of DistroSyncChangeTask and DistroSyncDeleteTask
@Override
public void run() {
    // get the resource type of the data being processed
    String type = getDistroKey().getResourceType();
    // look up the transport agent for this type
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
    if (null == transportAgent) {
        Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
        return;
    }
    Loggers.DISTRO.info("[DISTRO-START] {}", toString());
    // check whether the agent supports callbacks
    if (transportAgent.supportCallbackTransport()) {
        doExecuteWithCallback(new DistroExecuteCallback());
    } else {
        executeDistroTask();
    }
}

// execute the task
@Override
protected void doExecuteWithCallback(DistroCallback callback) {
    String type = getDistroKey().getResourceType();
    DistroData distroData = getDistroData(type);
    if (null == distroData) {
        Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
        return;
    }
    getDistroComponentHolder().findTransportAgent(type)
            .syncData(distroData, getDistroKey().getTargetServer(), callback);
}

@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
    if (isNoExistTarget(targetServer)) {
        callback.onSuccess();
        return;
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    Member member = memberManager.find(targetServer);
    try {
        // send the request
        clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
    } catch (NacosException nacosException) {
        callback.onFailed(nacosException);
    }
}

This code builds the request object and sends an ADD request. That brings us back to the class that handles Distro rpc requests, which now receives the ADD:

@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
    try {
        switch (request.getDataOperation()) {
            case VERIFY:
                return handleVerify(request.getDistroData(), meta);
            case SNAPSHOT:
                return handleSnapshot();
            case ADD:
            case CHANGE:
            case DELETE:
                return handleSyncData(request.getDistroData());
            case QUERY:
                return handleQueryData(request.getDistroData());
            default:
                return new DistroDataResponse();
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
        DistroDataResponse result = new DistroDataResponse();
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("handle distro request with exception");
        return result;
    }
}

Unlike last time, the operation is ADD, so handleSyncData runs:

// DistroDataRequestHandler
private DistroDataResponse handleSyncData(DistroData distroData) {
    DistroDataResponse result = new DistroDataResponse();
    if (!distroProtocol.onReceive(distroData)) {
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("[DISTRO-FAILED] distro data handle failed");
    }
    return result;
}

public boolean onReceive(DistroData distroData) {
    Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
            distroData.getDistroKey());
    String resourceType = distroData.getDistroKey().getResourceType();
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
        return false;
    }
    return dataProcessor.processData(distroData);
}

public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // a ConnectionBasedClient effectively skips this, because instancePublishInfos is empty;
    // ephemeral clients need upgradeClient
    upgradeClient(client, clientSyncData);
}

In the code above, the upgradeClient logic is effectively a no-op for a ConnectionBasedClient, because the namespace list and the other fields of its ClientSyncData are empty collections, as the construction logic below shows:

// AbstractClient: building the ClientSyncData
@Override
public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        namespaces.add(entry.getKey().getNamespace());
        groupNames.add(entry.getKey().getGroup());
        serviceNames.add(entry.getKey().getName());
        instances.add(entry.getValue());
    }
    return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

Now for the key part, syncClientConnected. It chooses a client manager by clientId, using the routing logic shown earlier. Let's go straight into ConnectionBasedClientManager to see how the client is created:

@Override
public boolean syncClientConnected(String clientId, ClientAttributes attributes) {
    String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);
    ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
    return clientConnected(clientFactory.newSyncedClient(clientId, attributes));
}

@Override
public ConnectionBasedClient newSyncedClient(String clientId, ClientAttributes attributes) {
    return new ConnectionBasedClient(clientId, false);
}

@Override
public boolean clientConnected(final Client client) {
    clients.computeIfAbsent(client.getClientId(), s -> {
        Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
        return (ConnectionBasedClient) client;
    });
    return true;
}

Note that the client is created with isNative set to false, marking it as a client synced over from another node.

When clientManager was introduced, we mentioned that ConnectionBasedClientManager runs a scheduled task to check whether synced ConnectionBasedClients have expired. In the v2 verification flow, when a server receives a verify request for a clientId it holds, it refreshes that client's lastRenewTime. Putting the two together makes the design clear: each server manages the expiry of the ConnectionBasedClients synced to it from other servers, while the verify task on each server periodically sends its own local clientIds to the other servers to refresh the last renew time there, keeping those synced clients alive remotely.
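The interplay described above can be condensed into a small sketch (the class name, the 30-second window, and the connected flag are assumptions for illustration; the real rule lives in ConnectionBasedClient.isExpire): a native client lives and dies with its connection, while a synced client survives only as long as verify requests keep renewing it.

```java
// Sketch of the expiry rule for a ConnectionBasedClient (simplified, with
// assumed constants): isNative == true  -> expiry follows the connection;
//                     isNative == false -> expiry follows lastRenewTime.
public class SyncedClientExpiry {
    static final long EXPIRE_MILLIS = 30_000L; // assumed window for the sketch

    final boolean isNative;
    boolean connected = true;
    long lastRenewTime;

    SyncedClientExpiry(boolean isNative, long now) {
        this.isNative = isNative;
        this.lastRenewTime = now;
    }

    // Called when a verify request for this clientId arrives from its owner node.
    void renew(long now) {
        lastRenewTime = now;
    }

    boolean isExpire(long now) {
        return isNative ? !connected : now - lastRenewTime > EXPIRE_MILLIS;
    }

    public static void main(String[] args) {
        SyncedClientExpiry synced = new SyncedClientExpiry(false, 0L);
        System.out.println(synced.isExpire(60_000L));  // no verify renewed it: expired
        synced.renew(50_000L);
        System.out.println(synced.isExpire(60_000L));  // renewed recently: alive again
    }
}
```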

5.3.4. v2 verification internals: verifying an ephemeral IpPortBasedClient

Next, let's look at how ephemeral IpPortBasedClient instances are verified.

// Receiving the RPC verify request
@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
    try {
        switch (request.getDataOperation()) {
            case VERIFY:
                return handleVerify(request.getDistroData(), meta);
            case SNAPSHOT:
                return handleSnapshot();
            case ADD:
            case CHANGE:
            case DELETE:
                return handleSyncData(request.getDistroData());
            case QUERY:
                return handleQueryData(request.getDistroData());
            default:
                return new DistroDataResponse();
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
        DistroDataResponse result = new DistroDataResponse();
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("handle distro request with exception");
        return result;
    }
}

// ... intermediate steps identical to the previous flow omitted ...

@Override
public boolean verifyClient(String clientId) {
    IpPortBasedClient client = clients.get(clientId);
    if (null != client) {
        NamingExecuteTaskDispatcher.getInstance()
                .dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client));
        return true;
    }
    return false;
}

// ClientBeatUpdateTask
@Override
public void run() {
    long currentTime = System.currentTimeMillis();
    for (InstancePublishInfo each : client.getAllInstancePublishInfo()) {
        ((HealthCheckInstancePublishInfo) each).setLastHeartBeatTime(currentTime);
    }
    client.setLastUpdatedTime();
}

As the code above shows, every verify request updates the last heartbeat time of all instances held by the IpPortBasedClient, as well as the client's last-updated time. In this respect IpPortBasedClient behaves almost identically to ConnectionBasedClient. When the client does not exist locally, a ClientEvent.ClientVerifyFailedEvent is published for further handling.
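The effect of the beat-update task can be sketched as follows (a simplified stand-in, not the Nacos classes): a verify request stamps the current time on every instance and on the client itself, which is what keeps the synced client from being swept by the expiry task.

```java
import java.util.*;

// Illustrative sketch of ClientBeatUpdateTask's effect: refresh every
// instance's last heartbeat and return the client's new last-updated time.
public class BeatUpdateSketch {

    static class Instance {
        long lastHeartBeatTime;
    }

    static long refresh(List<Instance> instances, long now) {
        for (Instance each : instances) {
            each.lastHeartBeatTime = now; // analogue of setLastHeartBeatTime(currentTime)
        }
        return now; // analogue of client.setLastUpdatedTime(): caller stores this
    }

    public static void main(String[] args) {
        List<Instance> instances = List.of(new Instance(), new Instance());
        long updated = refresh(instances, 1_000L);
        System.out.println(instances.get(0).lastHeartBeatTime == updated);
    }
}
```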

// DistroClientDataProcessor
@Override
public void onEvent(Event event) {
    if (EnvUtil.getStandaloneMode()) {
        return;
    }
    if (!upgradeJudgement.isUseGrpcFeatures()) {
        return;
    }
    if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
        syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
    } else {
        syncToAllServer((ClientEvent) event);
    }
}

// ... part of the code omitted ...

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // Update the instance data inside the client
    upgradeClient(client, clientSyncData);
}

The code above is the same as the earlier ConnectionBasedClient verification flow. Next, let's look at the concrete implementation of syncClientConnected in EphemeralIpPortClientManager.

// EphemeralIpPortClientManager
@Override
public boolean syncClientConnected(String clientId, ClientAttributes attributes) {
    return clientConnected(clientFactory.newSyncedClient(clientId, attributes));
}

@Override
public IpPortBasedClient newSyncedClient(String clientId, ClientAttributes attributes) {
    return new IpPortBasedClient(clientId, true);
}

@Override
public boolean clientConnected(final Client client) {
    clients.computeIfAbsent(client.getClientId(), s -> {
        Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
        IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
        // An ephemeral instance starts health check and expiry check tasks
        ipPortBasedClient.init();
        return ipPortBasedClient;
    });
    return true;
}

A brief understanding of the health check is enough for now; a later article on heartbeat checking will cover it in detail. The key point here is that after an ephemeral IpPortBasedClient is created, it starts health check and expiry check tasks, but those tasks only cover the IpPortBasedClient instances this server itself is responsible for.

Next, let's look at the implementation of upgradeClient.

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    Set<Service> syncedService = new HashSet<>();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        // Check whether the same instance already exists
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}

The code above updates the instance data of the corresponding IpPortBasedClient and refreshes the data held by ServiceManager.
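The reconciliation that upgradeClient performs can be reduced to a map diff, sketched here with plain strings standing in for Service and InstancePublishInfo (names are illustrative, not the Nacos classes): synced entries are added or overwritten when they differ, and anything absent from the sync data is removed.

```java
import java.util.*;

// Minimal sketch of the upgradeClient diff: reconcile the client's current
// service -> instance map against the synced one.
public class UpgradeClientSketch {

    static Map<String, String> upgrade(Map<String, String> current, Map<String, String> synced) {
        Map<String, String> result = new HashMap<>(current);
        for (Map.Entry<String, String> e : synced.entrySet()) {
            // add or overwrite when the instance differs (register event in Nacos)
            if (!e.getValue().equals(result.get(e.getKey()))) {
                result.put(e.getKey(), e.getValue());
            }
        }
        // remove services absent from the sync data (deregister event in Nacos)
        result.keySet().retainAll(synced.keySet());
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> current = new HashMap<>(
                Map.of("svcA", "1.1.1.1:8080", "svcB", "2.2.2.2:8080"));
        Map<String, String> synced = Map.of("svcA", "1.1.1.1:9090", "svcC", "3.3.3.3:8080");
        // svcA updated, svcB removed, svcC added
        System.out.println(upgrade(current, synced));
    }
}
```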

At this point the verification process has also synced the ephemeral IpPortBasedClient, including the instances inside the client.

5.4. Deleting a ConnectionBasedClient or an ephemeral IpPortBasedClient

The verify task only ever adds clients. So if a client is deleted on one node, how does the Distro protocol propagate that deletion to the other nodes? Let's look at client deletion with that question in mind.

5.3.4. Deleting a ConnectionBasedClient

Let's look at the logic that removes a ConnectionBasedClient.

// Trigger 1: ConnectionManager
public synchronized void unregister(String connectionId) {
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

// Trigger 2: the ConnectionBasedClientManager scheduled task
public ConnectionBasedClientManager() {
    GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this), 0,
            Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
}

private static class ExpiredClientCleaner implements Runnable {

    private final ConnectionBasedClientManager clientManager;

    public ExpiredClientCleaner(ConnectionBasedClientManager clientManager) {
        this.clientManager = clientManager;
    }

    @Override
    public void run() {
        // Remove any client object that has expired
        long currentTime = System.currentTimeMillis();
        for (String each : clientManager.allClientId()) {
            ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
            if (null != client && client.isExpire(currentTime)) {
                clientManager.clientDisconnected(each);
            }
        }
    }
}

// ConnectionBasedClientManager
@Override
public void clientDisConnected(Connection connect) {
    clientDisconnected(connect.getMetaInfo().getConnectionId());
}

@Override
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}

When an RPC connection to this server is closed, clientDisConnected runs. The other trigger is the client manager detecting that a synced (non-native) client has expired: a network failure at disconnect time may have prevented the delete-client request from arriving, and since the server that lost the connection no longer includes that clientId in its verify requests, the synced copy on this server stops having its renew time refreshed and eventually expires. clientDisconnected removes the client from the local clients map and publishes a ClientEvent.ClientDisconnectEvent. Let's look at what the DistroClientDataProcessor listener does with it (it is the only listener relevant to ConnectionBasedClient).

// DistroClientDataProcessor
@Override
public void onEvent(Event event) {
    if (EnvUtil.getStandaloneMode()) {
        return;
    }
    if (!upgradeJudgement.isUseGrpcFeatures()) {
        return;
    }
    if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
        syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
    } else {
        syncToAllServer((ClientEvent) event);
    }
}

private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        // No special handling yet; you can register your own processor for this
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}

// ... part of the code omitted ...

// Familiar code again
@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
    try {
        switch (request.getDataOperation()) {
            case VERIFY:
                return handleVerify(request.getDistroData(), meta);
            case SNAPSHOT:
                return handleSnapshot();
            case ADD:
            case CHANGE:
            case DELETE:
                return handleSyncData(request.getDistroData());
            case QUERY:
                return handleQueryData(request.getDistroData());
            default:
                return new DistroDataResponse();
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
        DistroDataResponse result = new DistroDataResponse();
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("handle distro request with exception");
        return result;
    }
}

// ... part of the code omitted ...

// This time the DELETE branch runs
@Override
public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

@Override
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}

As shown above, once the disconnect event is observed, this server sends a DELETE operation to every server except itself, and each of them runs clientDisconnected to remove its own copy of the ConnectionBasedClient. That wraps up the ConnectionBasedClient story.
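The delete fan-out can be sketched in a few lines (illustrative names; the real loop is DistroProtocol.sync iterating memberManager.allMembersWithoutSelf()): the responsible node sends a DELETE DistroData for the clientId to every member except itself.

```java
import java.util.*;

// Sketch of the delete fan-out: pick every cluster member except the local
// node as a target for the DELETE DistroData.
public class DeleteFanOutSketch {

    static List<String> syncDeleteTargets(String self, List<String> members) {
        List<String> targets = new ArrayList<>();
        for (String member : members) {
            if (!member.equals(self)) {
                targets.add(member); // a DELETE DistroData would be sent here
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> targets = syncDeleteTargets("10.0.0.1:8848",
                List.of("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848"));
        System.out.println(targets); // [10.0.0.2:8848, 10.0.0.3:8848]
    }
}
```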

5.3.5. Deleting an ephemeral IpPortBasedClient

// EphemeralIpPortClientManager
@Override
public void run() {
    long currentTime = System.currentTimeMillis();
    for (String each : clientManager.allClientId()) {
        IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(each);
        if (null != client && isExpireClient(currentTime, client)) {
            clientManager.clientDisconnected(each);
        }
    }
}

@Override
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    IpPortBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    client.release();
    return true;
}

private boolean isExpireClient(long currentTime, IpPortBasedClient client) {
    long noUpdatedTime = currentTime - client.getLastUpdatedTime();
    return client.isEphemeral()
            && (isExpirePublishedClient(noUpdatedTime, client) && isExpireSubscriberClient(noUpdatedTime, client)
            || noUpdatedTime > ClientConfig.getInstance().getClientExpiredTime());
}

private boolean isExpirePublishedClient(long noUpdatedTime, IpPortBasedClient client) {
    return client.getAllPublishedService().isEmpty() && noUpdatedTime > Constants.DEFAULT_IP_DELETE_TIMEOUT;
}

private boolean isExpireSubscriberClient(long noUpdatedTime, IpPortBasedClient client) {
    return client.getAllSubscribeService().isEmpty() || noUpdatedTime > switchDomain.getDefaultPushCacheMillis();
}

Similar to the ConnectionBasedClient deletion logic, once the client expires, clientDisconnected is called and a ClientEvent.ClientDisconnectEvent is published; the other servers are then asked to delete their corresponding IpPortBasedClient data. The main difference is some extra cleanup of client indexes and metadata (related to instance registration and the like).
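The expiry predicate above can be isolated into a small sketch (the constants are assumed values for illustration, not the real Nacos defaults): an ephemeral IP/port client expires either when both its publisher side and subscriber side are stale, or when it has not been updated at all within the hard client-expiry window.

```java
// Sketch of EphemeralIpPortClientManager's expiry rule with assumed constants.
public class IpPortClientExpirySketch {
    static final long IP_DELETE_TIMEOUT = 30_000L;     // assumed
    static final long PUSH_CACHE_MILLIS = 10_000L;     // assumed
    static final long CLIENT_EXPIRED_TIME = 180_000L;  // assumed

    static boolean isExpire(boolean ephemeral, boolean noPublished, boolean noSubscribed,
            long noUpdatedTime) {
        boolean publishedExpired = noPublished && noUpdatedTime > IP_DELETE_TIMEOUT;
        boolean subscriberExpired = noSubscribed || noUpdatedTime > PUSH_CACHE_MILLIS;
        return ephemeral
                && (publishedExpired && subscriberExpired || noUpdatedTime > CLIENT_EXPIRED_TIME);
    }

    public static void main(String[] args) {
        // A client still publishing instances survives until the hard expiry window passes.
        System.out.println(isExpire(true, false, true, 60_000L));   // false
        System.out.println(isExpire(true, false, true, 200_000L));  // true
    }
}
```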

6. Summary

At this point we have covered Distro's data addition, deletion, and update. On verification failure, client data is re-added, which exercises the Distro add path; updates go through the same method as additions. As an example, see the figures below.

[Figure: the v2 method that registers a new instance; after registration an event is published to sync the data to the other servers]

[Figure: the sync request being sent to each server]

[Figure: the RPC receiving side handling the sync request]

As the figures above show, when a client instance changes, an instance-changed event is published, and the method it eventually reaches has already been analyzed above. Instance deregistration works the same way: to sync data, just publish the corresponding event. This is the first point of the Distro design: "every Nacos node is equal and can handle write requests, syncing new data to the other nodes." In plain terms, when data changes on a node, that node publishes an event to sync the other nodes. The "equal" part is embodied in the DistroFilter class; interested readers can explore it themselves, since its important methods have all been analyzed above.

That is the broad picture of Distro. There is also a full snapshot pull task, which we will not detail here.

Everything above is my own understanding; corrections are welcome if anything is wrong.
