主要记录操作,操作内容参考尚硅谷Hudi公开资料以及Hudi官方文档

具体参看官方文档:https://hudi.apache.org/docs/hoodie_deltastreamer/

文章目录

  • DeltaStreamer工具介绍
  • 命令
  • 测试
    • 启动kafka集群,准备数据
    • 准备配置文件
    • 拷贝所需hudi的jar包到Spark
    • 运行导入命令
    • 查看导入结果

DeltaStreamer工具介绍

HoodieDeltaStreamer工具 (hudi-utilities-bundle中的一部分) 提供了从DFS或Kafka等不同来源进行摄取的方式,并具有以下功能:

  • 精准一次从Kafka采集新数据,从Sqoop、HiveIncrementalPuller的输出或DFS文件夹下的文件增量导入

  • 导入的数据支持json、avro或自定义数据类型

  • 管理检查点,回滚和恢复

  • 利用 DFS 或 Confluent schema registry的 Avro Schema

  • 支持自定义转换操作

命令

利用spark提交的命令如下

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/software/hudi-0.12.1/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.12.1.jar --help

Schema Provider和Source配置项:https://hudi.apache.org/docs/hoodie_deltastreamer

如下以File Based Schema Provider和JsonKafkaSource为例演示

测试

启动kafka集群,准备数据

  • 启动kafka集群,创建topic

    /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server m2:9092 --create --topic hudi_test
    
  • java生产者代码往topic发送测试数据

    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.2</version></dependency><!--fastjson <= 1.2.80 存在安全漏洞,--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency>
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
    import java.util.Random;public class TestKafkaProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.2.102:9092");props.put("acks", "-1");props.put("batch.size", "1048576");props.put("linger.ms", "5");props.put("compression.type", "snappy");props.put("buffer.memory", "33554432");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);Random random = new Random();for (int i = 0; i < 1000; i++) {JSONObject model = new JSONObject();model.put("userid", i);model.put("username", "name" + i);model.put("age", 18);model.put("partition", random.nextInt(100));producer.send(new ProducerRecord<String, String>("hudi_test", model.toJSONString()));}producer.flush();producer.close();}
    }
    

准备配置文件

  • 定义arvo所需schema文件(包括source和target)

    mkdir /opt/test/hudi-props/
    vim /opt/test/hudi-props/source-schema-json.avsc
    
    {        "type": "record","name": "Profiles",   "fields": [{"name": "userid","type": [ "null", "string" ],"default": null},{"name": "username","type": [ "null", "string" ],"default": null},{"name": "age","type": [ "null", "string" ],"default": null},{"name": "partition","type": [ "null", "string" ],"default": null}]
    }
    

    cp source-schema-json.avsc target-schema-json.avsc

  • 拷贝hudi配置base.properties

    cp /opt/software/hudi-0.12.1/hudi-utilities/src/test/resources/delta-streamer-config/base.properties /opt/test/hudi-props/
    
  • 根据源码里提供的模板,编写自己的kafka source的配置文件

    cp /opt/software/hudi-0.12.1/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties /opt/test/hudi-props/vim /opt/test/hudi-props/kafka-source.properties
    

    我这边编写好的kafka source 配置文件如下

    如下配置文件我都是在本地,实际上一般都是放在hdfs上

    如果放到HDFS上,可以把hudi-props目录下文件都put上去:hadoop fs -put /opt/module/hudi-props/ /

    然后例如/opt/test/hudi-props/source-schema-json.avsc改成

    hdfs://m1:8020/hudi-props/source-schema-json.avsc

    ###
    include=/opt/test/hudi-props/base.properties
    # Key fields, for kafka example
    hoodie.datasource.write.recordkey.field=userid
    hoodie.datasource.write.partitionpath.field=partition
    # schema provider configs
    # hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
    hoodie.deltastreamer.schemaprovider.source.schema.file=/opt/test/hudi-props/source-schema-json.avsc
    hoodie.deltastreamer.schemaprovider.target.schema.file=/opt/test/hudi-props/target-schema-json.avsc
    # Kafka Source
    #hoodie.deltastreamer.source.kafka.topic=uber_trips
    hoodie.deltastreamer.source.kafka.topic=hudi_test
    #Kafka props
    bootstrap.servers=m2:9092
    auto.offset.reset=earliest
    group.id=test-group
    schema.registry.url=http://localhost:8081
    

拷贝所需hudi的jar包到Spark

需要把之前打包打好的hudi-utilities-bundle_2.12-0.12.1.jar放入spark的jars路径下,否则报错找不到一些类和方法。

cp /opt/software/hudi-0.12.1/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.12.1.jar /opt/module/spark-3.2.2/jars/

运行导入命令

spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
/opt/module/spark-3.2.2/jars/hudi-utilities-bundle_2.12-0.12.1.jar \
--props /opt/test/hudi-props/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider  \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource  \
--source-ordering-field userid \
--target-base-path hdfs://m1:8020/tmp/hudi/hudi_test  \
--target-table hudi_test \
--op BULK_INSERT \
--table-type MERGE_ON_READ

