【转】记一次 ClickHouse 数据迁移
转载地址:https://zhuanlan.zhihu.com/p/220172155
背景
大约在 2018 年 8 月份开始正式接触 ClickHouse,当时机房没有合适的服务器,就在 Azure 开了一台虚拟机来部署。平稳运行了两年,支撑了 YiDrone 和 YiSonar 两个重要的产品的底层数据存储和查询。前段时间采购服务器的时候预留了一些资源,加上 Azure 的免费订阅即将到期,于是准备把 ClickHouse 迁回到机房。数据量不大,只有一个节点,硬盘上的数据加起来 500G 左右。
方案调研
迁移集群实际上就是要把所有数据库(system 除外)的表结构和数据完整的复制一遍。ClickHouse 官方和社区有一些现成的解决方案,也可以自己实现。
拷贝数据目录
先观察一下 ClickHouse 在文件系统上的目录结构(配置文件 /ect/clickhouse-server/config.xml
里面配置的 <path>
),为了便于查看,只保留了 data
和 metadata
目录。
.
├── data
│ ├── default
│ ├── system
│ │ ├── asynchronous_metric_log
│ │ ├── metric_log
│ │ ├── query_log
│ │ ├── query_thread_log
│ │ └── trace_log
├── metadata
│ ├── default
│ │ └── v_table_size.sql
│ ├── default.sql
│ ├── system
│ │ ├── asynchronous_metric_log.sql
│ │ ├── metric_log.sql
│ │ ├── query_log.sql
│ │ ├── query_thread_log.sql
│ │ └── trace_log.sql
data
目录里保存的是数据,每个数据库一个目录,内部每个表一个子目录。metadata
目录里保存的是元数据,即数据库和表结构。其中<database>.sql
是 创建数据库的 DDL(ATTACH DATABASE default ENGINE = Ordinary
)<database>/<table>.sql
是建表的 DDL (ATTACH TABLE ...
).
这里的 DDL 使用的是
ATTACH
语句,进入文档 查看 ATTACH 的作用及跟 CREATE 的区别
基于这个信息,直接把 data
和 metadata
目录(要排除 system)复制到新集群,即可实现数据迁移。用一个小表做测试,验证可行。
操作流程
- 在源集群的硬盘上打包好对应数据库或表的 data 和 metadata 数据
- 拷贝到目标集群对应的目录
- 重启 clickhouse-server
使用 remote
表函数
ClickHouse 除了查询常规的表,还能使用表函数来构建一些特殊的「表」,其中 remote 函数 可用于查询另一个 ClickHouse 的表。
使用方式很简单:
SELECT * FROM remote('addresses_expr', db, table, 'user', 'password') LIMIT 10;
因此,可以借助这个功能实现数据迁移:
INSERT INTO <local_database>.<local_table>
SELECT * FROM remote('remote_clickhouse_addr', <remote_database>, <remote_table>, '<remote_user>', '<remote_password>')
操作流程
- 在源集群的
system.tables
表查询出数据库、表、DDL、分区、表引擎等信息 - 在目标集群上,运行 DDL 创建表,然后运行上述迁移语句复制数据
- 遍历所有表,执行 2
使用 clickhouse-copier
Clickhouse-copier 是 ClickHouse 官方提供的一款数据迁移工具,可用于把表从一个集群迁移到另一个(也可以是同一个)集群。Clickhouse-copier 使用 Zookeeper 来管理同步任务,可以同时运行多个 clickhouse-copier 实例。
使用方式:
clickhouse-copier --daemon --config zookeeper.xml --task-path /task/path --base-dir /path/to/dir
其中 --config zookeeper.xml
是 Zookeeper 的连接信息,--task-path /task/path
是 Zookeeper 里任务配置的节点路径。在使用时,需要先定义一个 XML 格式的任务配置文件,上传到 /task/path/description
里。同步任务是表级别的,可以配置的内容还比较多。Clickhouse-copier 可以监听 /task/path/description
的变化,动态加载新的配置而不需要重启。
操作流程
- 创建
zookeeper.xml
- 创建任务配置文件,格式见官方文档,每个表都要配置(可使用代码自动生成)
- 把配置文件内容上传到 Zookeeper
- 启动 clickhouse-copier 进程
理论上 clickhouse-copier 运行在源集群或目标集群的环境都可以,官方文档推进在源集群,这样可以节省带宽。
使用 clickhouse-backup
clickhouse-backup 是社区开源的一个 ClickHouse 备份工具,可用于实现数据迁移。其原理是先创建一个备份,然后从备份导入数据,类似 MySQL 的 mysqldump + SOURCE。这个工具可以作为常规的异地冷备方案,不过有个局限是只支持 MergeTree 系列的表。
操作流程
- 在源集群使用
clickhouse-backup create
创建备份 - 把备份文件压缩拷贝到目标集群
- 在目标集群使用
clickhouse-backup restore
恢复
对比
从官方和社区的一些资料综合来看 clickhouse-copier 功能最强大,不过考虑到数据量较少,而且对 clickhouse-copier 有些地方也不是很清楚,最终决定使用 remote
函数来做数据迁移。
关于别的数据迁移方案、更多的 clickhouse-copier 使用案例,可参考 Altinity 的博客 Clickhouse-copier in practice.
使用 remote
函数做数据迁移
使用 remote
函数还能实现更多特性:
- 对于分区表,可逐个分区进行同步,这样实际上同步的最小单位是分区,可以实现增量同步
- 可方便集成数据完整性(行数对比)检查,自动重新同步更新过的表
代码
代码如下,需要先安装 clickhouse-driver
import collections
import datetime
import functools
import logging
import timefrom clickhouse_driver import Clientsource_conn = Client(host='source-host', user='user', password='password')
target_conn = Client(host='target-host', user='user', password='password')def format_partition_expr(p):if isinstance(p, int):return preturn f"'{p}'"def execute_queries(conn, queries):if isinstance(queries, str):queries = queries.split(';')for q in queries:conn.execute(q.strip())class Table(object):def __init__(self, database, name, ddl, partition_key, is_view):self.database = databaseself.name = nameself.ddl = ddl.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')self.partition_key = partition_keyself.is_view = is_viewdef exists(self, conn):q = f"SELECT name FROM system.tables WHERE database = '{self.database}' AND name = '{self.name}'"return len(conn.execute(q)) > 0def get_partitions(self, conn):partitions = []q = f'SELECT {self.partition_key}, count() FROM {self.identity} GROUP BY {self.partition_key} ORDER BY {self.partition_key}'partitions = collections.OrderedDict(conn.execute(q))return partitionsdef get_total_count(self, conn):q = f'SELECT COUNT() FROM {self.identity}'return conn.execute(q)[0][0]def check_consistency(self):if not self.exists(target_conn):return False, Nonesource_ttl_count = self.get_total_count(source_conn)target_ttl_count = self.get_total_count(target_conn)if source_ttl_count == target_ttl_count:return True, Noneif not self.partition_key:return False, Nonesource_partitions = self.get_partitions(source_conn)target_partitions = self.get_partitions(target_conn)bug_partitions = []for p, c in source_partitions.items():if p not in target_partitions or c != target_partitions[p]:bug_partitions.append(p)return False, bug_partitionsdef create(self, replace=False):target_conn.execute(f'CREATE DATABASE IF NOT EXISTS {self.database}')if self.is_view:replace = Trueif replace:target_conn.execute(f'DROP TABLE IF EXISTS {self.identity}')target_conn.execute(self.ddl)def copy_data_from_remote(self, by_partition=True):self.create()if self.is_view:logging.info('ignore view %s', self.identity)returnis_identical, bug_partitions = self.check_consistency()if is_identical:logging.info('table %s has the same number of rows, skip', self.identity)returnif self.partition_key and by_partition:for p in bug_partitions:logging.info('copy partition %s=%s', self.partition_key, p)self._copy_partition_from_remote(p)else:self._copy_table_from_remote()def _copy_table_from_remote(self):queries = f'''DROP TABLE {self.identity};{self.ddl};INSERT INTO {self.identity}SELECT * FROM remote('{source_conn.host}', {self.identity}, '{source_conn.user}', '{source_conn.password}')'''execute_queries(target_conn, queries)def _copy_partition_from_remote(self, partition):partition = format_partition_expr(partition)queries = f'''ALTER TABLE {self.identity} DROP PARTITION {partition};INSERT INTO {self.identity}SELECT * FROM remote('{source_conn.host}', {self.identity}, '{source_conn.user}', '{source_conn.password}')WHERE {self.partition_key} = {partition}'''execute_queries(target_conn, queries)@propertydef identity(self):return f'{self.database}.{self.name}'def __str__(self):return self.identity__repr__ = __str__def get_all_tables() -> [Table]:# 查询出所有用户的数据库和表,包括视图。视图依赖其他表,所以放到最后。q = '''SELECT database, name, create_table_query, partition_key, engine = 'View' AS is_viewFROM system.tablesWHERE database NOT IN ('system')ORDER BY if(engine = 'View', 999, 0), database, name'''rows = source_conn.execute(q)tables = [Table(*values) for values in rows]return tablesdef copy_remote_tables(tables):for idx, t in enumerate(tables):start_time = datetime.datetime.now()logging.info('>>>> start to migrate table %s, progress %s/%s', t.identity, idx+1, len(tables))t.copy_data_from_remote()logging.info('<<<< migrated table %s in %s', t.identity, datetime.datetime.now() - start_time)def with_retry(max_attempts=5, backoff=120):def decorator(f):@functools.wraps(f)def inner(*args, **kwargs):attempts = 0while True:attempts += 1logging.info('start attempt #%s', attempts)try:f(*args, **kwargs)except Exception as e:if attempts >= max_attempts:raise elogging.exception('caught exception')time.sleep(backoff)else:breakreturn innerreturn decorator@with_retry(max_attempts=10, backoff=60)
def main():tables = get_all_tables()logging.info('got %d tables: %s', len(tables), tables)copy_remote_tables(tables)if __name__ == '__main__':main()
使用方式:直接运行即可,挂了重跑,不会有副作用。
局限性
仅通过对比行数来判断数据同步完整,没有比较内部数据的一致性,因此如果上游表行数不变,更新了部分字段,将无法自动识别,需要先从目标库里把表删掉重新同步。
【转】记一次 ClickHouse 数据迁移相关推荐
- 银河麒麟V10高级服务器操作系统clickhouse数据迁移技术全网唯一
银河麒麟V10操作系统clickhouse数据迁移技术 前言 笔者自述:笔者本人为中国软件行业国产化进程中的一份子,本人也是非常支持华为手机以及鸿蒙操作系统.本文介绍在本人在实际国产化切换过程中的一些 ...
- 记一次大规模数据迁移和加密
公司的核心业务合作伙伴淘宝网,最近出现泄漏用户信息的现象,找了好久找不到根源,于是乎,淘宝那边决定对所有敏感数据进行加密,从出口和入口都走密文,于是乎,我们的工作量就来了. 我们的一个底单数据库,存储 ...
- 记nexus2升级nexus3数据迁移
版本 nexus-2.14 nexus-3.31 数据迁移 方法一: 1.打开nexus2访问页面,创建upgrade(如果存在,直接进行下一步) 2.存在upgrade点击在下面的status找到 ...
- 记一次Hbase数据迁移和遇到的问题
因为集群不互通,所以采用手动方式迁移 1.下载目标集群数据 hadoop fs -get /apps/hbase/data/data/default/*c4be21d3000064c0 /mnt/da ...
- 记一次MySQL数据迁移到SQLServer全过程
- 记一次在K8s集群搭建的MySQL主从无法正常启动之数据迁移恢复实践
本章目录:记一次在K8s集群搭建的MySQL主从无法正常启动之数据迁移恢复实践 描述:在K8s集群中里利用bitnami提供的mysql:5.7.32-debian-10-r61镜像并利用helm进行 ...
- 【ClickHouse系列】clickhouse-copier是如何进行数据迁移的
clickhouse-copier是官方的数据迁移工具,用于多个集群之间的数据迁移. 详细的配置可以参照官方文档:https://clickhouse.tech/docs/en/operations/ ...
- 一部分 数据 迁移_软件测试员12小时惊魂记:数据库迁移出大事故,如何测试?...
信息时代,随着用户数量不断增加,业务量不断增长,企业原有数据库不足以有效支撑业务的发展,在此情况下,企业更多的是寻求一款更加稳定的数据库进行替代. 本文以Sybase数据库和Oracle数据库为例.O ...
- 记一次业务系统拆分的数据迁移及系统切换事项
一.迁移背景 老系统使用商业化软件,同时包含模块较多,架构无法支撑,维护成本高等考虑,需要根据业务模块拆分多个系统,新系统支持水平扩缩容 ,rcp框架等,新系统基本上包含常用的技术栈(wildfly. ...
最新文章
- 树莓派开发4-串口通讯wiringpi库
- 在一个.net sln中包含多个project,project引用同一个dll导致的错误
- ifix如何设画面大小_ifix5.1环境下的ifix服务器、客户端配置
- 比尔盖茨,马斯克、霍金都告诉你:为什么要警惕人工智能(中)
- 树莓派分辨率调整(含官方默认和kali系统)
- Struts2技术详解
- man-翻译和epoll相关的内容,部分
- 俄罗斯方块 Tetris
- PAT甲题题解-1091. Acute Stroke (30)-BFS
- java 中JFinal getModel方法和数据库使用出现问题解决办法
- POJ-2480 Longge's problem 欧拉函数
- java做网页客户端_如何成为 Java web开发者
- 32位/64位WIN2003各版本支持内存列表 彻底解决XP系统无法支持4G内存的问题
- Linux查看vga分辨率,调整ubuntu启动界面vga分辨率的方法
- 2017 年,最热开源静态网站生成器 TOP 20 揭晓!
- 华为畅享max什么时候鸿蒙,华为鸿蒙系统正式推送 具体怎么样及支持哪些手机带你前先看...
- 看完牛客网19年测试全部面筋,有了这篇测试面试100问的博客
- HTML奥运五环的实现
- php生成二维码海报
- k8s踩坑记录——证书一年有效期
热门文章
- linux mdadm 创建raid,Linux下用mdadm命令创建软raid5
- 简述php无限极分类,PHP 无限极分类
- Ubuntu 开放端口
- android怎么变iphone,EasyTouch: 让你的Android秒变“iPhone”
- Apache Kafka教程A系列:消费者群体示例
- Linux系统I/O模型和网络I/O模型
- 喜马拉雅小雅Nano首发,仅需1元即可预约限量抢购
- 如何理解计算机技术对艺术设计的意义,艺术设计的意义是什么?
- 为什么开发人员工作10多年了还会迷茫?
- 二叉搜索树的后序遍历序列(Java)