引言

本文接着撸Distro协议,上文中分析了在Nacos server启动时会进行全量数据同步和数据校验,具体数据即客户端注册节点信息含命名空间、分组名称、服务名称、节点Instance信息等。什么时候会触发增量同步?增量同步都干了些啥,下文接着撸撸增量数据同步。

一、内容提要

增量数据同步

  • 在Nacos节点启动时通过事件驱动模式订阅了ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件

  • 当节点收到ClientChangedEvent事件时,会向集群中其他节点发送更新Client信息请求,其他节点收到后更新缓存

  • 当节点收到ClientVerifyFailedEvent事件时,向该Event指定的目标节点发起新增该Event指定的Client信息请求,目标节点收到后更新到自己缓存中

  • 当节点收到ClientDisconnectEvent事件时,会向集群中其他节点发送删除Client信息请求,其他节点收到后将该Client缓存删除

增量事件触发

  • 当有服务注册或者注销时会触发ClientEvent.ClientChangedEvent事件,即客户端调用naming.registerInstance或者naming.deregisterInstance

  • 定时任务每隔3秒钟定时检查缓存中的所有连接,如果超过保鲜期20秒则再次发起连接请求,连接未成功则注销关闭该连接并发布ClientEvent.ClientDisconnectEvent事件

  • Nacos集群之间通过每5秒发送心跳校验数据请求(具体为本节点负责Client信息),其他节点接受到校验请求,如果缓存中存在该client表示校验成功,同时更新保鲜时间;否则校验失败,回调返回失败Response,请求节点收到失败的Response后会发布ClientVerifyFailedEvent事件

二、增量数据同步

将代码翻到DistroClientDataProcessor类中,该类继承了SmartSubscriber,遵循Subscriber/Notify模式,即事件驱动模式。该模式前面文章中分析过,当有订阅的事件时会进行回调通知。

订阅的事件

DistroClientDataProcessor订阅了ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件。

@Override
public List<Class<? extends Event>> subscribeTypes() {List<Class<? extends Event>> result = new LinkedList<>();result.add(ClientEvent.ClientChangedEvent.class);result.add(ClientEvent.ClientDisconnectEvent.class);result.add(ClientEvent.ClientVerifyFailedEvent.class);return result;
}

当有上述三个事件产生时,DefaultPublisher回调onEvent方法。

