摘要:本文介绍如何使用Hudi自带入湖工具DeltaStreamer进行数据的实时入湖。

本文分享自华为云社区《华为FusionInsight MRS实战 - Hudi实时入湖之DeltaStreamer工具最佳实践》,作者: 晋红轻 。

背景

传统大数据平台的组织架构是针对离线数据处理需求设计的,常用的数据导入方式为采用sqoop定时作业批量导入。随着数据分析对实时性要求不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于spark/flink流处理机制的(准)实时同步系统的开发。

然而实时同步从一开始就面临如下几个挑战:

  • 小文件问题。不论是spark的microbatch模式,还是flink的逐条处理模式,每次写入HDFS时都是几MB甚至几十KB的文件。长时间下来产生的大量小文件,会对HDFS namenode产生巨大的压力。
  • 对update操作的支持。HDFS系统本身不支持数据的修改,无法实现同步过程中对记录进行修改。
  • 事务性。不论是追加数据还是修改数据,如何保证事务性。即数据只在流处理程序commit操作时一次性写入HDFS,当程序rollback时,已写入或部分写入的数据能随之删除。

Hudi就是针对以上问题的解决方案之一。使用Hudi自带的DeltaStreamer工具写数据到Hudi,开启–enable-hive-sync 即可同步数据到hive表。

Hudi DeltaStreamer写入工具介绍

DeltaStreamer工具使用参考 https://hudi.apache.org/cn/docs/writing_data.html

HoodieDeltaStreamer实用工具 (hudi-utilities-bundle中的一部分) 提供了从DFS或Kafka等不同来源进行摄取的方式,并具有以下功能。

  • 从Kafka单次摄取新事件,从Sqoop、HiveIncrementalPuller输出或DFS文件夹中的多个文件
  • 支持json、avro或自定义记录类型的传入数据
  • 管理检查点,回滚和恢复
  • 利用DFS或Confluent schema注册表的Avro模式。
  • 支持自定义转换操作

场景说明

  1. 生产库数据通过CDC工具(debezium)实时录入到MRS集群中Kafka的指定topic里。
  2. 通过Hudi提供的DeltaStreamer工具,读取Kafka指定topic里的数据并解析处理。
  3. 同时使用DeltaStreamer工具将处理后的数据写入到MRS集群的hive里。

样例数据简介

生产库MySQL原始数据:

CDC工具debezium简介

对接步骤具体参考:https://fusioninsight.github.io/ecosystem/zh-hans/Data_Integration/DEBEZIUM/

完成对接后,针对MySQL生产库分别做增、改、删除操作对应的kafka消息

增加操作: insert into hudi.hudisource3 values (11,“蒋语堂”,“38”,“女”,“图”,“播放器”,“28732”);

对应kafka消息体:

更改操作: UPDATE hudi.hudisource3 SET uname=‘Anne Marie333’ WHERE uid=11;

对应kafka消息体:

删除操作: delete from hudi.hudisource3 where uid=11;

对应kafka消息体:

调试步骤

华为MRS Hudi样例工程获取

根据实际MRS版本登录github获取样例代码: https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0

打开工程SparkOnHudiJavaExample

样例代码修改及介绍

1.debeziumJsonParser

说明:对debezium的消息体进行解析,获取到op字段。

源码如下:

package com.huawei.bigdata.hudi.examples;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;public class debeziumJsonParser {public static String getOP(String message){JSONObject json_obj = JSON.parseObject(message);String op = json_obj.getJSONObject("payload").get("op").toString();return  op;}
}

2.MyJsonKafkaSource

说明:DeltaStreamer默认使用org.apache.hudi.utilities.sources.JsonKafkaSource消费kafka指定topic的数据,如果消费阶段涉及数据的解析操作,则需要重写MyJsonKafkaSource进行处理。

以下是源码,增加注释

