转载地址:https://zhuanlan.zhihu.com/p/220172155

背景

大约在 2018 年 8 月份开始正式接触 ClickHouse,当时机房没有合适的服务器,就在 Azure 开了一台虚拟机来部署。平稳运行了两年,支撑了 YiDrone 和 YiSonar 两个重要的产品的底层数据存储和查询。前段时间采购服务器的时候预留了一些资源,加上 Azure 的免费订阅即将到期,于是准备把 ClickHouse 迁回到机房。数据量不大,只有一个节点,硬盘上的数据加起来 500G 左右。

方案调研

迁移集群实际上就是要把所有数据库(system 除外)的表结构和数据完整的复制一遍。ClickHouse 官方和社区有一些现成的解决方案,也可以自己实现。

拷贝数据目录

先观察一下 ClickHouse 在文件系统上的目录结构(配置文件 /ect/clickhouse-server/config.xml 里面配置的 <path>),为了便于查看,只保留了 data 和 metadata 目录。

.
├── data
│   ├── default
│   ├── system
│   │   ├── asynchronous_metric_log
│   │   ├── metric_log
│   │   ├── query_log
│   │   ├── query_thread_log
│   │   └── trace_log
├── metadata
│   ├── default
│   │   └── v_table_size.sql
│   ├── default.sql
│   ├── system
│   │   ├── asynchronous_metric_log.sql
│   │   ├── metric_log.sql
│   │   ├── query_log.sql
│   │   ├── query_thread_log.sql
│   │   └── trace_log.sql
  • data 目录里保存的是数据,每个数据库一个目录,内部每个表一个子目录。
  • metadata 目录里保存的是元数据,即数据库和表结构。其中
    • <database>.sql 是 创建数据库的 DDL(ATTACH DATABASE default ENGINE = Ordinary
    • <database>/<table>.sql 是建表的 DDL (ATTACH TABLE ...).

这里的 DDL 使用的是 ATTACH 语句,进入文档 查看 ATTACH 的作用及跟 CREATE 的区别

基于这个信息,直接把 data 和 metadata 目录(要排除 system)复制到新集群,即可实现数据迁移。用一个小表做测试,验证可行。

操作流程

  1. 在源集群的硬盘上打包好对应数据库或表的 data 和 metadata 数据
  2. 拷贝到目标集群对应的目录
  3. 重启 clickhouse-server

使用 remote 表函数

ClickHouse 除了查询常规的表,还能使用表函数来构建一些特殊的「表」,其中 remote 函数 可用于查询另一个 ClickHouse 的表。

使用方式很简单:

SELECT * FROM remote('addresses_expr', db, table, 'user', 'password') LIMIT 10;

因此,可以借助这个功能实现数据迁移:

INSERT INTO <local_database>.<local_table>
SELECT * FROM remote('remote_clickhouse_addr', <remote_database>, <remote_table>, '<remote_user>', '<remote_password>')

操作流程

  1. 在源集群的 system.tables 表查询出数据库、表、DDL、分区、表引擎等信息
  2. 在目标集群上,运行 DDL 创建表,然后运行上述迁移语句复制数据
  3. 遍历所有表,执行 2

使用 clickhouse-copier

Clickhouse-copier 是 ClickHouse 官方提供的一款数据迁移工具,可用于把表从一个集群迁移到另一个(也可以是同一个)集群。Clickhouse-copier 使用 Zookeeper 来管理同步任务,可以同时运行多个 clickhouse-copier 实例。

使用方式:

clickhouse-copier --daemon --config zookeeper.xml --task-path /task/path --base-dir /path/to/dir

其中 --config zookeeper.xml 是 Zookeeper 的连接信息,--task-path /task/path 是 Zookeeper 里任务配置的节点路径。在使用时,需要先定义一个 XML 格式的任务配置文件,上传到 /task/path/description 里。同步任务是表级别的,可以配置的内容还比较多。Clickhouse-copier 可以监听 /task/path/description 的变化,动态加载新的配置而不需要重启。

操作流程

  1. 创建 zookeeper.xml
  2. 创建任务配置文件,格式见官方文档,每个表都要配置(可使用代码自动生成)
  3. 把配置文件内容上传到 Zookeeper
  4. 启动 clickhouse-copier 进程

理论上 clickhouse-copier 运行在源集群或目标集群的环境都可以,官方文档推进在源集群,这样可以节省带宽。

使用 clickhouse-backup

clickhouse-backup 是社区开源的一个 ClickHouse 备份工具,可用于实现数据迁移。其原理是先创建一个备份,然后从备份导入数据,类似 MySQL 的 mysqldump + SOURCE。这个工具可以作为常规的异地冷备方案,不过有个局限是只支持 MergeTree 系列的表。

操作流程

  1. 在源集群使用 clickhouse-backup create 创建备份
  2. 把备份文件压缩拷贝到目标集群
  3. 在目标集群使用 clickhouse-backup restore 恢复

对比

从官方和社区的一些资料综合来看 clickhouse-copier 功能最强大,不过考虑到数据量较少,而且对 clickhouse-copier 有些地方也不是很清楚,最终决定使用 remote 函数来做数据迁移。

关于别的数据迁移方案、更多的 clickhouse-copier 使用案例,可参考 Altinity 的博客 Clickhouse-copier in practice.

使用 remote 函数做数据迁移

使用 remote 函数还能实现更多特性:

  • 对于分区表,可逐个分区进行同步,这样实际上同步的最小单位是分区,可以实现增量同步
  • 可方便集成数据完整性(行数对比)检查,自动重新同步更新过的表

代码

代码如下,需要先安装 clickhouse-driver

import collections
import datetime
import functools
import logging
import timefrom clickhouse_driver import Clientsource_conn = Client(host='source-host', user='user', password='password')
target_conn = Client(host='target-host', user='user', password='password')def format_partition_expr(p):if isinstance(p, int):return preturn f"'{p}'"def execute_queries(conn, queries):if isinstance(queries, str):queries = queries.split(';')for q in queries:conn.execute(q.strip())class Table(object):def __init__(self, database, name, ddl, partition_key, is_view):self.database = databaseself.name = nameself.ddl = ddl.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')self.partition_key = partition_keyself.is_view = is_viewdef exists(self, conn):q = f"SELECT name FROM system.tables WHERE database = '{self.database}' AND name = '{self.name}'"return len(conn.execute(q)) > 0def get_partitions(self, conn):partitions = []q = f'SELECT {self.partition_key}, count() FROM {self.identity} GROUP BY {self.partition_key} ORDER BY {self.partition_key}'partitions = collections.OrderedDict(conn.execute(q))return partitionsdef get_total_count(self, conn):q = f'SELECT COUNT() FROM {self.identity}'return conn.execute(q)[0][0]def check_consistency(self):if not self.exists(target_conn):return False, Nonesource_ttl_count = self.get_total_count(source_conn)target_ttl_count = self.get_total_count(target_conn)if source_ttl_count == target_ttl_count:return True, Noneif not self.partition_key:return False, Nonesource_partitions = self.get_partitions(source_conn)target_partitions = self.get_partitions(target_conn)bug_partitions = []for p, c in source_partitions.items():if p not in target_partitions or c != target_partitions[p]:bug_partitions.append(p)return False, bug_partitionsdef create(self, replace=False):target_conn.execute(f'CREATE DATABASE IF NOT EXISTS {self.database}')if self.is_view:replace = Trueif replace:target_conn.execute(f'DROP TABLE IF EXISTS {self.identity}')target_conn.execute(self.ddl)def copy_data_from_remote(self, by_partition=True):self.create()if self.is_view:logging.info('ignore view %s', self.identity)returnis_identical, bug_partitions = self.check_consistency()if is_identical:logging.info('table %s has the same number of rows, skip', self.identity)returnif self.partition_key and by_partition:for p in bug_partitions:logging.info('copy partition %s=%s', self.partition_key, p)self._copy_partition_from_remote(p)else:self._copy_table_from_remote()def _copy_table_from_remote(self):queries = f'''DROP TABLE {self.identity};{self.ddl};INSERT INTO {self.identity}SELECT * FROM remote('{source_conn.host}', {self.identity}, '{source_conn.user}', '{source_conn.password}')'''execute_queries(target_conn, queries)def _copy_partition_from_remote(self, partition):partition = format_partition_expr(partition)queries = f'''ALTER TABLE {self.identity} DROP PARTITION {partition};INSERT INTO {self.identity}SELECT * FROM remote('{source_conn.host}', {self.identity}, '{source_conn.user}', '{source_conn.password}')WHERE {self.partition_key} = {partition}'''execute_queries(target_conn, queries)@propertydef identity(self):return f'{self.database}.{self.name}'def __str__(self):return self.identity__repr__ = __str__def get_all_tables() -> [Table]:# 查询出所有用户的数据库和表,包括视图。视图依赖其他表,所以放到最后。q = '''SELECT database, name, create_table_query, partition_key, engine = 'View' AS is_viewFROM system.tablesWHERE database NOT IN ('system')ORDER BY if(engine = 'View', 999, 0), database, name'''rows = source_conn.execute(q)tables = [Table(*values) for values in rows]return tablesdef copy_remote_tables(tables):for idx, t in enumerate(tables):start_time = datetime.datetime.now()logging.info('>>>> start to migrate table %s, progress %s/%s', t.identity, idx+1, len(tables))t.copy_data_from_remote()logging.info('<<<< migrated table %s in %s', t.identity, datetime.datetime.now() - start_time)def with_retry(max_attempts=5, backoff=120):def decorator(f):@functools.wraps(f)def inner(*args, **kwargs):attempts = 0while True:attempts += 1logging.info('start attempt #%s', attempts)try:f(*args, **kwargs)except Exception as e:if attempts >= max_attempts:raise elogging.exception('caught exception')time.sleep(backoff)else:breakreturn innerreturn decorator@with_retry(max_attempts=10, backoff=60)
def main():tables = get_all_tables()logging.info('got %d tables: %s', len(tables), tables)copy_remote_tables(tables)if __name__ == '__main__':main()

使用方式:直接运行即可,挂了重跑,不会有副作用。

局限性

仅通过对比行数来判断数据同步完整,没有比较内部数据的一致性,因此如果上游表行数不变,更新了部分字段,将无法自动识别,需要先从目标库里把表删掉重新同步。

【转】记一次 ClickHouse 数据迁移相关推荐

  1. 银河麒麟V10高级服务器操作系统clickhouse数据迁移技术全网唯一

    银河麒麟V10操作系统clickhouse数据迁移技术 前言 笔者自述:笔者本人为中国软件行业国产化进程中的一份子,本人也是非常支持华为手机以及鸿蒙操作系统.本文介绍在本人在实际国产化切换过程中的一些 ...

  2. 记一次大规模数据迁移和加密

    公司的核心业务合作伙伴淘宝网,最近出现泄漏用户信息的现象,找了好久找不到根源,于是乎,淘宝那边决定对所有敏感数据进行加密,从出口和入口都走密文,于是乎,我们的工作量就来了. 我们的一个底单数据库,存储 ...

  3. 记nexus2升级nexus3数据迁移

    版本 nexus-2.14 nexus-3.31 数据迁移 方法一: 1.打开nexus2访问页面,创建upgrade(如果存在,直接进行下一步) 2.存在upgrade点击在下面的status找到 ...

  4. 记一次Hbase数据迁移和遇到的问题

    因为集群不互通,所以采用手动方式迁移 1.下载目标集群数据 hadoop fs -get /apps/hbase/data/data/default/*c4be21d3000064c0 /mnt/da ...

  5. 记一次MySQL数据迁移到SQLServer全过程

  6. 记一次在K8s集群搭建的MySQL主从无法正常启动之数据迁移恢复实践

    本章目录:记一次在K8s集群搭建的MySQL主从无法正常启动之数据迁移恢复实践 描述:在K8s集群中里利用bitnami提供的mysql:5.7.32-debian-10-r61镜像并利用helm进行 ...

  7. 【ClickHouse系列】clickhouse-copier是如何进行数据迁移的

    clickhouse-copier是官方的数据迁移工具,用于多个集群之间的数据迁移. 详细的配置可以参照官方文档:https://clickhouse.tech/docs/en/operations/ ...

  8. 一部分 数据 迁移_软件测试员12小时惊魂记:数据库迁移出大事故,如何测试?...

    信息时代,随着用户数量不断增加,业务量不断增长,企业原有数据库不足以有效支撑业务的发展,在此情况下,企业更多的是寻求一款更加稳定的数据库进行替代. 本文以Sybase数据库和Oracle数据库为例.O ...

  9. 记一次业务系统拆分的数据迁移及系统切换事项

    一.迁移背景 老系统使用商业化软件,同时包含模块较多,架构无法支撑,维护成本高等考虑,需要根据业务模块拆分多个系统,新系统支持水平扩缩容 ,rcp框架等,新系统基本上包含常用的技术栈(wildfly. ...

最新文章

  1. 树莓派开发4-串口通讯wiringpi库
  2. 在一个.net sln中包含多个project,project引用同一个dll导致的错误
  3. ifix如何设画面大小_ifix5.1环境下的ifix服务器、客户端配置
  4. 比尔盖茨,马斯克、霍金都告诉你:为什么要警惕人工智能(中)
  5. 树莓派分辨率调整(含官方默认和kali系统)
  6. Struts2技术详解
  7. man-翻译和epoll相关的内容,部分
  8. 俄罗斯方块 Tetris
  9. PAT甲题题解-1091. Acute Stroke (30)-BFS
  10. java 中JFinal getModel方法和数据库使用出现问题解决办法
  11. POJ-2480 Longge's problem 欧拉函数
  12. java做网页客户端_如何成为 Java web开发者
  13. 32位/64位WIN2003各版本支持内存列表 彻底解决XP系统无法支持4G内存的问题
  14. Linux查看vga分辨率,调整ubuntu启动界面vga分辨率的方法
  15. 2017 年,最热开源静态网站生成器 TOP 20 揭晓!
  16. 华为畅享max什么时候鸿蒙,华为鸿蒙系统正式推送 具体怎么样及支持哪些手机带你前先看...
  17. 看完牛客网19年测试全部面筋,有了这篇测试面试100问的博客
  18. HTML奥运五环的实现
  19. php生成二维码海报
  20. k8s踩坑记录——证书一年有效期

热门文章

  1. linux mdadm 创建raid,Linux下用mdadm命令创建软raid5
  2. 简述php无限极分类,PHP 无限极分类
  3. Ubuntu 开放端口
  4. android怎么变iphone,EasyTouch: 让你的Android秒变“iPhone”
  5. Apache Kafka教程A系列:消费者群体示例
  6. Linux系统I/O模型和网络I/O模型
  7. 喜马拉雅小雅Nano首发,仅需1元即可预约限量抢购
  8. 如何理解计算机技术对艺术设计的意义,艺术设计的意义是什么?
  9. 为什么开发人员工作10多年了还会迷茫?
  10. 二叉搜索树的后序遍历序列(Java)