业务背景

将hive中多个表数据同步到clickhouse中提供实时查询,表均2亿条记录。对同步工具的要求一是能够实现抽数时间不宜过长;二是能够自定义控制将数据抽取到clickhouse集群指定的节点实例上。作为一名java开发,自然不想过多依赖Hadoop那一套,网上搜索一番后决定使用seatunnel,通过简单配置化就可以实现数据的抽取。

简介

Apache SeaTunnel (Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台。

官方文档:https://interestinglab.github.io/seatunnel-docs/#/

安装

安装比较简单,参考官方文档即可。

配置

config.conf 下述配置是从hive中抽数插入到clickhouse中的配置,数据源是hive的一张表,通过seatunnel插件根据id字段进行分片插入clickhouse集群不同分片。

spark {spark.sql.catalogImplementation = "hive"spark.app.name = "hive2clickhouse"spark.executor.instances = 30spark.executor.cores = 1 spark.executor.memory = "2g"spark.ui.port = 13000
}input {hive {pre_sql = "select id,name,create_time from table"table_name = "table_tmp"}
}filter {convert {source_field = "data_source"new_type = "UInt8"}org.interestinglab.waterdrop.filter.Slice {source_table_name = "table_tmp"source_field = "id"slice_num = 2slice_code = 0result_table_name = "table_8123"}org.interestinglab.waterdrop.filter.Slice {source_table_name = "table_tmp"source_field = "id"slice_num = 2slice_code = 1result_table_name = "table_8124"}
}output {clickhouse {source_table_name="table_8123"host = "ip1:8123"database = "db_name"username="username"password="pwd"table = "table1"fields = ["id","name","create_time"]clickhouse.socket_timeout = 50000retry_codes = [209, 210]retry = 3bulk_size = 500000}clickhouse {source_table_name="table_8124"host = "ip2:8124"database = "db_name"username="username"password="pwd"table = "table1"fields = ["id","name","create_time"]clickhouse.socket_timeout = 50000retry_codes = [209, 210]retry = 3bulk_size = 500000}
}

插件开发

package org.interestinglab.waterdrop.filterimport io.github.interestinglab.waterdrop.apis.BaseFilter
import io.github.interestinglab.waterdrop.config.{Config, ConfigFactory}
import org.apache.spark.sql.functions.{col, hash, lit, udf}
import org.apache.spark.sql.{Dataset, Row, SparkSession}class Slice extends BaseFilter {var conf: Config = ConfigFactory.empty()/*** Set Config.* */override def setConfig(config: Config): Unit = {this.conf = config}/*** Get Config.* */override def getConfig(): Config = {this.conf}override def checkConfig(): (Boolean, String) = {if (!conf.hasPath("source_field")) {(false, "please specify [source_field] as a non-empty string")} else if (!conf.hasPath("slice_code")) {(false, "please specify [slice_code] as a non-empty string")} else if (!conf.hasPath("slice_num")) {(false, "please specify [slice_num] as a non-empty string")} else {(true, "")}}override def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row] = {val srcField = conf.getString("source_field")val sliceCode = conf.getInt("slice_code")val sliceNum = conf.getInt("slice_num")df.filter(func(hash(col(srcField)), lit(sliceNum), lit(sliceCode)))}val func = udf((s: String, num: Int, target: Int) => {val moCOde = s.toDouble % numval absValue = moCOde.toInt.absabsValue == target})
}

启动

../bin/start-waterdrop.sh --master local[4] --deploy-mode client --config.conf