public void onEvent(Event event) {if (EnvUtil.getStandaloneMode()) {return;}if (!upgradeJudgement.isUseGrpcFeatures()) {return;}if (event instanceof ClientEvent.ClientVerifyFailedEvent) {// 注解@1syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {// 注解@2syncToAllServer((ClientEvent) event);}
}

注解@1 将ClientVerifyFailedEvent同步给校验失败的节点,操作类型为ADD

注解@2 将同步给集群中的其他节

private void syncToAllServer(ClientEvent event) {Client client = event.getClient();// Only ephemeral data sync by Distro, persist client should sync by raft.if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {// 注解@3DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {// 注解@4DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}
}

注解@3 当客户端断开连接事件ClientDisconnectEvent时,向其他节点同步DELETE操作

注解@4 当客户端变更事件ClientChangedEvent时,向其他节点同步CHANGE操作

接着看下不同操作类型的处理

@Override
public boolean process(NacosTask task) {if (!(task instanceof DistroDelayTask)) {return true;}DistroDelayTask distroDelayTask = (DistroDelayTask) task;DistroKey distroKey = distroDelayTask.getDistroKey();switch (distroDelayTask.getAction()) {case DELETE: // 删除操作DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);return true;case CHANGE:case ADD: // 更新和新增操作DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);return true;default:return false;}
}

向指定的集群节点同步更新数据

@Override
public boolean syncData(DistroData data, String targetServer) {if (isNoExistTarget(targetServer)) {return true;}// 构造请求数据并设置数据类型DistroDataRequest request = new DistroDataRequest(data, data.getType());// 查找目标节点缓存数据Member member = memberManager.find(targetServer);// 节点状态检查需UP状态,即:可通信状态if (checkTargetServerStatusUnhealthy(member)) {Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);return false;}try {// 向目标节点发送数据Response response = clusterRpcClientProxy.sendRequest(member, request);return checkResponse(response);} catch (NacosException e) {Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);}return false;
}

异步更新操作

@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {if (isNoExistTarget(targetServer)) {callback.onSuccess();}DistroDataRequest request = new DistroDataRequest(data, data.getType());Member member = memberManager.find(targetServer);try {// 异步更新操作clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));} catch (NacosException nacosException) {callback.onFailed(nacosException);}
}

节点收到这些操作请求如何处理呢?

代码翻到DistroDataRequestHandler#handle(),集群中节点收到请求后处理逻辑在这里:

@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {try {switch (request.getDataOperation()) {case VERIFY:return handleVerify(request.getDistroData(), meta);case SNAPSHOT:return handleSnapshot();case ADD:case CHANGE:case DELETE:return handleSyncData(request.getDistroData());case QUERY:return handleQueryData(request.getDistroData());default:return new DistroDataResponse();}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);DistroDataResponse result = new DistroDataResponse();result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("handle distro request with exception");return result;}
}

可以看出ADD、CHANGE和DELETE均由handleSyncData处理。

private DistroDataResponse handleSyncData(DistroData distroData) {DistroDataResponse result = new DistroDataResponse();if (!distroProtocol.onReceive(distroData)) {result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("[DISTRO-FAILED] distro data handle failed");}return result;
}
@Override
public boolean processData(DistroData distroData) {switch (distroData.getType()) {case ADD:case CHANGE:ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);handlerClientSyncData(clientSyncData); // 注解@5return true;case DELETE:String deleteClientId = distroData.getDistroKey().getResourceKey();Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);clientManager.clientDisconnected(deleteClientId); // 注解@6return true;default:return false;}
}

注解@5 将同步过来的Client信息进行缓存

private void handlerClientSyncData(ClientSyncData clientSyncData) {Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());Client client = clientManager.getClient(clientSyncData.getClientId());// 注解@5.1upgradeClient(client, clientSyncData);
}

需要的是从其他节点通过过来的Client信息,ConnectionBasedClient属性isNative为false表示该连接时从其他节点同步过来的;true表示该连接客户端直接连接的。

public boolean syncClientConnected(String clientId, ClientSyncAttributes attributes) {String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);return clientConnected(clientFactory.newSyncedClient(clientId, attributes));
}@Override
public ConnectionBasedClient newSyncedClient(String clientId, ClientSyncAttributes attributes) {return new ConnectionBasedClient(clientId, false); // false表示从其他节点同步过来的client
}@Override
public boolean clientConnected(Client client) {Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());if (!clients.containsKey(client.getClientId())) {clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client); // 缓存client}return true;
}

注解@5.1  更新Client的Service以及Instance信息。

private void upgradeClient(Client client, ClientSyncData clientSyncData) {List<String> namespaces = clientSyncData.getNamespaces();List<String> groupNames = clientSyncData.getGroupNames();List<String> serviceNames = clientSyncData.getServiceNames();List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();Set<Service> syncedService = new HashSet<>();for (int i = 0; i < namespaces.size(); i++) {Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));Service singleton = ServiceManager.getInstance().getSingleton(service);syncedService.add(singleton);InstancePublishInfo instancePublishInfo = instances.get(i);if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {client.addServiceInstance(singleton, instancePublishInfo);NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));}}for (Service each : client.getAllPublishedService()) {if (!syncedService.contains(each)) {client.removeServiceInstance(each);NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));}}
}

注解@6 响应删除操作,从clients缓存中移除。

@Override
public boolean clientDisconnected(String clientId) {Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);ConnectionBasedClient client = clients.remove(clientId);if (null == client) {return true;}client.release();NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));return true;
}