package com.huawei.bigdata.hudi.examples;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import java.util.Map;/*** Read json kafka data.*/
public class MyJsonKafkaSource extends JsonSource {private static final Logger LOG = LogManager.getLogger(MyJsonKafkaSource.class);private final KafkaOffsetGen offsetGen;private final HoodieDeltaStreamerMetrics metrics;public MyJsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,SchemaProvider schemaProvider) {super(properties, sparkContext, sparkSession, schemaProvider);HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder();this.metrics = new HoodieDeltaStreamerMetrics(builder.withProperties(properties).build());properties.put("key.deserializer", StringDeserializer.class);properties.put("value.deserializer", StringDeserializer.class);offsetGen = new KafkaOffsetGen(properties);}@Overrideprotected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());if (totalNewMsgs <= 0) {return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));}JavaRDD<String> newDataRDD = toRDD(offsetRanges);return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));}private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()).filter((x)->{//过滤空行和脏数据String msg = (String)x.value();if (msg == null) {return false;}try{String op = debeziumJsonParser.getOP(msg);}catch (Exception e){return false;}return true;}).map((x) -> {//将debezium接进来的数据解析写进map,在返回map的tostring, 这样结构改动最小String msg = (String)x.value();String op = debeziumJsonParser.getOP(msg);JSONObject json_obj = JSON.parseObject(msg, Feature.OrderedField);Boolean is_delete = false;String out_str = "";Object out_obj = new Object();if(op.equals("c")){out_obj =  json_obj.getJSONObject("payload").get("after");}else if(op.equals("u")){out_obj =   json_obj.getJSONObject("payload").get("after");}else {is_delete = true;out_obj =   json_obj.getJSONObject("payload").get("before");}Map out_map = (Map)out_obj;out_map.put("_hoodie_is_deleted",is_delete);out_map.put("op",op);return out_map.toString();});}
}

3.TransformerExample

说明: 入湖hudi表或者hive表时候需要指定的字段

以下是源码,增加注释

package com.huawei.bigdata.hudi.examples;import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;/*** 功能描述* 对获取的数据进行format*/
public class TransformerExample implements Transformer, Serializable {/*** format data** @param JavaSparkContext jsc* @param SparkSession sparkSession* @param Dataset<Row> rowDataset* @param TypedProperties properties* @return Dataset<Row>*/@Overridepublic Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,TypedProperties properties) {JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD();List<Row> rowList = new ArrayList<>();for (Row row : rowJavaRdd.collect()) {Row one_row = buildRow(row);rowList.add(one_row);}JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList);List<StructField> fields = new ArrayList<>();builFields(fields);StructType schema = DataTypes.createStructType(fields);Dataset<Row> dataFrame = sparkSession.createDataFrame(stringJavaRdd, schema);return dataFrame;}private void builFields(List<StructField> fields) {fields.add(DataTypes.createStructField("uid", DataTypes.IntegerType, true));fields.add(DataTypes.createStructField("uname", DataTypes.StringType, true));fields.add(DataTypes.createStructField("age", DataTypes.StringType, true));fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true));fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true));fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true));fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true));fields.add(DataTypes.createStructField("op", DataTypes.StringType, true));}private Row buildRow(Row row) {Integer uid = row.getInt(0);String uname = row.getString(1);String age = row.getString(2);String sex = row.getString(3);String mostlike = row.getString(4);String lastview = row.getString(5);String totalcost = row.getString(6);Boolean _hoodie_is_deleted = row.getBoolean(7);String op = row.getString(8);Row returnRow = RowFactory.create(uid, uname, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, op);return returnRow;}
}

4.DataSchemaProviderExample

说明: 分别指定MyJsonKafkaSource返回的数据格式为source schema,TransformerExample写入的数据格式为target schema

以下是源码

package com.huawei.bigdata.hudi.examples;import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;/*** 功能描述* 提供sorce和target的schema*/
public class DataSchemaProviderExample extends SchemaProvider {public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) {super(props, jssc);}/*** source schema** @return Schema*/@Overridepublic Schema getSourceSchema() {Schema avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");return avroSchema;}/*** target schema** @return Schema*/@Overridepublic Schema getTargetSchema() {Schema avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");return avroSchema;}
}

