ChunJun Source Code Analysis: Task Submission
- Parsing the submission parameters and configuration
- ChunJun jobs: SYNC
- ChunJun jobs: SQL
- Version: ChunJun 1.12
- Note: readers who have been through the Spark or Flink job-submission source code will recognize that the path "from the shell submission, to reflectively executing the actual Flink application code (the main method)" is very similar to Spark and Flink.
Parsing the submission parameters and configuration
- Example submission command:

```shell
# What you run manually - example
sh bin/chunjun-local.sh -job my-examples/task_script_multi_table.json

# What ChunJun prints - example
start command: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64/bin/java -cp /home/chunjun/chunjun-dist/../lib/* com.dtstack.chunjun.client.Launcher -job my-examples/task_script_multi_table.json -mode local -jobType sync -chunjunDistDir /home/chunjun/chunjun-dist
```
- This command launches Java and runs the `main` method of the class `com.dtstack.chunjun.client.Launcher`:
```java
package com.dtstack.chunjun.client;

// import ...

public class Launcher {
    // code ...

    public static void main(String[] args) throws Exception {
        OptionParser optionParser = new OptionParser(args);
        Options launcherOptions = optionParser.getOptions();
        findDefaultConfigDir(launcherOptions);

        List<String> argList = optionParser.getProgramExeArgList();
        // Convert argList to a HashMap so values can be looked up by parameter name
        HashMap<String, String> temp = new HashMap<>(16);
        for (int i = 0; i < argList.size(); i += 2) {
            temp.put(argList.get(i), argList.get(i + 1));
        }
        // Clear the list and refill it with the (possibly modified) parameter values
        argList.clear();
        for (int i = 0; i < temp.size(); i++) {
            argList.add(temp.keySet().toArray()[i].toString());
            argList.add(temp.values().toArray()[i].toString());
        }

        JobDeployer jobDeployer = new JobDeployer(launcherOptions, argList);

        ClusterClientHelper clusterClientHelper;
        switch (ClusterMode.getByName(launcherOptions.getMode())) {
            case local:
                clusterClientHelper = new LocalClusterClientHelper();
                break;
            case standalone:
                clusterClientHelper = new StandaloneClusterClientHelper();
                break;
            case yarnSession:
                clusterClientHelper = new YarnSessionClusterClientHelper();
                break;
            case yarnPerJob:
                clusterClientHelper = new YarnPerJobClusterClientHelper();
                break;
            case yarnApplication:
                throw new ClusterDeploymentException(
                        "Application Mode not supported by Yarn deployments.");
            case kubernetesSession:
                clusterClientHelper = new KubernetesSessionClusterClientHelper();
                break;
            case kubernetesPerJob:
                throw new ClusterDeploymentException(
                        "Per-Job Mode not supported by Kubernetes deployments.");
            case kubernetesApplication:
                clusterClientHelper = new KubernetesApplicationClusterClientHelper();
                break;
            default:
                throw new ClusterDeploymentException(
                        launcherOptions.getMode() + " Mode not supported.");
        }

        // add ext class
        URLClassLoader urlClassLoader = (URLClassLoader) Launcher.class.getClassLoader();
        List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(launcherOptions.getAddjar());
        ClassLoaderManager.loadExtraJar(jarUrlList, urlClassLoader);
        clusterClientHelper.submit(jobDeployer);
    }

    // code ...
}
```
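The argList handling at the top of `main` can be sketched in isolation. This is not ChunJun code: it reproduces the pair-wise list-to-map-to-list shuffle with plain JDK collections to show its effect, namely that repeated flags are deduplicated with the last value winning. A `LinkedHashMap` is used here purely so the output order is predictable (the real code uses a `HashMap`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of Launcher's argList -> map -> argList step: flag/value pairs pass
// through a map, so a repeated flag keeps only its last value.
public class ArgListDemo {
    public static List<String> dedupeArgs(List<String> argList) {
        Map<String, String> temp = new LinkedHashMap<>();
        for (int i = 0; i < argList.size(); i += 2) {
            temp.put(argList.get(i), argList.get(i + 1)); // later values overwrite earlier ones
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : temp.entrySet()) {
            result.add(e.getKey());
            result.add(e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> argList =
                Arrays.asList("-mode", "local", "-jobType", "sync", "-mode", "yarnPerJob");
        // The duplicate "-mode" collapses to its last value, "yarnPerJob".
        System.out.println(dedupeArgs(argList));
    }
}
```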
- When `optionParser.getProgramExeArgList()` is called, the file path given by the `job` parameter is used to read the file content (the sync or sql script):
```java
package com.dtstack.chunjun.options;

// import ...

public class OptionParser {

    @VisibleForTesting protected static final String OPTION_JOB = "job";

    // code ...

    public List<String> getProgramExeArgList() throws Exception {
        Map<String, Object> mapConf = MapUtil.objectToMap(properties);
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, Object> one : mapConf.entrySet()) {
            String key = one.getKey();
            Object value = one.getValue();
            if (value == null) {
                continue;
            } else if (OPTION_JOB.equalsIgnoreCase(key)) {
                File file = new File(value.toString());
                try (FileInputStream in = new FileInputStream(file)) {
                    byte[] fileContent = new byte[(int) file.length()];
                    in.read(fileContent);
                    value =
                            URLEncoder.encode(
                                    new String(fileContent, StandardCharsets.UTF_8),
                                    StandardCharsets.UTF_8.name());
                }
            }
            args.add("-" + key);
            args.add(value.toString());
        }
        return args;
    }

    // code ...
}
```
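A minimal sketch of why `getProgramExeArgList` URL-encodes the script content: the whole multi-line file can then travel as a single `-job` argument value, and `Main` recovers it unchanged with `URLDecoder`. This is not ChunJun code, just the same JDK calls it relies on.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Round trip of a job script through URL encoding, as OptionParser (encode)
// and Main (decode) do with the "-job" argument.
public class JobArgRoundTrip {
    public static String encode(String script) {
        try {
            return URLEncoder.encode(script, StandardCharsets.UTF_8.name());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static String decode(String encoded) {
        try {
            return URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String script = "{\"job\": {\"content\": [{}]}}\nsecond line; with spaces";
        String encoded = encode(script);
        System.out.println(encoded.contains("\n")); // false: safe as one CLI argument
        System.out.println(decode(encoded).equals(script)); // true: lossless round trip
    }
}
```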
- Different job types go through different `ClusterClientHelper` implementations, for example `LocalClusterClientHelper`:
```java
package com.dtstack.chunjun.client.local;

// import ...

public class LocalClusterClientHelper implements ClusterClientHelper {

    @Override
    public ClusterClient submit(JobDeployer jobDeployer) throws Exception {
        String[] args = jobDeployer.getProgramArgs().toArray(new String[0]);
        Main.main(args);
        return null;
    }
}
```
- `LocalClusterClientHelper` then executes the `main` method of `Main`, passing the arguments in (the other modes are similar; they use `PluginInfoUtil.getMainClass()` to obtain the class to execute).
- The `args` submitted earlier travel along into `main`; after parsing and processing, the job kind is distinguished as SQL or SYNC, and the replacedJob (i.e. the job script we wrote) is passed on:
```java
package com.dtstack.chunjun;

// import ...

public class Main {

    public static Logger LOG = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws Exception {
        LOG.info("------------program params-------------------------");
        Arrays.stream(args).forEach(arg -> LOG.info("{}", arg));
        LOG.info("-------------------------------------------");

        Options options = new OptionParser(args).getOptions();
        String job = URLDecoder.decode(options.getJob(), StandardCharsets.UTF_8.name());
        String replacedJob = JobUtil.replaceJobParameter(options.getP(), job);
        Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());
        StreamExecutionEnvironment env = EnvFactory.createStreamExecutionEnvironment(options);
        StreamTableEnvironment tEnv =
                EnvFactory.createStreamTableEnvironment(env, confProperties, options.getJobName());
        LOG.info(
                "Register to table configuration:{}",
                tEnv.getConfig().getConfiguration().toString());
        switch (EJobType.getByName(options.getJobType())) {
            case SQL:
                exeSqlJob(env, tEnv, replacedJob, options);
                break;
            case SYNC:
                exeSyncJob(env, tEnv, replacedJob, options);
                break;
            default:
                throw new ChunJunRuntimeException(
                        "unknown jobType: ["
                                + options.getJobType()
                                + "], jobType must in [SQL, SYNC].");
        }
        LOG.info("program {} execution success", options.getJobName());
    }

    // code ...
}
```
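For the non-local modes, invoking an entry class's `main(String[])` by name boils down to standard reflection. The sketch below is hypothetical, not ChunJun code: the `Entry` class stands in for whatever class `PluginInfoUtil.getMainClass()` would return.

```java
import java.lang.reflect.Method;

// Invoke a static main(String[]) reflectively, given only the class name.
public class ReflectiveMainDemo {
    public static class Entry {
        public static String lastArg;

        public static void main(String[] args) {
            lastArg = args[args.length - 1];
        }
    }

    public static void invokeMain(String className, String[] args) {
        try {
            Class<?> mainClass = Class.forName(className);
            Method main = mainClass.getMethod("main", String[].class);
            // Cast to Object so the String[] is passed as one argument,
            // not expanded as varargs.
            main.invoke(null, (Object) args);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        invokeMain("ReflectiveMainDemo$Entry", new String[] {"-mode", "local"});
        System.out.println(Entry.lastArg); // local
    }
}
```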
ChunJun jobs: SYNC
- Taking `SYNC` as the example, `exeSyncJob` is called next.
- Parsing the job script: `parseConf` and then `SyncConf.parseJob` are called, and finally `Gson` deserializes the job script into a `com.dtstack.chunjun.conf.SyncConf` object. The key code:
  - `exeSyncJob`
```java
package com.dtstack.chunjun;

public class Main {
    // code ...

    private static void exeSyncJob(
            StreamExecutionEnvironment env,
            StreamTableEnvironment tableEnv,
            String job,
            Options options)
            throws Exception {
        SyncConf config = parseConf(job, options);
        // code ...
    }

    // code ...
}
```
  - `parseConf`
```java
package com.dtstack.chunjun;

public class Main {
    // code ...

    public static SyncConf parseConf(String job, Options options) {
        SyncConf config;
        try {
            config = SyncConf.parseJob(job);
            // code ...
        } catch (Exception e) {
            throw new ChunJunRuntimeException(e);
        }
        return config;
    }

    // code ...
}
```
  - `SyncConf.parseJob`
```java
package com.dtstack.chunjun.conf;

// import ...

public class SyncConf {
    // code ...

    public static SyncConf parseJob(String jobJson) {
        SyncConf config = GsonUtil.GSON.fromJson(jobJson, SyncConf.class);
        checkJob(config);
        return config;
    }

    // code ...
}
```
`com.dtstack.chunjun.conf.SyncConf` has member variables such as `JobConf job`:
```java
package com.dtstack.chunjun.conf;

// import ...

public class SyncConf implements Serializable {
    private static final long serialVersionUID = 1L;

    /** ChunJun job */
    private JobConf job;

    /** Plugin package path on the ChunJun submission side */
    private String pluginRoot;

    /** Remote plugin package path on the server where ChunJun runs */
    private String remotePluginPath;

    private String savePointPath;

    /** List of plugin jar paths required by this job */
    private List<String> syncJarList;

    // code ...
}
```
`com.dtstack.chunjun.conf.JobConf` in turn holds the `content` and `setting` sections from the job script we wrote.
- Note that although `content` is a JSON array in the script, only its first entry is currently used during parsing:
```java
package com.dtstack.chunjun.conf;

// import ...

public class JobConf implements Serializable {
    private static final long serialVersionUID = 1L;

    private LinkedList<ContentConf> content;
    private SettingConf setting = new SettingConf();

    public OperatorConf getReader() {
        return content.get(0).getReader();
    }

    public OperatorConf getWriter() {
        return content.get(0).getWriter();
    }

    public CdcConf getCdcConf() {
        return content.get(0).getRestoration();
    }

    public MappingConf getNameMapping() {
        return content.get(0).getNameMapping();
    }

    public TransformerConf getTransformer() {
        return content.get(0).getTransformer();
    }

    public LinkedList<ContentConf> getContent() {
        return content;
    }

    public void setContent(LinkedList<ContentConf> content) {
        this.content = content;
    }

    public SettingConf getSetting() {
        return setting;
    }

    public void setSetting(SettingConf setting) {
        this.setting = setting;
    }

    @Override
    public String toString() {
        return "JobConf{" + "content=" + content + ", setting=" + setting + '}';
    }
}
```
- Jumping back to `com.dtstack.chunjun.Main`, look at the `exeSyncJob` method in full:
```java
package com.dtstack.chunjun;

// import ...

public class Main {
    // code ...

    private static void exeSyncJob(
            StreamExecutionEnvironment env,
            StreamTableEnvironment tableEnv,
            String job,
            Options options)
            throws Exception {
        SyncConf config = parseConf(job, options);
        configStreamExecutionEnvironment(env, options, config);

        SourceFactory sourceFactory = DataSyncFactoryUtil.discoverSource(config, env);
        DataStream<RowData> dataStreamSource = sourceFactory.createSource();

        SpeedConf speed = config.getSpeed();
        if (speed.getReaderChannel() > 0) {
            dataStreamSource =
                    ((DataStreamSource<RowData>) dataStreamSource)
                            .setParallelism(speed.getReaderChannel());
        }

        dataStreamSource = addMappingOperator(config, dataStreamSource);

        if (null != config.getCdcConf()
                && (null != config.getCdcConf().getDdl()
                        && null != config.getCdcConf().getCache())) {
            CdcConf cdcConf = config.getCdcConf();
            DDLHandler ddlHandler = DataSyncFactoryUtil.discoverDdlHandler(cdcConf, config);
            CacheHandler cacheHandler = DataSyncFactoryUtil.discoverCacheHandler(cdcConf, config);
            dataStreamSource =
                    dataStreamSource.flatMap(
                            new RestorationFlatMap(ddlHandler, cacheHandler, cdcConf));
        }

        DataStream<RowData> dataStream;
        boolean transformer =
                config.getTransformer() != null
                        && StringUtils.isNotBlank(config.getTransformer().getTransformSql());

        if (transformer) {
            dataStream = syncStreamToTable(tableEnv, config, dataStreamSource);
        } else {
            dataStream = dataStreamSource;
        }

        if (speed.isRebalance()) {
            dataStream = dataStream.rebalance();
        }

        SinkFactory sinkFactory = DataSyncFactoryUtil.discoverSink(config);
        DataStreamSink<RowData> dataStreamSink = sinkFactory.createSink(dataStream);
        if (speed.getWriterChannel() > 0) {
            dataStreamSink.setParallelism(speed.getWriterChannel());
        }

        JobExecutionResult result = env.execute(options.getJobName());
        if (env instanceof MyLocalStreamEnvironment) {
            PrintUtil.printResult(result.getAllAccumulatorResults());
        }
    }

    // code ...
}
```
In `DataSyncFactoryUtil.discoverSource`, the reader name configured in the job script is used to obtain the plugin's class name, and the corresponding `SourceFactory` is then built via reflection.
- Every connector plugin has its own concrete `SourceFactory` implementation, e.g. `MysqlSourceFactory`, `KafkaSourceFactory`, `Elasticsearch7SourceFactory`.
- `DataSyncFactoryUtil` also has a `discoverSink` method for building the `SinkFactory`; the principle is the same.
```java
public static SourceFactory discoverSource(SyncConf config, StreamExecutionEnvironment env) {
    try {
        String pluginName = config.getJob().getReader().getName();
        String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.source);
        return ClassLoaderManager.newInstance(
                config.getSyncJarList(),
                cl -> {
                    Class<?> clazz = cl.loadClass(pluginClassName);
                    Constructor<?> constructor =
                            clazz.getConstructor(SyncConf.class, StreamExecutionEnvironment.class);
                    return (SourceFactory) constructor.newInstance(config, env);
                });
    } catch (Exception e) {
        throw new ChunJunRuntimeException(e);
    }
}

public static SinkFactory discoverSink(SyncConf config) {
    // code ...
}
```
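The reflection mechanics of `discoverSource` can be mimicked with plain Java types. In this simplified sketch the factory interface, the demo class, and the `String` config standing in for `(SyncConf, StreamExecutionEnvironment)` are all invented for illustration; only the load-class/get-constructor/new-instance pattern matches the real code.

```java
import java.lang.reflect.Constructor;

// Load a "plugin" class by name and invoke a known constructor signature,
// as ChunJun's discoverSource does for connector SourceFactory classes.
public class PluginDiscoveryDemo {
    public interface SourceFactory {
        String describe();
    }

    public static class DemoSourceFactory implements SourceFactory {
        private final String conf;

        public DemoSourceFactory(String conf) {
            this.conf = conf;
        }

        @Override
        public String describe() {
            return "source[" + conf + "]";
        }
    }

    public static SourceFactory discoverSource(String pluginClassName, String conf) {
        try {
            Class<?> clazz = Class.forName(pluginClassName);
            Constructor<?> constructor = clazz.getConstructor(String.class);
            return (SourceFactory) constructor.newInstance(conf);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        SourceFactory factory =
                discoverSource("PluginDiscoveryDemo$DemoSourceFactory", "mysql-reader");
        System.out.println(factory.describe()); // source[mysql-reader]
    }
}
```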
- Next, `exeSyncJob` calls `sourceFactory.createSource()`, which creates a Flink `DataStream<RowData>`.
  - Looking inside `createSource`, its essence is a call to Flink's `env.addSource(...)` to build the source.
- Next, `exeSyncJob` adjusts the parallelism of the `DataStreamSource` according to the `speed.readerChannel` setting in the job script.
  - By default the `channel` setting is used, as can be seen from the `configStreamExecutionEnvironment` method called in `exeSyncJob`.
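The precedence just described can be sketched as follows. The method and parameter names here are illustrative, not ChunJun's actual API: the global `channel` acts as the default parallelism, and a positive `readerChannel` overrides it on the source side (`writerChannel` works the same way on the sink side).

```java
// Hypothetical sketch of the channel / readerChannel precedence.
public class SpeedConfDemo {
    public static int effectiveReaderParallelism(int channel, int readerChannel) {
        return readerChannel > 0 ? readerChannel : channel;
    }

    public static void main(String[] args) {
        System.out.println(effectiveReaderParallelism(2, -1)); // 2: falls back to channel
        System.out.println(effectiveReaderParallelism(2, 4));  // 4: readerChannel wins
    }
}
```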
- Next, `exeSyncJob` calls `addMappingOperator`, which uses the `nameMapping` in the job script to transform table metadata.
- Next, `exeSyncJob` handles the CDC configuration.
- Next, `exeSyncJob` transforms the raw data. The transformation logic is the `transformSql` configured in the job script; the code mainly uses Flink's tableEnv, as follows:
  - The `DataStreamSource` created earlier is registered as a Table, and a view is created under the tableName of the reader in the job script.
  - `tableEnv.sqlQuery(...)` is called with the `transformSql` to transform the view from the previous step.
  - Finally, the resulting DataStream is again registered as a view under the tableName of the writer in the job script. Note: this last step currently appears to serve no purpose; nothing uses it afterwards.
- Next, `exeSyncJob` calls `DataSyncFactoryUtil.discoverSink` to produce the matching `SinkFactory`, then `SinkFactory.createSink` uses the Flink API (`dataStream.addSink(...)`) to build the data-output side.
- Finally, `env.execute(options.getJobName())` is called and the Flink job starts running.
ChunJun jobs: SQL
- Taking `SQL` as the example, `exeSqlJob` is called next:
```java
package com.dtstack.chunjun;

// import ...

class Main {
    // code ..

    private static void exeSqlJob(
            StreamExecutionEnvironment env,
            StreamTableEnvironment tableEnv,
            String job,
            Options options) {
        try {
            configStreamExecutionEnvironment(env, options, null);
            List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(options.getAddjar());
            StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tableEnv);
            TableResult execute = statementSet.execute();
            if (env instanceof MyLocalStreamEnvironment) {
                Optional<JobClient> jobClient = execute.getJobClient();
                if (jobClient.isPresent()) {
                    PrintUtil.printResult(jobClient.get().getAccumulators().get());
                }
            }
        } catch (Exception e) {
            throw new ChunJunRuntimeException(e);
        } finally {
            FactoryUtil.getFactoryHelperThreadLocal().remove();
            TableFactoryService.getFactoryHelperThreadLocal().remove();
        }
    }

    // code ...
}
```
The most important step in `exeSqlJob` is the call to `SqlParser.parseSql(job, jarUrlList, tableEnv)`, which produces a Flink `StatementSet`:
```java
package com.dtstack.chunjun.sql.parser;

// import ...

public class SqlParser {

    private static final char SQL_DELIMITER = ';';

    /**
     * flink support sql syntax CREATE TABLE sls_stream() with (); CREATE (TABLE|SCALA) FUNCTION
     * fcnName WITH com.dtstack.com; insert into tb1 select * from tb2;
     *
     * @param
     */
    public static StatementSet parseSql(
            String sql, List<URL> urlList, StreamTableEnvironment tableEnvironment) {
        if (StringUtils.isBlank(sql)) {
            throw new IllegalArgumentException("SQL must be not empty!");
        }

        sql = DtStringUtil.dealSqlComment(sql);
        StatementSet statement = tableEnvironment.createStatementSet();
        Splitter splitter = new Splitter(SQL_DELIMITER);
        List<String> stmts = splitter.splitEscaped(sql);
        AbstractStmtParser stmtParser = createParserChain();

        stmts.stream()
                .filter(stmt -> !Strings.isNullOrEmpty(stmt.trim()))
                .forEach(
                        stmt -> {
                            try {
                                stmtParser.handleStmt(stmt, tableEnvironment, statement, urlList);
                            } catch (Exception e) {
                                throw new ChunJunSqlParseException(
                                        PwdUtil.desensitization(stmt), e.getMessage(), e);
                            }
                        });

        return statement;
    }

    // code ...
}
```
`parseSql` performs a series of parsing steps on the SQL:
- `DtStringUtil.dealSqlComment(sql)` is called to strip comments.
- The Flink API is called to create the `StatementSet`, which is returned later for final execution.
- A `Splitter` splits the sql string on the `;` character, producing multiple independent sql statements.
- A ParserChain is then built with the chain-of-responsibility pattern: UploadFileStmtParser -> CreateFunctionStmtParser -> InsertStmtParser.
- Finally a `stream` runs each independent sql statement through the parser chain above, in order.
- `createParserChain` is called to build the parser chain; when a sql string is processed, the parsers run in order: UploadFileStmtParser -> CreateFunctionStmtParser -> InsertStmtParser.
  - UploadFileStmtParser
    - Matching condition: the regex `"(?i).*add\\s+file\\s+.+|(?i).*add\\s+jar\\s+.+"`
    - Action: currently empty.
  - CreateFunctionStmtParser
    - Matching condition: the regex `"(?i)\\s*CREATE\\s+(scalar|table|aggregate)\\s+FUNCTION\\s+(\\S+)\\s+WITH\\s+(\\S+)"`
    - Action: loads the class with a ClassLoader and registers the user-defined function (UDF).
  - InsertStmtParser
    - Matching condition: the string starts with `insert`, i.e. it is an `INSERT` statement.
    - Action: executes `statementSet.addInsertSql(sql)`, adding the sql to the `StatementSet`.
  - When a sql string matches none of the parsers above, `tEnv.executeSql(stmt)` is called to execute it directly (e.g. `CREATE` statements).
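The chain-of-responsibility dispatch described above can be sketched compactly. Class and method names here are illustrative, not ChunJun's exact API: each parser handles a statement it recognizes or delegates to its successor, and an unmatched statement falls through to a default action (standing in for `tEnv.executeSql(stmt)`).

```java
// Minimal chain-of-responsibility in the style of createParserChain.
public class StmtParserChainDemo {
    public abstract static class StmtParser {
        private StmtParser next;

        public StmtParser setNext(StmtParser next) {
            this.next = next;
            return next;
        }

        public abstract boolean canHandle(String stmt);

        public abstract String doHandle(String stmt);

        public String handle(String stmt) {
            if (canHandle(stmt)) {
                return doHandle(stmt);
            }
            if (next != null) {
                return next.handle(stmt);
            }
            return "executeSql: " + stmt; // default: hand the statement to the table environment
        }
    }

    public static class CreateFunctionStmtParser extends StmtParser {
        @Override
        public boolean canHandle(String stmt) {
            return stmt.toUpperCase().startsWith("CREATE FUNCTION");
        }

        @Override
        public String doHandle(String stmt) {
            return "registerUdf: " + stmt;
        }
    }

    public static class InsertStmtParser extends StmtParser {
        @Override
        public boolean canHandle(String stmt) {
            return stmt.toUpperCase().startsWith("INSERT");
        }

        @Override
        public String doHandle(String stmt) {
            return "addInsertSql: " + stmt;
        }
    }

    public static StmtParser createParserChain() {
        StmtParser head = new CreateFunctionStmtParser();
        head.setNext(new InsertStmtParser());
        return head;
    }

    public static void main(String[] args) {
        StmtParser chain = createParserChain();
        System.out.println(chain.handle("INSERT INTO t2 SELECT * FROM t1"));
        System.out.println(chain.handle("CREATE TABLE t1 (id INT)")); // unmatched -> executeSql
    }
}
```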
- After all sql statements have been parsed, the `StatementSet` containing the `INSERT` statements is returned, and finally `statementSet.execute()` is called.
  - The `CREATE` statements were already executed during the `parseSql` stage.
  - Only the remaining `INSERT` statements are processed at the end.
  - This avoids the logical problem of "INSERT into a table before the table is CREATE-d".