小结: 增量同步的逻辑如下:当本节点DistroClientDataProcessor收到ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件时,会向Nacos集群的其他节点同步Client信息;集群中其他节点收到同步信息后更新或者删除本地缓存的Client信息;通过增量同步的Client信息isNative为false表示不是由客户端直连的。

三、增量事件触发

在Nacos server启动时从运行时内存信息可以看出,总共缓存了17个事件类型。当然也包括ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent。

ClientChangedEvent事件触发

当处理服务注册和注销事件时会触发ClientChangeEvent事件,详见InstanceRequestHandler#handle处理逻辑。

public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);switch (request.getType()) {// 注解@7case NamingRemoteConstants.REGISTER_INSTANCE:return registerInstance(service, request, meta);// 注解@8case NamingRemoteConstants.DE_REGISTER_INSTANCE:return deregisterInstance(service, request, meta);default:throw new NacosException(NacosException.INVALID_PARAM,String.format("Unsupported request type %s", request.getType()));}
}

注解@7 处理注册请求,会调用到addServiceInstance方法,该方法中发布了ClientEvent.ClientChangedEvent事件。

public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {if (null == publishers.put(service, instancePublishInfo)) {MetricsMonitor.incrementInstanceCount();}NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());return true;
}

注解@8 处理注销请求,会调用到removeServiceInstance方法,该方法中发布了ClientEvent.ClientChangedEvent事件

public InstancePublishInfo removeServiceInstance(Service service) {InstancePublishInfo result = publishers.remove(service);if (null != result) {MetricsMonitor.decrementInstanceCount();NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));}Loggers.SRV_LOG.info("Client remove for service {}, {}", service, getClientId());return result;
}

小结: 当有服务注册或者注销时会触发ClientEvent.ClientChangedEvent事件。

ClientDisconnectEvent事件触发

下面一段代码通过检测连接是否超过保鲜期,超过保鲜期的会被注销关闭,翻到代码ConnectionManager#start()。