seatunnel 简单使用(原名waterdrop)相关推荐

  1. Apache SeaTunnel (Incubating) 2.1.0 发布,内核重构、全面支持 Flink

    点击蓝字 关注我们 2021 年 12 月 9 日,SeaTunnel (原名 Waterdrop) 进入 Apache 孵化器.4 个月后的 2022 年 3 月 18 日,其首个 Apache 版 ...

  2. 数据同步工具—SeaTunnel简介

    SeaTunnel 简介 SeaTunnel是一个非常易于使用的超高性能分布式数据集成平台,支持海量数据的实时同步.它每天可以稳定高效地同步数百亿数据,并已用于近100家公司的生产. SeaTunne ...

  3. 数据集成框架SeaTunnel学习笔记

    文章目录 概述 介绍 应用场景 插件支持情况 安装和配置 安装 配置 使用 案例1入门 案例2传参 概述 介绍 SeaTunnel 是一个简单易用的数据集成框架,在企业中,由于开发时间或开发部门不通用 ...

  4. 【SeaTunnel】从一个数据集成组件演化成企业级的服务

    点亮 ⭐️ Star · 照亮开源之路 GitHub:https://github.com/apache/incubator-seatunnel 在 7 月 24 日 Apache SeaTunnel ...

  5. 盘点2021年晋升为Apache TLP的大数据相关项目

    时间过得真快,2021年就过去了,又到了一年总结的时候了.本文将延续之前的惯例来总结一下过去一年大数据相关的项目顺利毕业成 Apache 顶级项目.在2021年一共有四个大数据相关项目顺利毕业成顶级项 ...

  6. Shlle脚本传参调用seatunnel(原waterdrop)将hive中数据导入ClickHouse

    前言 公司分析数据已经存入hive,但需要输入参数计算得到很长一段时间的趋势变化数据(不固定查询),经调研ClickHouse时序优化后比较满足需求,并且ClickHouse在数据量大时最好采用DNS ...

  7. 【大数据】什么是数据集成?(SeaTunnel 集成工具介绍)

    文章目录 一.什么是数据集成? 二.ETL 又是什么? 三.SeaTunnel 介绍 1)概述 2)SeaTunnel 的作用 3)SeaTunnel 的特点 4)Seatunnel 优势与缺点 5) ...

  8. seatunnel 高性能分布式数据集成平台

    seatunnel 高性能分布式数据集成平台 一.介绍 二.为什么我们需要 seatunnel 三.seatunnel 使用场景 四.seatunnel 的特性 五.seatunnel 的工作流程 六 ...

  9. 马蜂窝毕博:分析完这9点工作原理,我们最终选择了 Apache SeaTunnel!

    点亮 ⭐️ Star · 照亮开源之路 https://github.com/apache/incubator-seatunnel ​ 讲师简介 ​ 毕博 马蜂窝 数据工程师 在10月15日,Apac ...

最新文章

  1. nginx中的rewrite用法及实例
  2. esp32 micropython spiffs_二十一,ESP32 SPIFFS:读取一个文件
  3. 《Linux内核分析》课程总结
  4. 【算法与数据结构专场】BitMap算法基本操作代码实现
  5. linux自定义和使用 shell 环境(一)
  6. nginx的模块化体系结构
  7. 支付宝开放新玩法:搜商家可领消费券
  8. linux中mysql不显示中文_linux中解决mysql中文乱码方法
  9. R for data science 之 stringr包
  10. Scala中I/O类使用详细解析
  11. 关于如何把支持VS2015的插件BabeLua改成支持VS2017
  12. 变频器LED显示灯闪烁_图文并茂,彻头彻尾认识变频器,适合初学者!
  13. AE-after Effects 笔记
  14. 钟表维修管理系统技术解析(三) 工单录入
  15. 机器学习——聚类算法(一)
  16. Python 内置函数dir()与对象的特殊属性以及一切都是对象的轻谈
  17. vue-live2d 看板娘
  18. JavaWeb HTML
  19. 基于lamp搭建Discuz论坛
  20. VS 2017生成exe(msi)文件

热门文章

  1. UCOSII系统时间管理
  2. IPv4/IPv6协议分析 实验报告
  3. TTL电平和CMOS电平
  4. 27-YongGC、MinorGC、 Major GC、FullGC傻傻分不清
  5. 工控监控计算机,华北工控:嵌入式计算机在配电站在线监控和事故预警系统......
  6. 计算机专业为什么学机床电焊,机械类最吃香的专业:为什么说学机械穷一辈子?...
  7. 基于分段平面性的单目深度估计 P3Depth: Monocular Depth Estimation with a Piecewise Planarity Prior
  8. day07CSRF漏洞
  9. 2023互联网拿捏大厂面试,2023最新版的Java面试突击班手册
  10. burpsuite导入网站的客户端证书