背景

我们的埋点数据上传到S3,大概是每天10亿条的数据量级别。最近花了一些时间思考和学习如何将每天如此大量的数据从S3导入到Clickhouse,为后续的实时查询做准备。

方案一

1. 先将S3的数据导入到hive,这一步操作比较简单,创建一个外部表即可,按日期字段进行分区。

CREATE TABLE `s3_to_hive_test`(id             ,aaa            ,bbb            ,ccc            ,ddd            ,……              )
PARTITIONED BY ( `ingestion_date` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION's3://host/bucket/path'
TBLPROPERTIES ('last_modified_by'='hadoop', 'last_modified_time'='1625729438', 'parquet.column.index.access'='true', 'spark.sql.create.version'='2.2 or prior', 'spark.sql.sources.schema.numPartCols'='1', 'spark.sql.sources.schema.numParts'='1', 'spark.sql.sources.schema.part.0'='{……}', 'spark.sql.sources.schema.partCol.0'='ingestion_date')

2. 每天定时从hive把前一天的数据导入到Clickhouse,这里可以借助waterdrop(现改名为seatunnel)工具进行导入,定时调度可以自己写一个shell脚本,如果导入后还需要进行数据清洗、聚合等,推荐DolphinScheduler。

#waterdrop的config文件spark {spark.app.name = "Waterdrop"spark.executor.cores = 1spark.executor.memory = "2g" // 这个配置必需填写,否则会使用 sparksql 内置元数据库spark.default.parallelism = 12spark.driver.memory = "64g"spark.sql.catalogImplementation = "hive"
}
input {hive {pre_sql = "select `id`,`aaa`,`bbb`,`ccc`,`ddd`,…… from default.s3_to_hive_test where ……"table_name = "waterdrop_tmp"}
}
filter {
}
output {clickhouse {host = "ch_host:ch_port"database = "default"username = "***"password = "******"table = "ch_mergetree_test"bulk_size = 200000retry = 3fields =
[   id             ,aaa            ,bbb            ,ccc            ,ddd            ,……              ]}
}

3. 导入后的清洗、聚合等操作。

适用场景:

绝大多数的场景都适用,之前我们项目也是采用此方案。目前由于引入K8S进行管理,hive、Spark、waterdrop、DolphinScheduler分属不同容器,要处理太多网络通信的问题,故改用更为简单的方案三。

方案二

1. 如方案一,先将S3的数据以外部表的方式导入到hive中并按日期分区。