@PostConstruct
public void start() {// 定时任务每3秒执行一次RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {try {// 获取缓存连接int totalCount = connections.size();Loggers.REMOTE_DIGEST.info("Connection check task start");MetricsMonitor.getLongConnectionMonitor().set(totalCount);// 所有连接集合Set<Map.Entry<String, Connection>> entries = connections.entrySet();// 获取通过SDK连接的数量int currentSdkClientCount = currentSdkClientCount();boolean isLoaderClient = loadClient >= 0;int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit;int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);List<String> expelClient = new LinkedList<>();Map<String, AtomicInteger> expelForIp = new HashMap<>(16);// 1. calculate expel count  of ip.// 加载Connection ConnectionLimitRule// 默认路径为 ${usr.home}/nacos/data/loader/limitRulefor (Map.Entry<String, Connection> entry : entries) {Connection client = entry.getValue();String appName = client.getMetaInfo().getAppName();String clientIp = client.getMetaInfo().getClientIp();if (client.getMetaInfo().isSdkSource() && !expelForIp.containsKey(clientIp)) {//get limit for current ip.// 默认无limit限制int countLimitOfIp = connectionLimitRule.getCountLimitOfIp(clientIp);// 默认无limit限制if (countLimitOfIp < 0) {int countLimitOfApp = connectionLimitRule.getCountLimitOfApp(appName);countLimitOfIp = countLimitOfApp < 0 ? countLimitOfIp : countLimitOfApp;}if (countLimitOfIp < 0) { // 默认无限制countLimitOfIp = connectionLimitRule.getCountLimitPerClientIpDefault();}if (countLimitOfIp >= 0 && connectionForClientIp.containsKey(clientIp)) {AtomicInteger currentCountIp = connectionForClientIp.get(clientIp);if (currentCountIp != null && currentCountIp.get() > countLimitOfIp) {expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp));}}}}if (expelForIp.size() > 0) { // 默认等于0Loggers.REMOTE_DIGEST.info("Over limit ip expel info,", expelForIp);}Set<String> outDatedConnections = new HashSet<>();long now = System.currentTimeMillis();// 2.get expel connection for ip limit.//for (Map.Entry<String, Connection> entry : entries) {Connection client = entry.getValue();String clientIp = client.getMetaInfo().getClientIp();AtomicInteger integer = expelForIp.get(clientIp);if (integer != null && integer.intValue() > 0) {integer.decrementAndGet();expelClient.add(client.getMetaInfo().getConnectionId());expelCount--;} else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) { // 保鲜时间超过20秒放入outDatedConnections集合outDatedConnections.add(client.getMetaInfo().getConnectionId());}}// 3. if total count is still over limit.// expelCount 默认为0if (expelCount > 0) {for (Map.Entry<String, Connection> entry : entries) {Connection client = entry.getValue();if (!expelForIp.containsKey(client.getMetaInfo().clientIp) && client.getMetaInfo().isSdkSource() && expelCount > 0) {expelClient.add(client.getMetaInfo().getConnectionId());expelCount--;outDatedConnections.remove(client.getMetaInfo().getConnectionId());}}}String serverIp = null;String serverPort = null;if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {String[] split = redirectAddress.split(Constants.COLON);serverIp = split[0];serverPort = split[1];}for (String expelledClientId : expelClient) { // 默认空集合try {Connection connection = getConnection(expelledClientId);if (connection != null) {ConnectResetRequest connectResetRequest = new ConnectResetRequest();connectResetRequest.setServerIp(serverIp);connectResetRequest.setServerPort(serverPort);connection.asyncRequest(connectResetRequest, null);}} catch (ConnectionAlreadyClosedException e) {unregister(expelledClientId);} catch (Exception e) {Loggers.REMOTE_DIGEST.error("Error occurs when expel connection :", expelledClientId, e);}}//4.client active detection.Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());// 超过保鲜期的链接集合if (CollectionUtils.isNotEmpty(outDatedConnections)) {Set<String> successConnections = new HashSet<>();final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());for (String outDateConnectionId : outDatedConnections) {try {Connection connection = getConnection(outDateConnectionId);if (connection != null) {ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();// 超过保鲜时间的连接,重新异步发起连接connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {@Overridepublic Executor getExecutor() {return null;}@Overridepublic long getTimeout() {return 1000L;}@Overridepublic void onResponse(Response response) {latch.countDown();if (response != null && response.isSuccess()) {// 刷新激活时间connection.freshActiveTime();successConnections.add(outDateConnectionId);}}@Overridepublic void onException(Throwable e) {latch.countDown();}});} else {latch.countDown();}} catch (ConnectionAlreadyClosedException e) {latch.countDown();} catch (Exception e) {// ... latch.countDown();}}latch.await(3000L, TimeUnit.MILLISECONDS);Loggers.REMOTE_DIGEST.info("Out dated connection check successCount={}", successConnections.size());// 无效连接集合for (String outDateConnectionId : outDatedConnections) {if (!successConnections.contains(outDateConnectionId)) {Loggers.REMOTE_DIGEST.info("[{}]Unregister Out dated connection....", outDateConnectionId);// 注销关闭connectionunregister(outDateConnectionId);}}}if (isLoaderClient) {  // 重置loadClient = -1;redirectAddress = null;}} catch (Throwable e) {}}}, 1000L, 3000L, TimeUnit.MILLISECONDS);}
public synchronized void unregister(String connectionId) {Connection remove = this.connections.remove(connectionId);if (remove != null) {String clientIp = remove.getMetaInfo().clientIp;AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);if (atomicInteger != null) {int count = atomicInteger.decrementAndGet();if (count <= 0) {connectionForClientIp.remove(clientIp);}}remove.close();Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); // 异步}
}public void notifyClientDisConnected(final Connection connection) {for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {try {clientConnectionEventListener.clientDisConnected(connection);} catch (Throwable throwable) {Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",clientConnectionEventListener.getName(), throwable);}}}@Override
public boolean clientDisconnected(String clientId) {Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);ConnectionBasedClient client = clients.remove(clientId);if (null == client) {return true;}client.release();NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client)); // 发布ClientDisconnectEvent事件return true;
}