将工程打包(hudi-security-examples-0.7.0.jar)以及json解析包(fastjson-1.2.4.jar)上传至MRS客户端

DeltaStreamer启动命令

登录客户端执行一下命令获取环境变量以及认证

source /opt/hadoopclient/bigdata_env
kinit developuser
source /opt/hadoopclient/Hudi/component_env

DeltaStreamer启动命令如下:

spark-submit --master yarn-client \
--jars /opt/hudi-demo2/fastjson-1.2.4.jar,/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \
--driver-class-path /opt/hadoopclient/Hudi/hudi/conf:/opt/hadoopclient/Hudi/hudi/lib/*:/opt/hadoopclient/Spark2x/spark/jars/*:/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
spark-internal --props file:///opt/hudi-demo2/kafka-source.properties \
--target-base-path /tmp/huditest/delta_demo2 \
--table-type COPY_ON_WRITE  \
--target-table delta_demo2  \
--source-ordering-field uid \
--source-class com.huawei.bigdata.hudi.examples.MyJsonKafkaSource \
--schemaprovider-class com.huawei.bigdata.hudi.examples.DataSchemaProviderExample \
--transformer-class com.huawei.bigdata.hudi.examples.TransformerExample \
--enable-hive-sync --continuous

kafka.properties配置

// hudi配置
hoodie.datasource.write.recordkey.field=uid
hoodie.datasource.write.partitionpath.field=
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.datasource.write.hive_style_partitioning=true
hoodie.delete.shuffle.parallelism=10
hoodie.upsert.shuffle.parallelism=10
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.insert.shuffle.parallelism=10
hoodie.finalize.write.parallelism=10
hoodie.cleaner.parallelism=10
hoodie.datasource.write.precombine.field=uid
hoodie.base.path = /tmp/huditest/delta_demo2
hoodie.timeline.layout.version = 1// hive config
hoodie.datasource.hive_sync.table=delta_demo2
hoodie.datasource.hive_sync.partition_fields=
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
hoodie.datasource.hive_sync.use_jdbc=false// Kafka Source topic
hoodie.deltastreamer.source.kafka.topic=hudisource
// checkpoint
hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/delta_demo2/checkpoint/// Kafka props
bootstrap.servers=172.16.9.117:21005
auto.offset.reset=earliest
group.id=a5
offset.rang.limit=10000

注意:kafka服务端配置 allow.everyone.if.no.acl.found 为true

使用Spark查询

spark-shell --master yarnval roViewDF = spark.read.format("org.apache.hudi").load("/tmp/huditest/delta_demo2/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select * from  hudi_ro_table").show()

Mysql增加操作对应spark中hudi表查询结果:

Mysql更新操作对应spark中hudi表查询结果:

删除操作:

使用Hive查询

beelineselect * from delta_demo2;

Mysql增加操作对应hive表中查询结果:

Mysql更新操作对应hive表中查询结果:

Mysql删除操作对应hive表中查询结果:

点击关注,第一时间了解华为云新鲜技术~

Hudi自带工具DeltaStreamer的实时入湖最佳实践相关推荐

  1. 多库多表场景下使用 Amazon EMR CDC 实时入湖最佳实践

    一.前言CDC(Change Data Capture) 从广义上讲所有能够捕获变更数据的技术都可以称为 CDC,但本篇文章中对 CDC 的定义限定为以非侵入的方式实时捕获数据库的变更数据.例如:通过 ...

  2. 华为云MRS基于Hudi和HetuEngine构建实时数据湖最佳实践

    数据湖与实时数据湖是什么? 各个行业企业都在构建企业级数据湖,将企业内多种格式数据源汇聚的大数据平台,通过严格的数据权限和资源管控,将数据和算力开放给各种使用者.一份数据支持多种分析,是数据湖最大的特 ...

  3. 华为云FusionInsight MRS实战 - Hudi实时入湖之DeltaStreamer工具最佳实践

    背景 传统大数据平台的组织架构是针对离线数据处理需求设计的,常用的数据导入方式为采用sqoop定时作业批量导入.随着数据分析对实时性要求不断提高,按小时.甚至分钟级的数据同步越来越普遍.由此展开了基于 ...

  4. Flink原理解析50篇(四)-基于 Flink CDC 打通数据实时入湖

    在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题. 0 ...

  5. 基于Flink CDC打通数据实时入湖

    作者 | 数据社       责编 | 欧阳姝黎 在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎 Flink 和数据湖 Apache Ice ...

  6. 基于Flink+ClickHouse构建实时游戏数据分析最佳实践

    简介:本实践介绍如何快速收集海量用户行为数据,实现秒级响应的实时用户行为分析,并通过实时流计算.云数据库ClickHouse等技术进行深入挖掘和分析,得到用户特征和画像,实现个性化系统推荐服务. 直达 ...

  7. Apache Hudi 在 B 站构建实时数据湖的实践

    简介: B 站选择 Flink + Hudi 的数据湖技术方案,以及针对其做出的优化. 本文作者喻兆靖,介绍了为什么 B 站选择 Flink + Hudi 的数据湖技术方案,以及针对其做出的优化.主要 ...

  8. 基于 Apache Flink Table Store 的全增量一体实时入湖

    摘要:本文简要回顾了数据入湖(仓)的发展阶段,针对在数据库数据入湖中面临的问题,提出了使用 Flink Table Store 作为全增量一体入湖的解决方案,并辅以开源 Demo 的测试结果作为展示. ...

  9. 新浪微博的实时数据湖建设实践

    一 摘要 Apache Flink是目前大数据领域最流行的流批一体化计算引擎,而数据湖技术也是互联网时代的产物,以Iceberg.Hudi和Delta为代表的数据湖技术应运而生.Iceberg目前已经 ...

最新文章

  1. Microsoft .NET Framework 4.6.1
  2. python实现D‘Agostino‘s K-squared test正态分布检验
  3. 掌握好这些不变的底层知识,任他东西南北风!
  4. 【收藏】蘑菇博客mogu_blog项目文档
  5. [Android]在Dagger 2中使用RxJava来进行异步注入(翻译)
  6. ArcUser 2006第2期拾零
  7. FTP服务器日志解析
  8. 常见英文缩写 (持续更新……)
  9. 『华为ICT』数据通信网络基础
  10. 软件使用疑难杂症解决记录篇——科瑞工程量计算簿
  11. 各种好看的cosplay小姐姐热门图片,统统爬取收藏
  12. 采用 MRT-LBM 模拟旋转圆柱绕流2---MATLAB代码--王富海2017--基于 MRT-LBM 的流场与声场仿真计算
  13. 网页怎么预先加载模型_修补预先训练的语言模型
  14. 【愚公系列】2023年05月 攻防世界-MOBILE(Phishing is not a crime-2)
  15. STM32 GPS定位
  16. Resources资源
  17. 如何快速的学习ssh框架
  18. 计算机是如何执行程序的
  19. PQGrid商业化的表格组件
  20. boss直聘zp_stoken逆向分析源码放送

热门文章

  1. CSS 动态超链接样式 LVFHA 或 LVHFA
  2. imail PHP,NT下基于邮件服务软件(IMAIL)的邮件发送程序–(本地版)-PHP教程,邮件处理...
  3. Vrep当中的一些好用的工具(未完)
  4. 休息是为了更好的出发
  5. 北京理工大学计算机学院研究生培养方案,北京理工大学2018版学术型研究生培养方案.PDF...
  6. 手机距离传感器坏了有什么影响_适合手机兼职的工作有什么影响吗
  7. [转] polymorphic databinding solutions
  8. angular复习笔记4-模板
  9. 牛客寒假算法基础集训营2 处女座的测验(一) (数论+构造)
  10. 学习人工智能的头四个月