ChunJun Source Code Analysis: Task Submission

  • Parsing the task-submission parameters and configuration
  • ChunJun task - SYNC
  • ChunJun task - SQL
  • Version: ChunJun 1.12
  • Note: readers familiar with the Spark and Flink job-submission source code will notice that the part from the shell submission to the reflective invocation of the actual Flink application code (the main method) is very similar to Spark and Flink

Parsing the Task-Submission Parameters and Configuration

  1. Sample task-submission command
# What you run manually - sample
sh bin/chunjun-local.sh -job my-examples/task_script_multi_table.json

# What chunjun prints - sample
start command: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64/bin/java -cp /home/chunjun/chunjun-dist/../lib/* com.dtstack.chunjun.client.Launcher -job my-examples/task_script_multi_table.json -mode local -jobType sync -chunjunDistDir /home/chunjun/chunjun-dist
  2. This command launches Java and runs the main method of the class com.dtstack.chunjun.client.Launcher
package com.dtstack.chunjun.client;

// import ...

public class Launcher {

    // code ...

    public static void main(String[] args) throws Exception {
        OptionParser optionParser = new OptionParser(args);
        Options launcherOptions = optionParser.getOptions();
        findDefaultConfigDir(launcherOptions);

        List<String> argList = optionParser.getProgramExeArgList();

        // Convert argList into a HashMap so parameter values can be looked up by name
        HashMap<String, String> temp = new HashMap<>(16);
        for (int i = 0; i < argList.size(); i += 2) {
            temp.put(argList.get(i), argList.get(i + 1));
        }

        // Clear the list and refill it with the adjusted parameter values
        argList.clear();
        for (int i = 0; i < temp.size(); i++) {
            argList.add(temp.keySet().toArray()[i].toString());
            argList.add(temp.values().toArray()[i].toString());
        }

        JobDeployer jobDeployer = new JobDeployer(launcherOptions, argList);

        ClusterClientHelper clusterClientHelper;
        switch (ClusterMode.getByName(launcherOptions.getMode())) {
            case local:
                clusterClientHelper = new LocalClusterClientHelper();
                break;
            case standalone:
                clusterClientHelper = new StandaloneClusterClientHelper();
                break;
            case yarnSession:
                clusterClientHelper = new YarnSessionClusterClientHelper();
                break;
            case yarnPerJob:
                clusterClientHelper = new YarnPerJobClusterClientHelper();
                break;
            case yarnApplication:
                throw new ClusterDeploymentException(
                        "Application Mode not supported by Yarn deployments.");
            case kubernetesSession:
                clusterClientHelper = new KubernetesSessionClusterClientHelper();
                break;
            case kubernetesPerJob:
                throw new ClusterDeploymentException(
                        "Per-Job Mode not supported by Kubernetes deployments.");
            case kubernetesApplication:
                clusterClientHelper = new KubernetesApplicationClusterClientHelper();
                break;
            default:
                throw new ClusterDeploymentException(
                        launcherOptions.getMode() + " Mode not supported.");
        }

        // add ext class
        URLClassLoader urlClassLoader = (URLClassLoader) Launcher.class.getClassLoader();
        List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(launcherOptions.getAddjar());
        ClassLoaderManager.loadExtraJar(jarUrlList, urlClassLoader);

        clusterClientHelper.submit(jobDeployer);
    }

    // code ...
}
  3. When optionParser.getProgramExeArgList() is called, the value of the job parameter (a file path) is used to read the file content (the sync/sql script)
package com.dtstack.chunjun.options;

// import ...

public class OptionParser {

    @VisibleForTesting protected static final String OPTION_JOB = "job";

    // code ...

    public List<String> getProgramExeArgList() throws Exception {
        Map<String, Object> mapConf = MapUtil.objectToMap(properties);
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, Object> one : mapConf.entrySet()) {
            String key = one.getKey();
            Object value = one.getValue();
            if (value == null) {
                continue;
            } else if (OPTION_JOB.equalsIgnoreCase(key)) {
                File file = new File(value.toString());
                try (FileInputStream in = new FileInputStream(file)) {
                    byte[] fileContent = new byte[(int) file.length()];
                    in.read(fileContent);
                    value =
                            URLEncoder.encode(
                                    new String(fileContent, StandardCharsets.UTF_8),
                                    StandardCharsets.UTF_8.name());
                }
            }
            args.add("-" + key);
            args.add(value.toString());
        }
        return args;
    }

    // code ...
}
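Worth noting: the whole script is URL-encoded here before being added to the argument list, and Main later URL-decodes it, so whitespace and newlines in the script survive the trip as a single command-line argument. A minimal, self-contained sketch of that round trip (JDK only; the class name and sample string are illustrative):

import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class EncodeRoundTripSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for the job script read from the -job file path
        String script = "{\n  \"job\": { \"content\": [ ] }\n}";

        // What OptionParser does before appending the value to the program arguments
        String encoded = URLEncoder.encode(script, StandardCharsets.UTF_8.name());

        // What Main does after receiving the argument
        String decoded = URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());

        System.out.println(decoded.equals(script)); // true
    }
}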
  4. Different task modes go through different ClusterClientHelper implementations, e.g. LocalClusterClientHelper
package com.dtstack.chunjun.client.local;

// import ...

public class LocalClusterClientHelper implements ClusterClientHelper {

    @Override
    public ClusterClient submit(JobDeployer jobDeployer) throws Exception {
        String[] args = jobDeployer.getProgramArgs().toArray(new String[0]);
        Main.main(args);
        return null;
    }
}
  5. LocalClusterClientHelper then invokes the main method of Main with those arguments (the other modes work similarly, using PluginInfoUtil.getMainClass() to obtain the class to execute)
  6. The previously submitted args are passed into main, parsed and processed, and then, depending on whether the job type is SQL or SYNC, replacedJob (the task script we wrote) is handed to the corresponding execution path
package com.dtstack.chunjun;

// import ...

public class Main {

    public static Logger LOG = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws Exception {
        LOG.info("------------program params-------------------------");
        Arrays.stream(args).forEach(arg -> LOG.info("{}", arg));
        LOG.info("-------------------------------------------");

        Options options = new OptionParser(args).getOptions();
        String job = URLDecoder.decode(options.getJob(), StandardCharsets.UTF_8.name());
        String replacedJob = JobUtil.replaceJobParameter(options.getP(), job);
        Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());
        StreamExecutionEnvironment env = EnvFactory.createStreamExecutionEnvironment(options);
        StreamTableEnvironment tEnv =
                EnvFactory.createStreamTableEnvironment(env, confProperties, options.getJobName());
        LOG.info(
                "Register to table configuration:{}",
                tEnv.getConfig().getConfiguration().toString());
        switch (EJobType.getByName(options.getJobType())) {
            case SQL:
                exeSqlJob(env, tEnv, replacedJob, options);
                break;
            case SYNC:
                exeSyncJob(env, tEnv, replacedJob, options);
                break;
            default:
                throw new ChunJunRuntimeException(
                        "unknown jobType: ["
                                + options.getJobType()
                                + "], jobType must in [SQL, SYNC].");
        }

        LOG.info("program {} execution success", options.getJobName());
    }

    // code ...
}

ChunJun Task - SYNC

  1. Taking SYNC as an example, exeSyncJob is called next
  2. Parsing the task script: parseConf and SyncConf.parseJob are called in turn, and the task script is ultimately parsed into a com.dtstack.chunjun.conf.SyncConf object using Gson. The key code is as follows:
    • exeSyncJob
package com.dtstack.chunjun;

public class Main {

    // code ...

    private static void exeSyncJob(
            StreamExecutionEnvironment env,
            StreamTableEnvironment tableEnv,
            String job,
            Options options)
            throws Exception {
        SyncConf config = parseConf(job, options);
        // code ...
    }

    // code ...
}
  • parseConf
package com.dtstack.chunjun;

public class Main {

    // code ...

    public static SyncConf parseConf(String job, Options options) {
        SyncConf config;
        try {
            config = SyncConf.parseJob(job);
            // code ...
        } catch (Exception e) {
            throw new ChunJunRuntimeException(e);
        }
        return config;
    }

    // code ...
}
  • SyncConf.parseJob
package com.dtstack.chunjun.conf;

// import ...

public class SyncConf {

    // code ...

    public static SyncConf parseJob(String jobJson) {
        SyncConf config = GsonUtil.GSON.fromJson(jobJson, SyncConf.class);
        checkJob(config);
        return config;
    }

    // code ...
}
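GsonUtil.GSON is just a shared Gson instance, so the mapping from the JSON script to the config objects is ordinary Gson data binding. A minimal, self-contained sketch of the same idea; the Mini* classes and the plugin names in the JSON are illustrative stand-ins, not ChunJun's real schema:

import com.google.gson.Gson;
import java.util.List;

public class GsonParseSketch {

    // Illustrative stand-ins for SyncConf / JobConf / ContentConf / OperatorConf
    static class MiniSyncConf { MiniJob job; }
    static class MiniJob { List<MiniContent> content; }
    static class MiniContent { MiniOperator reader; MiniOperator writer; }
    static class MiniOperator { String name; }

    public static void main(String[] args) {
        String jobJson =
                "{ \"job\": { \"content\": [ {"
                        + " \"reader\": { \"name\": \"samplereader\" },"
                        + " \"writer\": { \"name\": \"samplewriter\" } } ] } }";

        // Same mechanism as SyncConf.parseJob: Gson binds the JSON onto the config classes
        MiniSyncConf conf = new Gson().fromJson(jobJson, MiniSyncConf.class);
        System.out.println(conf.job.content.get(0).reader.name); // samplereader
    }
}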
  3. com.dtstack.chunjun.conf.SyncConf has a member variable JobConf job
package com.dtstack.chunjun.conf;

// import ...

public class SyncConf implements Serializable {
    private static final long serialVersionUID = 1L;

    /** ChunJun job */
    private JobConf job;

    /** Plugin package path on the ChunJun submission side */
    private String pluginRoot;

    /** Remote plugin package path on the server where ChunJun runs */
    private String remotePluginPath;

    private String savePointPath;

    /** List of plugin jar paths required by this job */
    private List<String> syncJarList;

    // code ...
}
  4. com.dtstack.chunjun.conf.JobConf in turn holds the content and setting that we configured in the task script

    • Note that although content is a JSON array in the script, only its first entry is currently used when parsing
package com.dtstack.chunjun.conf;

// import ...

public class JobConf implements Serializable {
    private static final long serialVersionUID = 1L;

    private LinkedList<ContentConf> content;
    private SettingConf setting = new SettingConf();

    public OperatorConf getReader() {
        return content.get(0).getReader();
    }

    public OperatorConf getWriter() {
        return content.get(0).getWriter();
    }

    public CdcConf getCdcConf() {
        return content.get(0).getRestoration();
    }

    public MappingConf getNameMapping() {
        return content.get(0).getNameMapping();
    }

    public TransformerConf getTransformer() {
        return content.get(0).getTransformer();
    }

    public LinkedList<ContentConf> getContent() {
        return content;
    }

    public void setContent(LinkedList<ContentConf> content) {
        this.content = content;
    }

    public SettingConf getSetting() {
        return setting;
    }

    public void setSetting(SettingConf setting) {
        this.setting = setting;
    }

    @Override
    public String toString() {
        return "JobConf{" + "content=" + content + ", setting=" + setting + '}';
    }
}
  5. Jumping back to com.dtstack.chunjun.Main, let's look at the full exeSyncJob method
package com.dtstack.chunjun;

// import ...

public class Main {

    // code ...

    private static void exeSyncJob(
            StreamExecutionEnvironment env,
            StreamTableEnvironment tableEnv,
            String job,
            Options options)
            throws Exception {
        SyncConf config = parseConf(job, options);
        configStreamExecutionEnvironment(env, options, config);

        SourceFactory sourceFactory = DataSyncFactoryUtil.discoverSource(config, env);
        DataStream<RowData> dataStreamSource = sourceFactory.createSource();

        SpeedConf speed = config.getSpeed();
        if (speed.getReaderChannel() > 0) {
            dataStreamSource =
                    ((DataStreamSource<RowData>) dataStreamSource)
                            .setParallelism(speed.getReaderChannel());
        }

        dataStreamSource = addMappingOperator(config, dataStreamSource);

        if (null != config.getCdcConf()
                && (null != config.getCdcConf().getDdl()
                        && null != config.getCdcConf().getCache())) {
            CdcConf cdcConf = config.getCdcConf();
            DDLHandler ddlHandler = DataSyncFactoryUtil.discoverDdlHandler(cdcConf, config);
            CacheHandler cacheHandler = DataSyncFactoryUtil.discoverCacheHandler(cdcConf, config);
            dataStreamSource =
                    dataStreamSource.flatMap(
                            new RestorationFlatMap(ddlHandler, cacheHandler, cdcConf));
        }

        DataStream<RowData> dataStream;
        boolean transformer =
                config.getTransformer() != null
                        && StringUtils.isNotBlank(config.getTransformer().getTransformSql());

        if (transformer) {
            dataStream = syncStreamToTable(tableEnv, config, dataStreamSource);
        } else {
            dataStream = dataStreamSource;
        }

        if (speed.isRebalance()) {
            dataStream = dataStream.rebalance();
        }

        SinkFactory sinkFactory = DataSyncFactoryUtil.discoverSink(config);
        DataStreamSink<RowData> dataStreamSink = sinkFactory.createSink(dataStream);
        if (speed.getWriterChannel() > 0) {
            dataStreamSink.setParallelism(speed.getWriterChannel());
        }

        JobExecutionResult result = env.execute(options.getJobName());
        if (env instanceof MyLocalStreamEnvironment) {
            PrintUtil.printResult(result.getAllAccumulatorResults());
        }
    }

    // code ...
}
  6. In DataSyncFactoryUtil.discoverSource, the reader name configured in the task script is used to obtain the plugin's class name, and the corresponding SourceFactory is then constructed via reflection (a reflection sketch follows the code below)

    • Each connector plugin has its own concrete SourceFactory implementation, e.g. MysqlSourceFactory, KafkaSourceFactory, Elasticsearch7SourceFactory
    • DataSyncFactoryUtil also has a discoverSink method for building the SinkFactory, which works the same way
    public static SourceFactory discoverSource(SyncConf config, StreamExecutionEnvironment env) {
        try {
            String pluginName = config.getJob().getReader().getName();
            String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.source);

            return ClassLoaderManager.newInstance(
                    config.getSyncJarList(),
                    cl -> {
                        Class<?> clazz = cl.loadClass(pluginClassName);
                        Constructor<?> constructor =
                                clazz.getConstructor(SyncConf.class, StreamExecutionEnvironment.class);
                        return (SourceFactory) constructor.newInstance(config, env);
                    });
        } catch (Exception e) {
            throw new ChunJunRuntimeException(e);
        }
    }

    public static SinkFactory discoverSink(SyncConf config) {
        // code ...
    }
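To make the reflection step concrete, here is a minimal, self-contained sketch of the same pattern: resolve a class name, load the class through a class loader, pick the constructor with the expected signature, and instantiate it. The FactoryLoaderSketch and GreetingFactory names are illustrative, not ChunJun classes:

import java.lang.reflect.Constructor;

public class FactoryLoaderSketch {

    // Illustrative stand-in for the SourceFactory hierarchy
    public interface GreetingFactory {
        String create();
    }

    public static class EnglishGreetingFactory implements GreetingFactory {
        private final String name;

        public EnglishGreetingFactory(String name) {
            this.name = name;
        }

        @Override
        public String create() {
            return "hello, " + name;
        }
    }

    public static void main(String[] args) throws Exception {
        // In ChunJun the class name comes from PluginUtil.getPluginClassName(pluginName, ...)
        String pluginClassName = "FactoryLoaderSketch$EnglishGreetingFactory";

        // Load the class (ChunJun uses a class loader built over the plugin jar list)
        ClassLoader cl = FactoryLoaderSketch.class.getClassLoader();
        Class<?> clazz = cl.loadClass(pluginClassName);

        // Pick the constructor with the expected signature and instantiate it
        Constructor<?> constructor = clazz.getConstructor(String.class);
        GreetingFactory factory = (GreetingFactory) constructor.newInstance("chunjun");

        System.out.println(factory.create()); // hello, chunjun
    }
}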
  7. exeSyncJob then calls sourceFactory.createSource(), which creates a Flink DataStream<RowData>

    • Looking inside createSource, you can see that it builds the source by calling Flink's env.addSource(...)
  8. Next, exeSyncJob adjusts the parallelism of the DataStreamSource according to the speed.readerChannel setting in the task script
    • By default the channel setting is used, as can be seen from the configStreamExecutionEnvironment method called inside exeSyncJob
  9. exeSyncJob then calls addMappingOperator, which uses the nameMapping in the task script to transform table metadata
  10. exeSyncJob then handles the CDC configuration
  11. exeSyncJob then transforms the raw data. The transformation logic is the transformSql configured in the task script, and the code mainly relies on Flink's tableEnv, as follows
    • Register the previously created DataStreamSource as a Table and create a view for it (named after the tableName of the reader in the task script)
    • Call tableEnv.sqlQuery(...) with transformSql to transform the view from the previous step
    • Finally, the resulting DataStream is again registered as a view (named after the tableName of the writer in the task script). Note: this last step currently seems unused; nothing downstream refers to it.
  12. exeSyncJob then calls DataSyncFactoryUtil.discoverSink to produce the corresponding SinkFactory, and SinkFactory.createSink uses the Flink API (dataSet.addSink(...)) to build the output side of the pipeline.
  13. Finally, env.execute(options.getJobName()) is called to start the Flink job (a minimal sketch of this overall pipeline shape follows below)
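For orientation, here is a minimal, self-contained sketch of the pipeline shape that exeSyncJob assembles (source, optional SQL transform through the table environment, sink, execute), written against the Flink 1.12 API. The view name, SQL, and print sink are stand-ins for ChunJun's SourceFactory, transformSql, and SinkFactory:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SyncPipelineSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Stand-in for sourceFactory.createSource(); ChunJun builds this via env.addSource(...)
        DataStream<String> source = env.fromElements("a", "b", "c").setParallelism(1);

        // Stand-in for the transformSql step: register the stream as a view and query it
        tEnv.createTemporaryView("reader_table", source);
        Table transformed = tEnv.sqlQuery("SELECT f0 FROM reader_table WHERE f0 <> 'b'");
        DataStream<String> dataStream = tEnv.toAppendStream(transformed, String.class);

        // Stand-in for sinkFactory.createSink(dataStream), which adds the sink operator
        dataStream.print().setParallelism(1);

        // Same final step as exeSyncJob
        env.execute("sync-pipeline-sketch");
    }
}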

ChunJun Task - SQL

  1. Taking SQL as an example, exeSqlJob is called next
package com.dtstack.chunjun;

// import ...

class Main {

    // code ..

    private static void exeSqlJob(
            StreamExecutionEnvironment env,
            StreamTableEnvironment tableEnv,
            String job,
            Options options) {
        try {
            configStreamExecutionEnvironment(env, options, null);
            List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(options.getAddjar());
            StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tableEnv);
            TableResult execute = statementSet.execute();
            if (env instanceof MyLocalStreamEnvironment) {
                Optional<JobClient> jobClient = execute.getJobClient();
                if (jobClient.isPresent()) {
                    PrintUtil.printResult(jobClient.get().getAccumulators().get());
                }
            }
        } catch (Exception e) {
            throw new ChunJunRuntimeException(e);
        } finally {
            FactoryUtil.getFactoryHelperThreadLocal().remove();
            TableFactoryService.getFactoryHelperThreadLocal().remove();
        }
    }

    // code ...
}
  2. The key call in exeSqlJob is SqlParser.parseSql(job, jarUrlList, tableEnv), which produces a Flink StatementSet
package com.dtstack.chunjun.sql.parser;

// import ...

public class SqlParser {

    private static final char SQL_DELIMITER = ';';

    /**
     * flink support sql syntax CREATE TABLE sls_stream() with (); CREATE (TABLE|SCALA) FUNCTION
     * fcnName WITH com.dtstack.com; insert into tb1 select * from tb2;
     *
     * @param
     */
    public static StatementSet parseSql(
            String sql, List<URL> urlList, StreamTableEnvironment tableEnvironment) {

        if (StringUtils.isBlank(sql)) {
            throw new IllegalArgumentException("SQL must be not empty!");
        }

        sql = DtStringUtil.dealSqlComment(sql);
        StatementSet statement = tableEnvironment.createStatementSet();
        Splitter splitter = new Splitter(SQL_DELIMITER);
        List<String> stmts = splitter.splitEscaped(sql);
        AbstractStmtParser stmtParser = createParserChain();

        stmts.stream()
                .filter(stmt -> !Strings.isNullOrEmpty(stmt.trim()))
                .forEach(
                        stmt -> {
                            try {
                                stmtParser.handleStmt(stmt, tableEnvironment, statement, urlList);
                            } catch (Exception e) {
                                throw new ChunJunSqlParseException(
                                        PwdUtil.desensitization(stmt), e.getMessage(), e);
                            }
                        });

        return statement;
    }

    // code ...
}
  3. parseSql runs a series of parsing steps over the SQL

    1. Call DtStringUtil.dealSqlComment(sql) to strip comments
    2. Call the Flink API to create a StatementSet, which is returned later and executed at the end
    3. Use Splitter to split the SQL string on the ; delimiter, producing multiple independent SQL statements
    4. Then build a parser chain using the chain-of-responsibility pattern: UploadFileStmtParser -> CreateFunctionStmtParser -> InsertStmtParser
    5. Finally, each independent SQL statement is processed in order through the parser chain above via a stream
  4. createParserChain builds the parser chain; when a SQL string is processed, the parsers are tried in order: UploadFileStmtParser -> CreateFunctionStmtParser -> InsertStmtParser (a minimal sketch of this chain follows the list)
    1. UploadFileStmtParser

      • The statement must match the regex "(?i).*add\\s+file\\s+.+|(?i).*add\\s+jar\\s+.+"
      • What it does: currently nothing (the handler body is empty)
    2. CreateFunctionStmtParser
      • The statement must match the regex "(?i)\\s*CREATE\\s+(scalar|table|aggregate)\\s+FUNCTION\\s+(\\S+)\\s+WITH\\s+(\\S+)"
      • What it does: loads the class via a ClassLoader and registers the user-defined function (UDF)
    3. InsertStmtParser
      • The statement must start with insert, i.e. it is an INSERT statement
      • What it does: calls statementSet.addInsertSql(sql) to add the SQL to the StatementSet
    4. If a SQL statement matches none of the parsers above, tEnv.executeSql(stmt) is called to execute it directly (e.g. CREATE statements)
  5. After all statements have been parsed and handled, the StatementSet containing the INSERT statements is returned, and statementSet.execute() is finally called
    1. CREATE statements have already been executed during the parseSql phase
    2. The remaining INSERT statements are only processed at the very end
    3. This avoids the logical problem of "inserting into a table before it has been created"
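As a reference, here is a minimal, self-contained sketch of the chain-of-responsibility shape described above (plain JDK, illustrative class names). In ChunJun the real handlers also receive the table environment and the StatementSet, so InsertStmtParser can call statementSet.addInsertSql and the fallback can call tEnv.executeSql; here those effects are replaced with println:

import java.util.Arrays;
import java.util.List;

public class StmtParserChainSketch {

    // Simplified stand-in for AbstractStmtParser
    abstract static class StmtParser {
        private StmtParser next;

        StmtParser setNext(StmtParser next) {
            this.next = next;
            return next;
        }

        void handleStmt(String stmt) {
            if (canHandle(stmt)) {
                execStmt(stmt);
            } else if (next != null) {
                next.handleStmt(stmt);
            } else {
                // Fallback: in ChunJun this is where tEnv.executeSql(stmt) runs (e.g. CREATE TABLE)
                System.out.println("execute directly: " + stmt);
            }
        }

        abstract boolean canHandle(String stmt);

        abstract void execStmt(String stmt);
    }

    // Stand-in for CreateFunctionStmtParser: would load the class and register the UDF
    static class CreateFunctionParser extends StmtParser {
        @Override
        boolean canHandle(String stmt) {
            return stmt.trim()
                    .matches("(?i)\\s*create\\s+(scalar|table|aggregate)\\s+function\\s+.*");
        }

        @Override
        void execStmt(String stmt) {
            System.out.println("register UDF for: " + stmt);
        }
    }

    // Stand-in for InsertStmtParser: defers INSERTs (real code calls statementSet.addInsertSql)
    static class InsertParser extends StmtParser {
        @Override
        boolean canHandle(String stmt) {
            return stmt.trim().toLowerCase().startsWith("insert");
        }

        @Override
        void execStmt(String stmt) {
            System.out.println("add to statement set: " + stmt);
        }
    }

    public static void main(String[] args) {
        StmtParser chain = new CreateFunctionParser();
        chain.setNext(new InsertParser());

        List<String> stmts =
                Arrays.asList(
                        "CREATE TABLE source_tb (id INT) WITH ('connector' = 'datagen')",
                        "INSERT INTO sink_tb SELECT id FROM source_tb");
        stmts.forEach(chain::handleStmt);
    }
}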