小结: 连接可以配置限制规则具体在${usr.home}/nacos/data/loader/limitRule文件配置,默认无限制;通过定时任务每隔3秒钟定时检查缓存中的所有连接包括通过来源sdk的连接和集群的连接;如果连接超过保鲜期20秒,并再次发起连接请求,未能连接成功则注销关闭该连接;注销关闭时发布ClientEvent.ClientDisconnectEvent事件。

ClientVerifyFailedEvent事件触发

上一篇文章中梳理了Nacos集群中,每个节点会对集群中其他节点每隔5秒发送校验数据,也就是心跳。当校验的结果会进行回调(gRPC为例),我们翻着看看这部分。

public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {if (isNoExistTarget(targetServer)) {callback.onSuccess();}DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);Member member = memberManager.find(targetServer);try {DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,verifyData.getDistroKey().getResourceKey(), callback, member);clusterRpcClientProxy.asyncRequest(member, request, wrapper); // 向其他节点发送本节点负责的cleintId信息} catch (NacosException nacosException) {callback.onFailed(nacosException);}
}

重点看下DistroVerifyCallbackWrapper部分,校验失败发布ClientVerifyFailedEvent事件。

@Override
public void onResponse(Response response) {if (checkResponse(response)) {NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());distroCallback.onSuccess();} else {Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);// 校验失败发布ClientVerifyFailedEvent事件NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());distroCallback.onFailed(null);}
}

最后看下ClientVerifyFailedEvent这个类,关注下成员变量包含了clientId和targetServer。当收到ClientVerifyFailedEvent时用于向targetServer目标节点添加客户端clientId信息。

public static class ClientVerifyFailedEvent extends ClientEvent {private static final long serialVersionUID = 2023951686223780851L;private final String clientId;private final String targetServer;public ClientVerifyFailedEvent(String clientId, String targetServer) {super(null);this.clientId = clientId;this.targetServer = targetServer;}public String getClientId() {return clientId;}public String getTargetServer() {return targetServer;}
}

小结: Nacos集群之间通过每5秒发送心跳校验数据请求(具体为本节点负责Client信息),其他节点接受到校验请求,如果缓存中存在该client表示校验成功,同时更新保鲜时间;否则校验失败,回调返回失败Response,请求节点收到失败的Response后会发布ClientVerifyFailedEvent事件。

