spark job server原理
配置相关
- settings.sh
- 功能:配置环境变量
- APP_USER/APP_GROUP:作业提交用户和组
- JMX_PORT:java jmx端口,通常在aws或者其他容器里打开
- INSTALL_DIR:sjs所做目录
- LOG_DIR:日志路径
- PIDFILE:启动sjs,产生pid存放的文件名
- JOBSERVER_MEMORY:启动spark作业的driverMem
- SPARK_VERSION:指定spark版本
- SCALA_VERSION:scala版本
- SPARK_HOME、SPARK_LOG_DIR、SPARK_CONF_DIR:spark相关配置
- YARN_CONF_DIR、HADOOP_CONF_DIR:yarn配置
- local.conf
- spark.master:指定spark提交的类型,yarn-client、local[4]等
- spark.jobserver
- port:指定jobServer的启动端口,使用此端口进行作业提交和监控等
- context-per-jvm:是否每个context都启动一个独立的进程
- jobdao:指定处理jobs、jars等逻辑的类
- datadao:通过
POST/data
上传到sjs的文件存放路径 - sqldao:当
jobdao
指定为JobSqlDAO
时使用- slick-driver
- jdbc-driver
- rootdir:H2 driver存放数据目录
- jdbc:连接
- dbcp:连接池
- result-chunk-size
- 作业返回值使用分块传输,每块大小
- spark.contexts:启动sjs自动加载的context配置
- 名字
- context-settings:启动context,即app,相关配置
- num-cpu-cores:core个数
- memory-per-node:executor的mem,eg 512m、1G
- dependent-jar-uris:依赖的jar包,list形式,或者字符串,使用逗号隔开
- [“file:///xxx.jar”,”file:///xxx2.jar”],或者”file:///xxx.jar,file:///xxx2.jar”
- 其他的spark配置,去掉前缀spark即可
- 如:spark.speculation可配置为speculation
- server_start.sh
- 启动spark job server
- manager_start.sh
- context-per-jvm设置为true时,才会使用此脚本,用于启动context
使用
- 启动
- 运行脚本server_start.sh即可
- 初始化context
curl -d "" 'ip:port/contexts/roncen_test_context?context-factory=spark.jobserver.context.HiveContextFactory'
- 上传jar包
curl -H "Content-Type: application/java-archive" --data-binary @/home/vipshop/platform/sjs_2.0/jars/job-server-extras_2.11-0.7.0-SNAPSHOT.jar ip:8091/binaries/sql
- 提交作业
curl -d "sql_file=\"hdfs://bipcluster/spark/sql/test_cassandra.sql\"" 'ip:8091/jobs?appName=sql&classPath=spark.jobserver.vip.VipHiveJob&context=roncen_test_context&sync=false'
- 通过jobId获取job运行状态
curl -v 'ip:8091/jobs/xxx
- 删除context
curl -X DELETE "ip:8091/contexts/roncen_test_context"
问题记录
- server返回失败问题
- [delete context时,context上的job并未结束]
- 时不时返回
The server was not able to produce a timely response to your request
问题1:The server was not able to produce a timely response to your request
- 探测方法
curl -v 'ip:port/jobs/b2ee01d2-a495-43a3-a0e5-f2ba82330211'
- 探测对应的jobId状态
- 正常情况下,返回:”RUNNING”|”ERROR”|”FINISHED”
- 获取job状态逻辑
spark.jobserver.WebApi
中接收http的GET请求GET /jobs/<jobId>
- 通过akka从
jobInfoActor
中获取job状态GetJobStatus(jobId)
- 从
jobDao
中获取对应jobId的信息JobSqlDao.getJobInfo()
中,从数据库中查询对应job的信息,返回
- 从
- 返回格式
application/json
给客户端- jobId不存在:返回
No such job ID xxxx
- 存在:
- 构造返回格式:
jobId: , startTime: , classPath: , context: , duration: , status:
- 通过akka从
JobInfoActor
中获取job结果GetJobResult
- 通过
AkkaClusterSupervisorActor
的GetResultActor(context)
得到对应的resultActor
- 通过contextName得到对应的resultActor
- -
- 通过
resultActor
的GetJobResult(jobId)
得到最后的结果
- 通过
- 返回客户端结果
- 构造返回格式:
- jobId不存在:返回
初始化context步骤
- 命令示例
curl -d "" 'ip:port/contexts/sql-context-for-update-on-sale-85?context-factory=spark.jobserver.context.HiveContextFactory'
- 通过http调用
WebApi
中的POST /contexts/<contextName>
通过akka调用
AkkaClusterSupervisorActor
的AddContext(cName, config)
- 判断是否存在,如果存在则返回
ContextAlreadyExists
调用方法
startContext()
- 生成contextActorName,”jobManager-” + uuid
- 在${LOG_DIR}路径下创建contextDir路径,生成对应文件
context.conf
- 存放
actorname
、context-factory
等基础信息
- 存放
- 生成执行命令:
${deploy.manager-start-cmd} contextDir cluster.selfAddress(akka地址)
,即./manager_start.sh xxx
,此命令是在后台执行的,命令后有&
- 判断返回值,如果失败,返回
ContextInitError
,如果成功,将其放入contextInitInfos
的map中 - 执行上述生成的命令
- 执行主类
spark.jobserver.JobManager
- 获取
context.conf
文件中的配置信息 - 初始化jobDao,
spark.jobserver.jobdao
配置,这里为spark.jobserver.io.JobSqlDAO
- 初始化JobDAOActor,命名为
dao-manager-jobmanager
- 初始化jobManager,命名为
${context.actorname}
- join到cluster中
Cluster(system).join(clusterAddress)
????- 发送
ActorIdentity(memberActors, actorRefOpt)
到AkkaClusterSupervisorActor
- 发送
- 执行主类
AkkaClusterSupervisorActor
收到此消息后- 遍历当前cluster所有的actorRef
- 如果返回的actorName以
jobManager
开头则执行以下步骤,否则不处理 - 从
contextInitInfos
中remove当前actorName对应的actor - 执行方法
initContext()
- 初始化
JobResultActor resultActor
- 通过akka将
resultActor
发送给正在处理的actor,即发送消息JobManagerActor.Initialize(Some(resultActor))
到JobManagerActor
JobManagerActor
得到消息后,进行如下处理- 初始化
JobStatusActor
- 得到
JobResultActor
,如果resultActor
没有,则初始化一个 - 加载
dependent-jar-uris
指定的jar包 - 生成contextFactory,生成context
- 生成
JobCacheImpl
,用于缓存job信息 - 将
dependent-jar-uris
指定的jar包放入sparkContext.addJar()中 - 返回
Initialized(contextName, resultActor)
,如果失败,则返回InitError(t)
- 初始化
- 得到返回值
- 如果成功,则将当前context放到
contexts
中,即contexts(ctxName) = (ref, resActor)
- 如果成功,则将当前context放到
- 如果返回的actorName以
- 遍历当前cluster所有的actorRef
返回成功/失败
- 判断是否存在,如果存在则返回
- 返回json类型结果
提交job到context中
- 命令示例:
curl -d "sql = \"show databases\"" 'ip:port/jobs?appName=sql&classPath=spark.jobserver.HiveTestJob&context=sql-context-for-gs-sku-check-85&sync=true'
- 通过http调用
WebApi
中的POST /jobs
- 通过akka中
AkkaClusterSupervisorActor
的GetContext(name)
,得到对应context的jobManager
- 如果没有得到,则返回
NoSuchContext
或者ContextInitError(err)
- 通过
jobManager
进行与context进程通信,发送JobManagerActor.StartJob
,用于提交作业- 加载未加载的jar包
- 调用
startJobInternal()
- 通过jobSqlDao,获取当前appName上次提交作业的时间和type,如果没有则返回错误
- 随机生成randomUUID,作为jobId
- 通过
sparkContextFactory.loadAndValidateJob()
生成jobContainer- 通过classPath/appname,在
JobCacheImpl
中获取JobJarInfo
,并初始化- 如果cache中没有,会通过akka发送消息
GetBinaryPath()
,从jobSqlDao中获取jar包 - 初始化构造函数,将其放入JobContainer中,返回
- 如果cache中没有,会通过akka发送消息
- 通过classPath/appname,在
- 判断返回值,如果为Good(container),则继续,否则返回错误
- 将结果发送给
JobResultActor
和JobStatusActor
JobStatusActor
- 发送消息
SaveJobInfo
到jobSqlDao,将信息存入元数据库
- 发送消息
- 调用方法
getJobFuture()
返回结果- 判断当前runningJob是否大于最大运行job,如果是则返回
NoJobSlotsAvailable(maxRunningJobs)
,否则继续 - 使用scala的Future,另起线程执行job
- 设置SparkEnv
- 发送消息
JobInit
到JobStatusActor
- 通过方法
HiveTestJob.validate()
判断当前job是否正常- 如果正常
- 发送消息
JobStarted
到JobStatusActor
- 设置sparkContext的jobGroup为当前jobId,
sc.setJobGroup(jobId, xxx)
- 调用接口,执行job,
HiveTestJob.runJob(jobC, jobEnv, jobData)
- 发送消息
- 否则发送
JobValidationFailed
- 如果正常
- 线程执行结束
- 成功
- 发送
JobFinished
到JobStatusActor
- 发送
JobResult
到JobResultActor
- 发送
- 失败
- 发送
JobErroredOut
到JobStatusActor
- 发送
- 成功
- 判断当前runningJob是否大于最大运行job,如果是则返回
- 判断返回结果,并返回给客户端对应的http reponse
JobResult(jobId, res)
JobErroredOut
JobStarted(_, jobInfo)
- 通过akka发送给
JobInfoActor
消息StoreJobConfig(jobInfo.jobId, postedJobConfig)
JobInfoActor
得到消息后,通过jobDao.saveJobConfig(jobId, jobConfig)
存储信息,这里为JobSqlDao
- 通过akka发送给
JobValidationFailed
NoSuchApplication
NoSuchClass
WrongJobType
WrongJobType
NoJobSlotsAvailable
ContextInitError
- 如果没有得到,则返回
图形化展示
spark job server原理相关推荐
- 深入分析Spark任务调度的原理--Java后端同学入门Spark编程系列
作者:陌北有棵树,Java人,架构师社区合伙人! 之前写了一篇:<我作为Java后端,分享一下入门Spark编程的经历!> 上篇是Spark入门的第一篇,写了一些关于Spark编程中RDD ...
- Spark累加器实现原理及基础编程
Spark累加器实现原理及基础编程 实现原理 累加器用来把 Executor 端变量信息聚合到 Driver 端.在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这 ...
- spark任务shell运行_大数据系列:Spark的工作原理及架构
介绍 本Apache Spark教程将说明Apache Spark的运行时架构以及主要的Spark术语,例如Apache SparkContext,Spark shell,Apache Spark应用 ...
- Spark History Server配置及其启动
Spark history Server产生背景 以standalone运行模式为例,在运行Spark Application的时候,Spark会提供一个WEBUI列出应用程序的运行时信息:但该WEB ...
- Spark SQL运行原理和架构
一 Spark SQL运行架构 Spark SQL对SQL语句的处理和关系型数据库类似,即词法/语法解析.绑定.优化.执行.Spark SQL会先将SQL语句解析成一棵树,然后使用规则(Rule)对T ...
- spark基础之spark sql运行原理和架构
一 Spark SQL运行架构 Spark SQL对SQL语句的处理和关系型数据库类似,即词法/语法解析.绑定.优化.执行.Spark SQL会先将SQL语句解析成一棵树,然后使用规则(Rule)对T ...
- Spark 系列——Spark的Shuffle原理
目录 一.基本介绍 1.1 Lineage 1.2 窄依赖 1.3 宽依赖 二.Spark Shuffle的原理 2.1 ShuffleManager 2.2 ShuffleWriter 2.2.1 ...
- YARN, MR History Server和Spark History Server
转载来自:http://blog.csdn.net/cymvp/article/details/52090348 YARN, MR History Server和Spark History Serve ...
- Spark History Server和Event Log详解
前言 Spark如何持久化event SHS启动参数 SHS工作流程 DiskCache 读取eventlog 参考 前言 这篇文章会overall的介绍一下Spark History Server和 ...
最新文章
- cocoapods更新
- 汇编语言系统调用过程
- C++_类和动态内存分配2-改进后的String类
- Android textview换行
- CynosDB技术详解——存储集群管理【文末有福利】
- python中da_python学习 da4
- 面试必会系列 - 1.2 Java 集合,源码讲解
- MySQL5.5读写分离之mysql-proxy
- sift计算效率优化_【计算机视觉】9. 小结
- 使用FileOutputStream和ObjectOutputStream向文本文件中写多个对象的信息(序列化)
- java 新手入门电子书_java从入门到精通第6版电子书 PDF高清版
- 用小乌龟git解决冲突之后,再提交,出现自己没用动过的文件
- Python绘制心型图案
- AndroidManifest merger failed with multiple errors, see logs
- 大厂转身,小米数字化采购缘何成功?| 爱分析调研
- ProcessOn‘s Bug--完善个人资料无限扩容
- 傅里叶变换、拉普拉斯变换与z变换对比
- 安卓手机上有哪些好用的小说阅读器?
- JAVA与MAVEN打包
- 3D游戏建模需要学会哪些软件?想入行游戏建模的你都学习了吗?
热门文章
- (翻译)‘Sign Up’ 和‘Sign In’按钮让用户混淆的原因
- 光纤光信号闪红灯_宽带光猫光信号闪红灯怎么弄
- 跟马保国老师闪电五连鞭视频通话网站源码
- 计算机中收藏夹中的桌面怎么找,电脑浏览器收藏夹保存在哪里
- 萌妹子语音陪你写代码,一个神奇的 VSCode 插件
- vue3.0对服务端进行渲染
- uniapp前端用forEach循环遍历数组
- 聊聊我的成长--数据库初学过程--MySQL的安装与配置
- android仿真平台,一种仿真机器人Android平台的车道偏离预警方法与流程
- LINQ: Reconciling objects, relations and XML in the .NET framework