CREATE TABLE `s3_to_hive_test`(id             ,aaa            ,bbb            ,ccc            ,ddd            ,……              )
PARTITIONED BY ( `ingestion_date` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION's3://host/bucket/path'
TBLPROPERTIES ('last_modified_by'='hadoop', 'last_modified_time'='1625729438', 'parquet.column.index.access'='true', 'spark.sql.create.version'='2.2 or prior', 'spark.sql.sources.schema.numPartCols'='1', 'spark.sql.sources.schema.numParts'='1', 'spark.sql.sources.schema.part.0'='{……}', 'spark.sql.sources.schema.partCol.0'='ingestion_date')

2. Clickhouse建立hive集成表

CREATE TABLE IF NOT EXISTS hive_to_ch_test
(`id` String NULL,`aaa` String NULL,`bbb` String  NULL,`ccc` String NULL,`ddd` String NULL,……
) ENGINE = Hive('thrift://host:port', 'database', 'table');
PARTITION BY ingestion_date

3. Clickhouse内部将hive集成表的数据导入到MergeTree表

insert into ch_mergetree_test(id             ,aaa            ,bbb            ,ccc            ,ddd            ,……                   )
SELECTid,ifNull(aaa, ''),ifNull(bbb, ''),ifNull(ccc, ''),ifNull(ddd, ''),……
FROM hive_to_ch_test
WHERE ……

4. 后续的数据清洗、聚合等操作

在实测中,Clickhouse到22.4版本为止似乎还不支持hive集成表的底层存储为S3这种形式。具体表现为能建立hive集成表,但查询的时候报以下错误:

Query id: bfeb2774-eb2b-4b2c-9230-64bd6d35acfe0 rows in set. Elapsed: 0.013 sec. Received exception from server (version 22.4.5):
Code: 210. DB::Exception: Received from localhost:9000. DB::Exception: Unable to connect to HDFS: InvalidParameter: Cannot parse URI: hdfs://****, missing port or invalid HA configuration     Caused by: HdfsConfigNotFound: Config key: dfs.ha.namenodes.**** not found. (NETWORK_ERROR)

但支持底层为HDFS,如下图所示。(注意:如HDFS是HA模式,需要参考官方文档进行一些额外的配置,否则也会报以上错误) 适用场景:

Clickhouse版本必须高于22.1,因为hive集成表引擎是在22.1版本才发布的。详见Clickhouse ChangeLog

方案三

大道至简,直接省略S3导入到Hive这个步骤。

1. 建立S3集成表

DROP TABLE IF EXISTS s3_to_ch_test ;CREATE TABLE s3_to_ch_test ( `id` String NULL,`aaa` String NULL,`bbb` String  NULL,`ccc` String NULL,`ddd` String NULL,……             ) ENGINE=S3(concat('https://s3_host/bucket/path/*' ),'accessKey','secretKey', 'Parquet')
SETTINGS input_format_parquet_allow_missing_columns=true ;

2. Clickhouse内部将数据从S3集成表导入MergeTree表

insert into ch_mergetree_test(id             ,aaa            ,bbb            ,ccc            ,ddd            ,……                   )
SELECTid,ifNull(aaa, ''),ifNull(bbb, ''),ifNull(ccc, ''),ifNull(ddd, ''),……
FROM s3_to_ch_test
WHERE ……

3. 后续的数据清洗、集成等操作

当然,在具体实施过程,还是会遇到一些坑。

坑一、超出内存限制

由于我们在S3中存储的文件格式是Parquet类型,Parquet是面向分析型业务的列式存储格式,Clickhouse在处理Parquet文件是内存密集型的。我尝试将output_format_parquet_row_group_size 参数调小,但没有任何作用,仍报上述异常。猜测Clickhouse在查询数据时并不是按文件一个个读取并插入的,而是将所有文件的列为单位装到内存中。因为我单个文件最大也才10G,分配了50G内存仍超出内存。

解决方案:调大内存限制的值。

方式1. 临时设置(仅对当前session有效):

set max_memory_usage=150000000000;select * from system.settings where name='max_memory_usage';

方式2. 修改/etc/clickhouse-server/users.xml文件(长期有效)

<max_memory_usage>150000000000</max_memory_usage>

坑二、效率问题

解决方案1:加大内存

内存100G的情况下,实测10亿条数据导入到MergeTree表消耗时间4300+ Sec,加到150G消耗时间3800+ Sec。

解决方案2:增加线程数

当内存150G,max_insert_threads=1时,耗时3837s

         当内存150G,max_insert_threads=2时,耗时1939s

当内存150G,max_insert_threads=3时,耗时1407s

当内存150G,max_insert_threads=4时,超出内存限制!!

虽然增加线程数对导入效率会有明显提升,但不意味着线程数越多越好,因为高线程数是以高内存为代价的,需要根据服务器内存和导入数据量的情况,平衡好max_memory_usage和max_insert_threads的关系。

Clickhouse 从S3/Hive导入数据相关推荐

  1. 从S3中导入数据到Dynamodb

    本节如果你已经从Dynamodb中导出过数据,而且导出的文件以及被存入S3.文件内部结构会在Verify Data Export File 中描写叙述. 我们称之前导出数据的原始表为source ta ...

  2. sqoop从hive导入数据到mysql时出现主键冲突

    今天在将一个hive数仓表导出到mysql数据库时出现进度条一直维持在95%一段时间后提示失败的情况,搞了好久才解决.使用的环境是HUE中的Oozie的workflow任何调用sqoop命令,该死的o ...

  3. Hadoop Hive导入数据命令

  4. 学习笔记Hive(四) —— Hive应用(2)—— Hive导入及导出数据

    四.Hive导入及导出数据 通过HDFS直接导入导出 通过Hive命令导入导出 4.1.Hive导入数据的语法 LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRI ...

  5. Hive导入和导出数据

    Hive导入数据 (1)从本地文件导入 load data local inpath 'test.txt' into table test; (2)从HDFS导入 load data inpath ' ...

  6. 存在的hive插入数据_往hive表中插入数据以及导出数据

    转载:https://blog.csdn.net/qq_26442553/article/details/80380590 转载:https://blog.csdn.net/weixin_436817 ...

  7. Shlle脚本传参调用seatunnel(原waterdrop)将hive中数据导入ClickHouse

    前言 公司分析数据已经存入hive,但需要输入参数计算得到很长一段时间的趋势变化数据(不固定查询),经调研ClickHouse时序优化后比较满足需求,并且ClickHouse在数据量大时最好采用DNS ...

  8. hive导入导出数据案例

    查询数据: use ods;set /user.password=ODS-SH;select * from base_cdma_all limit 10; use tag_bonc;select * ...

  9. Hive数据导入——数据存储在Hadoop分布式文件系统中,往Hive表里面导入数据只是简单的将数据移动到表所在的目录中!...

    转自:http://blog.csdn.net/lifuxiangcaohui/article/details/40588929 Hive是基于Hadoop分布式文件系统的,它的数据存储在Hadoop ...

最新文章

  1. spdlog源码阅读 (1): sinks
  2. Hadoop文件系统元数据fsimage和编辑日志edits
  3. VM上安装Linux找不到硬盘
  4. Python之PIL库的运用、GIF处理
  5. 一个球从100 米高的自由落下的反弹高度
  6. 借助TensorFlow框架,到底能做什么?
  7. CCCC L1-002. 打印沙漏【图形打印】
  8. SSD(based on Caffe)环境配置
  9. jquery1.9 下检测浏览器类型和版本的方法
  10. 阶段3 3.SpringMVC·_07.SSM整合案例_07.ssm整合之编写MyBatis框架测试保存的方法
  11. binder机制原理android,Binder机制1---Binder原理介绍
  12. SQL中的Northwind数据库
  13. 投递简历用什么邮箱最好用?
  14. 2022年淘宝天猫双十一预售红包优惠券满减活动什么时候开始天猫淘宝双11预售定金可以退款吗?
  15. SQL 添加、删除、更改字段(属性)
  16. 中标麒麟 NeoKylin 5.1 安装C++环境
  17. re2c使用小结(2)
  18. 记录自己折腾不止的人生,留住时光的一抹轨迹
  19. MySQL:递减/递减更新一列字段值
  20. Oracle使用max()函数遇到的坑

热门文章

  1. UniApp + JAVA连接百度云ocr进行身份证识别
  2. Nikto详细使用教程
  3. 小程序app.json: app.json 未找到报错解决记录
  4. 由浅入深理解latent diffusion/stable diffusion(2):扩散生成模型的工作原理
  5. (四)图像的空域锐化_一阶梯度算子
  6. 如何使用threejs实现第一人称视角的移动
  7. 阿里的数据中台正在背离初心
  8. 中国传媒大学助学自考,多省份组织统考和实践科目报考
  9. 6.10初步建立项目
  10. sys_config.fex