Nacos7# Distro协议增量同步相关推荐

  1. mysql binlog 大数据_后起之秀 | MySQL Binlog增量同步工具go-mysql-transfer实现详解

    一. 概述 工作需要研究了下阿里开源的MySQL Binlog增量订阅消费组件canal,其功能强大.运行稳定,但是有些方面不是太符合需求,主要有如下三点: 需要自己编写客户端来消费canal解析到的 ...

  2. MySQL Binlog增量同步工具go-mysql-transfer实现详解

    go-mysql-transfer产品手册:https://www.kancloud.cn/wj596/go-mysql-transfer/2111996 一. 概述 工作需要研究了下阿里开源的MyS ...

  3. Elasticsearch 的全量同步和增量同步

    (1)全量同步 什么是全量同步:将一个mysql的整个表的所有数据都同步到es中 常用插件是logstash-input-jdbc,logstash通过sql语句分区间对数据进行查询,然后输出到es进 ...

  4. DataX以及增量同步

    目录 第一部分:DataX 一.DataX基本知识 二.DataX应用 三.DataX安装测试 四.DataX性能调优 五.DataX源码编译问题整理 六.datax增量同步 第二部分:增量同步 一. ...

  5. 数据实时增量同步之CDC工具—Canal、mysql_stream、go-mysql-transfer、Maxwell

    数据实时增量同步之CDC工具-Canal.mysql_stream.go-mysql-transfer.Maxwell 什么是CDC? CDC工具对比 实现原理: Mysql binlog 讲解: m ...

  6. Redis持久化机制 -全量同步与增量同步的区别

    全量同步与增量同步的区别 全量同步:就是每天定时(避开高峰期)或者采用一个周期实现将数据拷贝到一个地方也就是Rdb存储. 增量同步:比如采用对行为的操作实现对数据的同步,也就是AOF. 全量与增量的比 ...

  7. mysql增量同步kafka_MySQL数据实时增量同步到Kafka - Flume

    写在前面的话 需求,将MySQL里的数据实时增量同步到Kafka.接到活儿的时候,第一个想法就是通过读取MySQL的binlog日志,将数据写到Kafka.不过对比了一些工具,例如:Canel,Dat ...

  8. Kafka实现MySQL增量同步

    目标 本文是对[1]的复现和整理 环境 组件 版本 Zookeeper 3.6.0 Kafka 2.5.0 Mysql 8.0.21-0ubuntu0.20.04.4 准备工作 分别新建两个数据库A和 ...

  9. flink实时同步mysql_基于Canal与Flink实现数据实时增量同步(一)

    点击上方蓝色字体,关注我 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB). 准备 配置 ...

  10. 如何实现文件增量同步——算法

    问题: 如何增量同步文件,例如一个文本文件有10M,分别存放在A,B两个地方,现在两个文件是完全一样的,但是我马上要在A上对这个文件进行修改,B如何实现自动和A上的文件保持一致,并且网络的传输量最少. ...

最新文章

  1. JSX设置CSS样式详解
  2. 加载多个js onload事件
  3. 转:VirtualBox虚拟机网络连接设置的四种方式
  4. 澳洲 计算机 本科学费,澳大利亚墨尔本大学一年学费和生活费清单
  5. MNIST 手写数字识别,我是如何做到886个可训练参数,识别率达到98.2%? (参数、模型压缩), Keras实现,模型优化
  6. 深度学习(六十六)生成模型、最大化似然、KL散度
  7. 培训协议与服务器有没有要求,上面说的是提供专业培训且规定了服务器和培训费以及违约金的问题。试用期辞职单位有没有权利追究违约金呢?- 法律快车法律咨询...
  8. 获得密钥_《哪吒》公映密钥延期一个月?关于“密钥延期”的全揭秘来了
  9. ASP 文件下载实例
  10. android 记事本上功能,安卓手机上有多功能记事本app吗?
  11. Qt开发技术:图形视图框架(二)场景QGraphicsScene、QGraphicsItem与QGraphicsView详解
  12. Java项目:自习室图书馆座位预约管理系统(java+SSM+JSP+easyUI+mysql)
  13. Shell一句话根据进程名杀死进程
  14. chrome最新版去掉书签栏上应用和还原默认页的方法
  15. Redis数据结构:快速的Redis有哪些慢操作?
  16. 植物大战僵尸java版_植物大战僵尸:M8版 JAVA版下载
  17. mysql导入数据库报366_管家婆普及版sql数据库如何导入366++版数据库
  18. 时间序列预测必读的20篇论文!
  19. html5添加flash动画效果,霸气的HTML5 7款无Flash HTML5动画特效
  20. 不动产测绘数据入库_房产基础地理信息数据生产管理与入库更新一体化

热门文章

  1. 计算机硕士两篇sci找工作,3篇SCI才能毕业?C9研究生:“我太难了!”
  2. JAVA练级之路(二)--- JAVA入门
  3. 基于Matlab的泊松亮斑仿真及其应用
  4. php mysql商品管理_PHP.21-商品信息管理
  5. cesuim如何添加近景天空盒
  6. AnolisOS 入门一: 安装
  7. 如何才能把平时浪费的时间改造为有意义的时间?
  8. Java学习总结IIII
  9. win10可用运行内存与实际不符处理办法(64位)
  10. PHP的线程安全ZTS与非线程(NTS)安…