配置相关

  • settings.sh

    • 功能:配置环境变量
    • APP_USER/APP_GROUP:作业提交用户和组
    • JMX_PORT:java jmx端口,通常在aws或者其他容器里打开
    • INSTALL_DIR:sjs所做目录
    • LOG_DIR:日志路径
    • PIDFILE:启动sjs,产生pid存放的文件名
    • JOBSERVER_MEMORY:启动spark作业的driverMem
    • SPARK_VERSION:指定spark版本
    • SCALA_VERSION:scala版本
    • SPARK_HOME、SPARK_LOG_DIR、SPARK_CONF_DIR:spark相关配置
    • YARN_CONF_DIR、HADOOP_CONF_DIR:yarn配置
  • local.conf
    • spark.master:指定spark提交的类型,yarn-client、local[4]等
    • spark.jobserver
      • port:指定jobServer的启动端口,使用此端口进行作业提交和监控等
      • context-per-jvm:是否每个context都启动一个独立的进程
      • jobdao:指定处理jobs、jars等逻辑的类
      • datadao:通过POST/data上传到sjs的文件存放路径
      • sqldao:当jobdao指定为JobSqlDAO时使用
        • slick-driver
        • jdbc-driver
        • rootdir:H2 driver存放数据目录
        • jdbc:连接
        • dbcp:连接池
      • result-chunk-size
        • 作业返回值使用分块传输,每块大小
    • spark.contexts:启动sjs自动加载的context配置
      • 名字
    • context-settings:启动context,即app,相关配置
      • num-cpu-cores:core个数
      • memory-per-node:executor的mem,eg 512m、1G
      • dependent-jar-uris:依赖的jar包,list形式,或者字符串,使用逗号隔开
        • [“file:///xxx.jar”,”file:///xxx2.jar”],或者”file:///xxx.jar,file:///xxx2.jar”
      • 其他的spark配置,去掉前缀spark即可
        • 如:spark.speculation可配置为speculation
  • server_start.sh
    • 启动spark job server
  • manager_start.sh
    • context-per-jvm设置为true时,才会使用此脚本,用于启动context

使用

  • 启动

    • 运行脚本server_start.sh即可
  • 初始化context
    • curl -d "" 'ip:port/contexts/roncen_test_context?context-factory=spark.jobserver.context.HiveContextFactory'
  • 上传jar包
    • curl -H "Content-Type: application/java-archive" --data-binary @/home/vipshop/platform/sjs_2.0/jars/job-server-extras_2.11-0.7.0-SNAPSHOT.jar ip:8091/binaries/sql
  • 提交作业
    • curl -d "sql_file=\"hdfs://bipcluster/spark/sql/test_cassandra.sql\"" 'ip:8091/jobs?appName=sql&classPath=spark.jobserver.vip.VipHiveJob&context=roncen_test_context&sync=false'
  • 通过jobId获取job运行状态
    • curl -v 'ip:8091/jobs/xxx
  • 删除context
    • curl -X DELETE "ip:8091/contexts/roncen_test_context"

问题记录

  • server返回失败问题
  • [delete context时,context上的job并未结束]
  • 时不时返回The server was not able to produce a timely response to your request

问题1:The server was not able to produce a timely response to your request

  • 探测方法

    • curl -v 'ip:port/jobs/b2ee01d2-a495-43a3-a0e5-f2ba82330211'
    • 探测对应的jobId状态
    • 正常情况下,返回:”RUNNING”|”ERROR”|”FINISHED”
  • 获取job状态逻辑
    • spark.jobserver.WebApi中接收http的GET请求GET /jobs/<jobId>
    • 通过akka从jobInfoActor中获取job状态GetJobStatus(jobId)
      • jobDao中获取对应jobId的信息

        • JobSqlDao.getJobInfo()中,从数据库中查询对应job的信息,返回
    • 返回格式application/json给客户端
      • jobId不存在:返回No such job ID xxxx
      • 存在:
        • 构造返回格式:jobId: , startTime: , classPath: , context: , duration: , status:
        • 通过akka从JobInfoActor中获取job结果GetJobResult
          • 通过AkkaClusterSupervisorActorGetResultActor(context)得到对应的resultActor

            • 通过contextName得到对应的resultActor
            • -
          • 通过resultActorGetJobResult(jobId)得到最后的结果
        • 返回客户端结果

初始化context步骤

  • 命令示例curl -d "" 'ip:port/contexts/sql-context-for-update-on-sale-85?context-factory=spark.jobserver.context.HiveContextFactory'
  • 通过http调用WebApi中的POST /contexts/<contextName>
  • 通过akka调用AkkaClusterSupervisorActorAddContext(cName, config)

    • 判断是否存在,如果存在则返回ContextAlreadyExists
    • 调用方法startContext()

      • 生成contextActorName,”jobManager-” + uuid
      • 在${LOG_DIR}路径下创建contextDir路径,生成对应文件context.conf
        • 存放actornamecontext-factory等基础信息
      • 生成执行命令:${deploy.manager-start-cmd} contextDir cluster.selfAddress(akka地址),即./manager_start.sh xxx,此命令是在后台执行的,命令后有&
      • 判断返回值,如果失败,返回ContextInitError,如果成功,将其放入contextInitInfos的map中
      • 执行上述生成的命令
        • 执行主类spark.jobserver.JobManager
        • 获取context.conf文件中的配置信息
        • 初始化jobDao,spark.jobserver.jobdao配置,这里为spark.jobserver.io.JobSqlDAO
        • 初始化JobDAOActor,命名为dao-manager-jobmanager
        • 初始化jobManager,命名为${context.actorname}
        • join到cluster中Cluster(system).join(clusterAddress) ????
          • 发送ActorIdentity(memberActors, actorRefOpt)AkkaClusterSupervisorActor
      • AkkaClusterSupervisorActor收到此消息后
        • 遍历当前cluster所有的actorRef

          • 如果返回的actorName以jobManager开头则执行以下步骤,否则不处理
          • contextInitInfos中remove当前actorName对应的actor
          • 执行方法initContext()
          • 初始化JobResultActor resultActor
          • 通过akka将resultActor发送给正在处理的actor,即发送消息JobManagerActor.Initialize(Some(resultActor))JobManagerActor
            • JobManagerActor得到消息后,进行如下处理

              • 初始化JobStatusActor
              • 得到JobResultActor,如果resultActor没有,则初始化一个
              • 加载dependent-jar-uris指定的jar包
              • 生成contextFactory,生成context
              • 生成JobCacheImpl,用于缓存job信息
              • dependent-jar-uris指定的jar包放入sparkContext.addJar()中
              • 返回Initialized(contextName, resultActor),如果失败,则返回InitError(t)
          • 得到返回值
            • 如果成功,则将当前context放到contexts中,即contexts(ctxName) = (ref, resActor)
    • 返回成功/失败

  • 返回json类型结果

提交job到context中

  • 命令示例:curl -d "sql = \"show databases\"" 'ip:port/jobs?appName=sql&classPath=spark.jobserver.HiveTestJob&context=sql-context-for-gs-sku-check-85&sync=true'
  • 通过http调用WebApi中的POST /jobs
  • 通过akka中AkkaClusterSupervisorActorGetContext(name),得到对应context的jobManager
    • 如果没有得到,则返回NoSuchContext或者ContextInitError(err)
    • 通过jobManager进行与context进程通信,发送JobManagerActor.StartJob,用于提交作业
      • 加载未加载的jar包
      • 调用startJobInternal()
        • 通过jobSqlDao,获取当前appName上次提交作业的时间和type,如果没有则返回错误
        • 随机生成randomUUID,作为jobId
        • 通过sparkContextFactory.loadAndValidateJob()生成jobContainer
          • 通过classPath/appname,在JobCacheImpl中获取JobJarInfo,并初始化

            • 如果cache中没有,会通过akka发送消息GetBinaryPath(),从jobSqlDao中获取jar包
            • 初始化构造函数,将其放入JobContainer中,返回
        • 判断返回值,如果为Good(container),则继续,否则返回错误
        • 将结果发送给JobResultActorJobStatusActor
          • JobStatusActor

            • 发送消息SaveJobInfo到jobSqlDao,将信息存入元数据库
        • 调用方法getJobFuture()返回结果
          • 判断当前runningJob是否大于最大运行job,如果是则返回NoJobSlotsAvailable(maxRunningJobs),否则继续
          • 使用scala的Future,另起线程执行job
            • 设置SparkEnv
            • 发送消息JobInitJobStatusActor
            • 通过方法HiveTestJob.validate()判断当前job是否正常
              • 如果正常

                • 发送消息JobStartedJobStatusActor
                • 设置sparkContext的jobGroup为当前jobId,sc.setJobGroup(jobId, xxx)
                • 调用接口,执行job,HiveTestJob.runJob(jobC, jobEnv, jobData)
              • 否则发送JobValidationFailed
            • 线程执行结束
              • 成功

                • 发送JobFinishedJobStatusActor
                • 发送JobResultJobResultActor
              • 失败
                • 发送JobErroredOutJobStatusActor
    • 判断返回结果,并返回给客户端对应的http reponse
      • JobResult(jobId, res)
      • JobErroredOut
      • JobStarted(_, jobInfo)
        • 通过akka发送给JobInfoActor消息StoreJobConfig(jobInfo.jobId, postedJobConfig)

          • JobInfoActor得到消息后,通过jobDao.saveJobConfig(jobId, jobConfig)存储信息,这里为JobSqlDao
      • JobValidationFailed
      • NoSuchApplication
      • NoSuchClass
      • WrongJobType
      • WrongJobType
      • NoJobSlotsAvailable
      • ContextInitError

图形化展示

spark job server原理相关推荐

  1. 深入分析Spark任务调度的原理--Java后端同学入门Spark编程系列

    作者:陌北有棵树,Java人,架构师社区合伙人! 之前写了一篇:<我作为Java后端,分享一下入门Spark编程的经历!> 上篇是Spark入门的第一篇,写了一些关于Spark编程中RDD ...

  2. Spark累加器实现原理及基础编程

    Spark累加器实现原理及基础编程 实现原理 累加器用来把 Executor 端变量信息聚合到 Driver 端.在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这 ...

  3. spark任务shell运行_大数据系列:Spark的工作原理及架构

    介绍 本Apache Spark教程将说明Apache Spark的运行时架构以及主要的Spark术语,例如Apache SparkContext,Spark shell,Apache Spark应用 ...

  4. Spark History Server配置及其启动

    Spark history Server产生背景 以standalone运行模式为例,在运行Spark Application的时候,Spark会提供一个WEBUI列出应用程序的运行时信息:但该WEB ...

  5. Spark SQL运行原理和架构

    一 Spark SQL运行架构 Spark SQL对SQL语句的处理和关系型数据库类似,即词法/语法解析.绑定.优化.执行.Spark SQL会先将SQL语句解析成一棵树,然后使用规则(Rule)对T ...

  6. spark基础之spark sql运行原理和架构

    一 Spark SQL运行架构 Spark SQL对SQL语句的处理和关系型数据库类似,即词法/语法解析.绑定.优化.执行.Spark SQL会先将SQL语句解析成一棵树,然后使用规则(Rule)对T ...

  7. Spark 系列——Spark的Shuffle原理

    目录 一.基本介绍 1.1 Lineage 1.2 窄依赖 1.3 宽依赖 二.Spark Shuffle的原理 2.1 ShuffleManager 2.2 ShuffleWriter 2.2.1 ...

  8. YARN, MR History Server和Spark History Server

    转载来自:http://blog.csdn.net/cymvp/article/details/52090348 YARN, MR History Server和Spark History Serve ...

  9. Spark History Server和Event Log详解

    前言 Spark如何持久化event SHS启动参数 SHS工作流程 DiskCache 读取eventlog 参考 前言 这篇文章会overall的介绍一下Spark History Server和 ...

最新文章

  1. cocoapods更新
  2. 汇编语言系统调用过程
  3. C++_类和动态内存分配2-改进后的String类
  4. Android textview换行
  5. CynosDB技术详解——存储集群管理【文末有福利】
  6. python中da_python学习 da4
  7. 面试必会系列 - 1.2 Java 集合,源码讲解
  8. MySQL5.5读写分离之mysql-proxy
  9. sift计算效率优化_【计算机视觉】9. 小结
  10. 使用FileOutputStream和ObjectOutputStream向文本文件中写多个对象的信息(序列化)
  11. java 新手入门电子书_java从入门到精通第6版电子书 PDF高清版
  12. 用小乌龟git解决冲突之后,再提交,出现自己没用动过的文件
  13. Python绘制心型图案
  14. AndroidManifest merger failed with multiple errors, see logs
  15. 大厂转身,小米数字化采购缘何成功?| 爱分析调研
  16. ProcessOn‘s Bug--完善个人资料无限扩容
  17. 傅里叶变换、拉普拉斯变换与z变换对比
  18. 安卓手机上有哪些好用的小说阅读器?
  19. JAVA与MAVEN打包
  20. 3D游戏建模需要学会哪些软件?想入行游戏建模的你都学习了吗?

热门文章

  1. (翻译)‘Sign Up’ 和‘Sign In’按钮让用户混淆的原因
  2. 光纤光信号闪红灯_宽带光猫光信号闪红灯怎么弄
  3. 跟马保国老师闪电五连鞭视频通话网站源码
  4. 计算机中收藏夹中的桌面怎么找,电脑浏览器收藏夹保存在哪里
  5. 萌妹子语音陪你写代码,一个神奇的 VSCode 插件
  6. vue3.0对服务端进行渲染
  7. uniapp前端用forEach循环遍历数组
  8. 聊聊我的成长--数据库初学过程--MySQL的安装与配置
  9. android仿真平台,一种仿真机器人Android平台的车道偏离预警方法与流程
  10. LINQ: Reconciling objects, relations and XML in the .NET framework