Data Quality Workflow

The data quality workflow consists of two parts. On the web side, the data quality check is defined as a workflow, which is scheduled by DolphinScheduler and submitted to the Spark compute engine. On the Spark side, the parameters of the data quality model are parsed and the check is carried out in three steps: reading data, executing transformations, and writing the output. The overall workflow is shown in the figure below.
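Before diving into the source, it helps to see what the Spark side effectively does end to end. The following is a minimal, self-contained sketch, not DolphinScheduler code: it reads the source table over JDBC, registers it as a temp view, runs a check SQL, and prints the result. The connection values are taken from the example JSON further down; the class name NullCheckSketch is made up for illustration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class NullCheckSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("null_check_sketch")
                .getOrCreate();

        // 1. read: load the source table over JDBC and expose it as a temp view
        Dataset<Row> source = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://192.168.3.211:3306/ops")
                .option("dbtable", "ms_alarm")
                .option("user", "root")
                .option("password", "***")
                .option("driver", "com.mysql.cj.jdbc.Driver")
                .load();
        source.createOrReplaceTempView("ops_ms_alarm");

        // 2. transform: run the check SQL against the temp view
        Dataset<Row> nullCount = spark.sql(
                "SELECT COUNT(*) AS nulls FROM ops_ms_alarm "
                        + "WHERE alarm_time IS NULL OR alarm_time = ''");

        // 3. write: simply print here; the real job writes the result back to MySQL and HDFS
        nullCount.show();

        spark.stop();
    }
}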

Defining the check on the web side

The data quality definition is shown in the figure below; only a single node is defined here.
Taking the input parameters of a null-value check as an example, the following JSON is submitted to the Spark side as a string:

{"name": "$t(null_check)","env": {"type": "batch","config": null},"readers": [{"type": "JDBC","config": {"database": "ops","password": "***","driver": "com.mysql.cj.jdbc.Driver","user": "root","output_table": "ops_ms_alarm","table": "ms_alarm","url": "jdbc:mysql://192.168.3.211:3306/ops?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"}}],"transformers": [{"type": "sql","config": {"index": 1,"output_table": "total_count","sql": "SELECT COUNT(*) AS total FROM ops_ms_alarm"}},{"type": "sql","config": {"index": 2,"output_table": "null_items","sql": "SELECT * FROM ops_ms_alarm WHERE (alarm_time is null or alarm_time = '') "}},{"type": "sql","config": {"index": 3,"output_table": "null_count","sql": "SELECT COUNT(*) AS nulls FROM null_items"}}],"writers": [{"type": "JDBC","config": {"database": "dolphinscheduler3","password": "***","driver": "com.mysql.cj.jdbc.Driver","user": "root","table": "t_ds_dq_execute_result","url": "jdbc:mysql://192.168.3.212:3306/dolphinscheduler3?characterEncoding=utf-8&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false","sql": "select 0 as rule_type,'$t(null_check)' as rule_name,0 as process_definition_id,25 as process_instance_id,26 as task_instance_id,null_count.nulls AS statistics_value,total_count.total AS comparison_value,7 AS comparison_type,3 as check_type,0.95 as threshold,3 as operator,1 as failure_strategy,'hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测' as error_output_path,'2022-11-16 03:40:32' as create_time,'2022-11-16 03:40:32' as update_time from null_count full join total_count"}},{"type": "JDBC","config": {"database": "dolphinscheduler3","password": "***","driver": "com.mysql.cj.jdbc.Driver","user": "root","table": "t_ds_dq_task_statistics_value","url": "jdbc:mysql://192.168.3.212:3306/dolphinscheduler3?characterEncoding=utf-8&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false","sql": "select 0 as process_definition_id,26 as task_instance_id,1 as rule_id,'ZKTZKDBTRFDKXKQUDNZJVKNX8OIAEVLQ91VT2EXZD3U=' as unique_code,'null_count.nulls'AS statistics_name,null_count.nulls AS statistics_value,'2022-11-16 03:40:32' as data_time,'2022-11-16 03:40:32' as create_time,'2022-11-16 03:40:32' as update_time from null_count"}},{"type": "hdfs_file","config": {"path": "hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测","input_table": "null_items"}}]
}

Source code analysis on the Spark side

DataQualityApplication: the program entry point

DataQualityApplication#main

public static void main(String[] args) throws Exception {
    //...
    // Get the parameter from the command line
    String dataQualityParameter = args[0];
    // Convert the JSON parameter into a DataQualityConfiguration object
    DataQualityConfiguration dataQualityConfiguration = JsonUtils.fromJson(dataQualityParameter, DataQualityConfiguration.class);
    //...
    // Build the Config object used by SparkRuntimeEnvironment
    EnvConfig envConfig = dataQualityConfiguration.getEnvConfig();
    Config config = new Config(envConfig.getConfig());
    config.put("type", envConfig.getType());
    if (Strings.isNullOrEmpty(config.getString(SPARK_APP_NAME))) {
        config.put(SPARK_APP_NAME, dataQualityConfiguration.getName());
    }
    SparkRuntimeEnvironment sparkRuntimeEnvironment = new SparkRuntimeEnvironment(config);
    // Delegate execution to DataQualityContext
    DataQualityContext dataQualityContext = new DataQualityContext(sparkRuntimeEnvironment, dataQualityConfiguration);
    dataQualityContext.execute();
}

The data quality configuration class

public class DataQualityConfiguration implements IConfig {

    @JsonProperty("name")
    private String name; // name

    @JsonProperty("env")
    private EnvConfig envConfig; // environment configuration

    @JsonProperty("readers")
    private List<ReaderConfig> readerConfigs; // reader configurations

    @JsonProperty("transformers")
    private List<TransformerConfig> transformerConfigs; // transformer configurations

    @JsonProperty("writers")
    private List<WriterConfig> writerConfigs; // writer configurations

    //...
}
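JsonUtils.fromJson is not quoted here; judging from the @JsonProperty annotations above it presumably wraps Jackson, so deserializing the parameter string amounts to something like the following sketch (the helper name parse is made up for illustration):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

// Minimal sketch of what JsonUtils.fromJson likely boils down to (assumption: Jackson underneath)
static DataQualityConfiguration parse(String json) throws JsonProcessingException {
    return new ObjectMapper().readValue(json, DataQualityConfiguration.class);
}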

DataQualityContext#execute

It obtains the readers, transformers and writers from the DataQualityConfiguration object and delegates execution to SparkBatchExecution:

public void execute() throws DataQualityException {
    // Convert List<ReaderConfig> into List<BatchReader>
    List<BatchReader> readers = ReaderFactory.getInstance()
            .getReaders(this.sparkRuntimeEnvironment, dataQualityConfiguration.getReaderConfigs());
    // Convert List<TransformerConfig> into List<BatchTransformer>
    List<BatchTransformer> transformers = TransformerFactory.getInstance()
            .getTransformer(this.sparkRuntimeEnvironment, dataQualityConfiguration.getTransformerConfigs());
    // Convert List<WriterConfig> into List<BatchWriter>
    List<BatchWriter> writers = WriterFactory.getInstance()
            .getWriters(this.sparkRuntimeEnvironment, dataQualityConfiguration.getWriterConfigs());
    // Spark runtime environment
    if (sparkRuntimeEnvironment.isBatch()) {
        // batch mode
        sparkRuntimeEnvironment.getBatchExecution().execute(readers, transformers, writers);
    } else {
        // stream mode, not supported yet
        throw new DataQualityException("stream mode is not supported now");
    }
}

ReaderFactory uses the singleton and factory-method patterns; it currently supports reading from JDBC and HIVE data sources, implemented by the reader classes JdbcReader and HiveReader (a structural sketch of such a factory is shown below).
WriterFactory uses the singleton and factory-method patterns; it currently supports writing to JDBC, HDFS and LOCAL_FILE targets, implemented by the writer classes JdbcWriter, HdfsFileWriter and LocalFileWriter.
TransformerFactory uses the singleton and factory-method patterns; currently only the TransformerType.SQL transformer type is supported.
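The factory sources themselves are not quoted in this article; structurally they follow the pattern sketched below. This is an illustrative singleton plus factory method, not the actual ReaderFactory code, and the ReaderConfig getters and reader constructors used here are assumptions made for the sake of the example.

// Illustrative sketch of the singleton + factory-method pattern, not the actual ReaderFactory source
public class ReaderFactorySketch {

    private static final ReaderFactorySketch INSTANCE = new ReaderFactorySketch();

    private ReaderFactorySketch() {
    }

    public static ReaderFactorySketch getInstance() {
        return INSTANCE;
    }

    // Factory method: pick the concrete reader according to the configured type.
    // The ReaderConfig getters and reader constructors are assumed for illustration.
    public BatchReader getReader(ReaderConfig readerConfig) {
        switch (readerConfig.getType().toUpperCase()) {
            case "JDBC":
                return new JdbcReader(new Config(readerConfig.getConfig()));
            case "HIVE":
                return new HiveReader(new Config(readerConfig.getConfig()));
            default:
                throw new IllegalArgumentException("unsupported reader type: " + readerConfig.getType());
        }
    }
}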

Combined with the JSON above, the reader, transformers and writers of a null-value check are as follows.
1 reader: reads the data of the source table.

3 transformers: total_count (the total row count), null_items (the rows containing null values) and null_count (the number of null rows). The SQL statements are:
– SELECT COUNT(*) AS total FROM ops_ms_alarm
– SELECT * FROM ops_ms_alarm WHERE (alarm_time is null or alarm_time = '')
– SELECT COUNT(*) AS nulls FROM null_items

3 writers:
The first is a JDBC writer, which writes the comparison value and the statistics value into t_ds_dq_execute_result, the data quality execution result table:

SELECT
    --...
    null_count.nulls AS statistics_value,
    total_count.total AS comparison_value,
    --...
    'hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测' AS error_output_path,
    --...
FROM
    null_count
    FULL JOIN total_count

The second is a JDBC writer, which writes statistics_value into the table t_ds_dq_task_statistics_value:

SELECT
    --...
    'null_count.nulls' AS statistics_name,
    null_count.nulls AS statistics_value,
    --...
FROM
    null_count

The third is an HDFS writer, which writes the null-value rows into an HDFS directory:

{"type": "hdfs_file","config": {"path": "hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测","input_table": "null_items"}
}

At present, DolphinScheduler does not yet support quality checks on real-time (streaming) data.

SparkBatchExecution#execute

public class SparkBatchExecution implements Execution<BatchReader, BatchTransformer, BatchWriter> {

    private final SparkRuntimeEnvironment environment;

    public SparkBatchExecution(SparkRuntimeEnvironment environment) throws ConfigRuntimeException {
        this.environment = environment;
    }

    @Override
    public void execute(List<BatchReader> readers, List<BatchTransformer> transformers, List<BatchWriter> writers) {
        // Register an input temp view for every reader
        readers.forEach(reader -> registerInputTempView(reader, environment));
        if (!readers.isEmpty()) {
            // Read the dataset from the first reader in the list; the reader implementations are HiveReader and JdbcReader
            Dataset<Row> ds = readers.get(0).read(environment);
            for (BatchTransformer tf : transformers) {
                // Execute the transformation
                ds = executeTransformer(environment, tf, ds);
                // Register the transformed result as a temp view
                registerTransformTempView(tf, ds);
            }
            for (BatchWriter sink : writers) {
                // Output the transformed result with each writer; the writer implementations are JdbcWriter, LocalFileWriter and HdfsFileWriter
                executeWriter(environment, sink, ds);
            }
        }
        // Finish
        environment.sparkSession().stop();
    }
}

SparkBatchExecution#registerInputTempView

// Register the input temp view; the view name is the value of OUTPUT_TABLE
private void registerInputTempView(BatchReader reader, SparkRuntimeEnvironment environment) {
    Config conf = reader.getConfig();
    if (Boolean.TRUE.equals(conf.has(OUTPUT_TABLE))) {
        // ops_ms_alarm
        String tableName = conf.getString(OUTPUT_TABLE);
        registerTempView(tableName, reader.read(environment));
    } else {
        throw new ConfigRuntimeException("[" + reader.getClass().getName()
                + "] must be registered as dataset, please set \"output_table\" config");
    }
}

registerTempView calls Dataset.createOrReplaceTempView:

private void registerTempView(String tableName, Dataset<Row> ds) {
    if (ds != null) {
        ds.createOrReplaceTempView(tableName);
    } else {
        throw new ConfigRuntimeException("dataset is null, can not createOrReplaceTempView");
    }
}

Executing the transformation: executeTransformer

private Dataset<Row> executeTransformer(SparkRuntimeEnvironment environment, BatchTransformer transformer, Dataset<Row> dataset) {
    Config config = transformer.getConfig();
    Dataset<Row> inputDataset;
    Dataset<Row> outputDataset = null;
    if (Boolean.TRUE.equals(config.has(INPUT_TABLE))) {
        // Get the table names from INPUT_TABLE
        String[] tableNames = config.getString(INPUT_TABLE).split(",");
        // outputDataset is the union of all the input datasets
        for (String sourceTableName : tableNames) {
            inputDataset = environment.sparkSession().read().table(sourceTableName);
            if (outputDataset == null) {
                outputDataset = inputDataset;
            } else {
                outputDataset = outputDataset.union(inputDataset);
            }
        }
    } else {
        // No INPUT_TABLE in the configuration
        outputDataset = dataset;
    }
    // If TMP_TABLE is configured, register outputDataset as a temp view
    if (Boolean.TRUE.equals(config.has(TMP_TABLE))) {
        if (outputDataset == null) {
            outputDataset = dataset;
        }
        String tableName = config.getString(TMP_TABLE);
        registerTempView(tableName, outputDataset);
    }
    // Let the transformer perform the transformation
    return transformer.transform(outputDataset, environment);
}

SqlTransformer#transform ultimately does the work with Spark SQL, so the core is the SQL statement itself; the SQL has to be generated on the web side, see the JSON file shown earlier.

public class SqlTransformer implements BatchTransformer {

    private final Config config;

    public SqlTransformer(Config config) {
        this.config = config;
    }

    //...
    @Override
    public Dataset<Row> transform(Dataset<Row> data, SparkRuntimeEnvironment env) {
        return env.sparkSession().sql(config.getString(SQL));
    }
}

Writing the data to the configured target: executeWriter

private void executeWriter(SparkRuntimeEnvironment environment, BatchWriter writer, Dataset<Row> ds) {
    Config config = writer.getConfig();
    Dataset<Row> inputDataSet = ds;
    if (Boolean.TRUE.equals(config.has(INPUT_TABLE))) {
        String sourceTableName = config.getString(INPUT_TABLE);
        inputDataSet = environment.sparkSession().read().table(sourceTableName);
    }
    writer.write(inputDataSet, environment);
}
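The concrete writer implementations are not quoted in this article. As a rough idea of what the JDBC output step boils down to, here is a minimal sketch that writes a Dataset<Row> into a MySQL table with Spark's built-in JDBC sink; it is not the actual JdbcWriter source, and the helper name writeToJdbc is made up for illustration (the option values mirror the writer config fields in the example JSON).

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public final class JdbcSinkSketch {

    // Hypothetical helper: writes the given dataset into a JDBC table,
    // roughly what a JDBC writer has to do once the result SQL has been executed.
    public static void writeToJdbc(Dataset<Row> ds, String url, String table, String user, String password) {
        ds.write()
                .format("jdbc")
                .option("url", url)          // e.g. jdbc:mysql://192.168.3.212:3306/dolphinscheduler3
                .option("dbtable", table)    // e.g. t_ds_dq_execute_result
                .option("user", user)
                .option("password", password)
                .option("driver", "com.mysql.cj.jdbc.Driver")
                .mode(SaveMode.Append)       // append the check result as a new row
                .save();
    }
}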