查看导入结果

  • 启动spark-sql

    spark-sql \--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
    
  • 指定location创建hudi表

    use spark_hudi;create table hudi_test using hudi
    location 'hdfs://m1:8020/tmp/hudi/hudi_test'
    
  • 查询hudi表

    select * from hudi_test;
    

数据湖之Hudi:基于Spark引擎使用Hudi的DeltaStreamer组件工具相关推荐

  1. 数据湖:分布式开源处理引擎Spark

    系列专题:数据湖系列文章 1. 什么是Spark Apache Spark是一种高效且多用途的集群计算平台.换句话说,Spark 是一种开源的,大规模数据处理引擎.它提供了一整套开发 API,包括流计 ...

  2. 数据湖基本概念--什么是数据湖,数据湖又能干什么?为什么是Hudi

    一.什么是数据湖 对于经常跟数据打交道的同学,初步听到数据湖这个概念的时候,肯定有点懵,但是相信大家对于数据仓库 这个概念并不陌生. 到了20世纪80年代以后,基于关系型数据库的事务处理成为了企业IT ...

  3. 数据湖应用解析:Spark on Elasticsearch一致性问题

    概述 Spark与Elasticsearch(es)的结合,是近年来大数据解决方案很火热的一个话题.一个是出色的分布式计算引擎,另一个是出色的搜索引擎.近年来,越来越多的成熟方案落地到行业产品中,包括 ...

  4. grafana计算不同时间的差值_大数据时代!如何基于Spark Streaming构建实时计算平台...

    随着互联网技术的迅速发展,用户对于数据处理的时效性.准确性与稳定性要求越来越高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了很多公司一个很大的挑战. 自2015年携程实时计算平台 ...

  5. 多云时代下数据管理技术_建立一个混合的多云数据湖并使用Apache Spark执行数据处理...

    多云时代下数据管理技术 Azure / GCP / AWS / Terraform / Spark (Azure/GCP/AWS/Terraform/Spark) Five years back wh ...

  6. 数据湖(二):什么是Hudi

    https://bbs.csdn.net/forums/lansonhttps://bbs.csdn.net/forums/lanson 文章目录 什么是Hudi 什么是Hudi Apache Hud ...

  7. 大数据开源框架之基于Spark的气象数据处理与分析

    Spark配置请看: (30条消息) 大数据开源框架环境搭建(七)--Spark完全分布式集群的安装部署_木子一个Lee的博客-CSDN博客 目录 实验说明: 实验要求: 实验步骤: 数据获取: 数据 ...

  8. 如何快速构建企业级数据湖仓?

    更多技术交流.求职机会,欢迎关注字节跳动数据平台微信公众号,回复[1]进入官方交流群 本文整理自火山引擎开发者社区技术大讲堂第四期演讲,主要介绍了数据湖仓开源趋势.火山引擎 EMR 的架构及特点,以及 ...

  9. 数据湖技术之Hudi 集成 Spark

    数据湖技术之Hudi 集成 Spark 数据湖框架Hudi,从诞生之初支持Spark进行操作,后期支持Flink,接下来先看看与Spark整合使用,并且在0.9.0版本中,提供SparkSQL支持,编 ...

最新文章

  1. 「图像分类」从数据集和经典网络开始
  2. 2000坐标系高程与85高程转换_科普 | 如何在大疆智图中设置坐标系
  3. 在茫茫人海中发现相似的你——局部敏感哈希(LSH)
  4. Disjoint Set
  5. linux系统中/etc/syslog.conf文件解读
  6. 安卓psp模拟器联机教程_刺激战场国际服下载方法教程!安卓ios模拟器都有
  7. 看了一下lua的实现
  8. Google Test(Primer)(三)——断言
  9. 吴恩达机器学习练习2:optimset和fminunc函数
  10. 记一次open-falcon手动push数据
  11. app营销实训报告_数据科学实训总结(15):一睹python数据分析的真容
  12. 2020淘宝双十一快速刷金币工具
  13. 《关于促进互联网金融健康发展的指导意见》
  14. 活出生命的意义-读后感
  15. RTL8762 开发板试用
  16. linux net子系统-系统调用层
  17. C/S结构是什么意思?有什么优点?
  18. Elasticsearch 的全量同步和增量同步
  19. 手机通讯录误删怎么恢复?教你解决这个问题
  20. 笔记本选购指南-2022年初购买笔记本建议

热门文章

  1. 英飞凌XMC2GO控制读写英飞凌BGT24LTR22射频板寄存器
  2. jhipster修改jdl生成的实体类报错:liquibase.exception.ValidationFailedException: Validation Failed
  3. 计算机桌面色盲模式怎么取消,Win10色盲模式:颜色反转,色弱自动调整!
  4. linux同步到云盘,Linux命令行上传文件到百度网盘
  5. 谷歌的请求索引功能恢复了
  6. deepin系统安装wine
  7. 无废话硬核分享:Linux 基础知识点总结很详细,全的很,吐血奉献
  8. 太阳系“第十大行星”只比冥王星大一点
  9. flag是什么意思c语言6,flag是什么意思(给自己立flag是什么梗)
  10. 树莓派4b hat板机械图ad版(ad6.9)