saturn-console

Startup

saturn-console is itself a Spring Boot project; all of its beans are defined in applicationContext.xml under saturn-console. Startup therefore mainly consists of initializing the beans responsible for the various operations.

    <!-- Permission management, mainly for jobs -->
    <bean id="authorizationService" class="com.vip.saturn.job.console.service.impl.AuthorizationServiceImpl"/>
    <bean id="authorizationManageServiceImpl" class="com.vip.saturn.job.console.service.impl.AuthorizationManageServiceImpl"/>
    <!-- System configuration management for the console -->
    <bean id="systemConfigService" class="com.vip.saturn.job.console.service.impl.SystemConfigServiceImpl"/>
    <!-- Alarms -->
    <bean id="alarmStatisticsService" class="com.vip.saturn.job.console.service.impl.AlarmStatisticsServiceImpl"/>
    <!-- Dashboard and statistics -->
    <bean id="dashboardService" class="com.vip.saturn.job.console.service.impl.DashboardServiceImpl"/>
    <!-- Executor management -->
    <bean id="executorService" class="com.vip.saturn.job.console.service.impl.ExecutorServiceImpl"/>
    <!-- Jobs -->
    <bean id="jobService" class="com.vip.saturn.job.console.service.impl.JobServiceImpl"/>
    <!-- Namespace and ZK cluster mapping management -->
    <bean id="namespaceZkClusterMappingService" class="com.vip.saturn.job.console.service.impl.NamespaceZkClusterMappingServiceImpl"/>
    <!-- Report alarms -->
    <bean id="reportAlarmService" class="com.vip.saturn.job.console.service.impl.ReportAlarmServiceImpl"/>
    <!-- Cron validation -->
    <bean id="utilsService" class="com.vip.saturn.job.console.service.impl.UtilsServiceImpl"/>
    <!-- Diff between ZK and the database -->
    <bean id="zkDBDiffService" class="com.vip.saturn.job.console.service.impl.ZkDBDiffServiceImpl"/>
    <!-- View and manage paths in ZK -->
    <bean id="zkTreeService" class="com.vip.saturn.job.console.service.impl.ZkTreeServiceImpl"/>
    <!-- Update job configuration -->
    <bean id="updateJobConfigService" class="com.vip.saturn.job.console.service.impl.UpdateJobConfigServiceImpl"/>
    <!-- REST API for job-related HTTP requests -->
    <bean id="restApiService" class="com.vip.saturn.job.console.service.impl.RestApiServiceImpl"/>
    <!-- Persist execution results (daily run counts, failures, etc.) to the database -->
    <bean id="statisticPersistence" class="com.vip.saturn.job.console.service.impl.statistics.StatisticsPersistence"/>
    <!-- Dashboard statistics (run counts, failures, etc.) -->
    <bean id="statisticRefreshService" class="com.vip.saturn.job.console.service.impl.statistics.StatisticsRefreshServiceImpl"/>
    <!-- Registry center management -->
    <bean id="registryCenterService" class="com.vip.saturn.job.console.service.impl.RegistryCenterServiceImpl"/>
    <!-- Authentication and login -->
    <bean id="authenticationService" class="com.vip.saturn.job.console.service.impl.AuthenticationServiceImpl"/>

Most of the beans above back scheduled tasks. The ones to focus on are namespace and job management, including creation, sharding, and so on.

Creating a namespace

To create a namespace (domain) from the console, you pass in a namespace and a zkCluster; on the backend, RegistryCenterController.createNamespace performs the creation. The core logic is:

    @Transactional(rollbackFor = {Exception.class})
    @Override
    public void createNamespace(NamespaceDomainInfo namespaceDomainInfo) throws SaturnJobConsoleException {
        String namespace = namespaceDomainInfo.getNamespace();
        String zkClusterKey = namespaceDomainInfo.getZkCluster();
        // check whether the ZK cluster exists in the registry center
        ZkCluster currentCluster = getZkCluster(zkClusterKey);
        if (currentCluster == null) {
            throw new SaturnJobConsoleHttpException(HttpStatus.BAD_REQUEST.value(),
                    String.format(ERR_MSG_TEMPLATE_FAIL_TO_CREATE, namespace, "not found zkcluster" + zkClusterKey));
        }
        // check whether the namespace exists under any cluster; namespaces are unique across all clusters
        if (checkNamespaceExists(namespace)) {
            throw new SaturnJobConsoleHttpException(HttpStatus.BAD_REQUEST.value(),
                    String.format(ERR_MSG_NS_ALREADY_EXIST, namespace));
        }
        try {
            // create the NamespaceInfo record
            NamespaceInfo namespaceInfo = constructNamespaceInfo(namespaceDomainInfo);
            namespaceInfoService.create(namespaceInfo);
            // Create the zkCluster-to-namespace mapping and write it to the database;
            // if the namespace is already present in the database, update it, otherwise insert a new row.
            namespaceZkClusterMapping4SqlService.insert(namespace, "", zkClusterKey, NAMESPACE_CREATOR_NAME);
            // Refresh the data to the registry center. This does not write to ZK directly;
            // it refreshes asynchronously by updating the uid in sys_config.
            notifyRefreshRegCenter();
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new SaturnJobConsoleHttpException(HttpStatus.INTERNAL_SERVER_ERROR.value(),
                    String.format(ERR_MSG_TEMPLATE_FAIL_TO_CREATE, namespace, e.getMessage()));
        }
    }

The code above interacts with RegistryCenterServiceImpl.

    public void init() {
        getConsoleClusterId();
        localRefresh();
        initLocalRefreshThreadPool();
        startLocalRefreshTimer();
        startLocalRefreshIfNecessaryTimer();
    }

    private void initLocalRefreshThreadPool() {
        localRefreshThreadPool = Executors
                .newSingleThreadExecutor(new ConsoleThreadFactory("refresh-RegCenter-thread", false));
    }

    private void startLocalRefreshTimer() {
        // run localRefresh every 5 minutes
    }

    private void startLocalRefreshIfNecessaryTimer() {
        // check every second: when the system config has changed and the current uuid
        // differs from the latest uuid, run localRefresh
    }

    private synchronized void localRefresh() {
        // (abridged)
        // Refresh the registry center: mainly diff whether the ZK clusters changed,
        // including namespace shutdown and migration.
        refreshRegistryCenter();
        // Console leader election and data update. Only ZK clusters configured in the console's
        // system config (stored in MySQL) can be managed by this console. The dashboard leader is
        // elected by registering $SaturnSelf/saturn-console/dashboard/leader in ZK.
        refreshDashboardLeaderTreeCache();
        // Create or migrate NamespaceShardingManagers as needed; each namespace has its own NamespaceShardingManager.
        refreshNamespaceShardingListenerManagerMap();
    }

The initialization of this class is essentially the code above. So namespace creation writes directly to the database, and the data reaches ZK via refresh. Creating a namespace therefore amounts to writing a row to the database (namespace_zkcluster_mapping) and updating the uid in sys_config to trigger an asynchronous refresh. After the refresh, the registry center has the namespace's node, plus the dashboard leader election and sharding management; each namespace gets one NamespaceShardingManager, which manages executor online/offline events, sharding, and so on.
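The "bump a uid in sys_config, let every console instance poll and refresh" pattern described above can be sketched as follows. This is a hypothetical, in-memory stand-in (the class and method names are mine, not Saturn's): an AtomicReference plays the role of the sys_config uid column, and pollOnce plays the role of the one-second startLocalRefreshIfNecessaryTimer check.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "refresh via sys_config uid" pattern: writers bump a uid
// in shared config, and a poller refreshes when it observes a new uid.
public class ConfigUidRefresher {
    // stands in for the uid column of the sys_config table
    static final AtomicReference<String> sysConfigUid = new AtomicReference<>("uid-0");

    private String lastSeenUid = sysConfigUid.get();
    final AtomicInteger refreshCount = new AtomicInteger();

    // what the 1-second "if necessary" timer would do on each tick
    public void pollOnce() {
        String latest = sysConfigUid.get();
        if (!latest.equals(lastSeenUid)) {
            lastSeenUid = latest;
            localRefresh();
        }
    }

    private void localRefresh() {
        refreshCount.incrementAndGet(); // real code would re-sync DB state to ZK here
    }

    // what notifyRefreshRegCenter() would amount to: bump the uid so every console refreshes
    public static void notifyRefresh() {
        sysConfigUid.set("uid-" + System.nanoTime());
    }
}
```

The advantage of the pattern is that any number of console instances converge without the writer knowing who they are; the cost is refresh latency bounded by the polling interval.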

    private void start0() throws Exception {
        shardingTreeCacheService.start();
        // create ephemeral node $SaturnExecutors/leader/host & $Jobs
        // leader election for this namespace's sharding, performed by console nodes
        namespaceShardingService.leaderElection();
        // add a JobServersTriggerShardingListener for each existing job, triggering sharding
        // when an executor goes online/offline; node: $Jobs/${jobName}/servers
        addJobListenersService.addExistJobPathListener();
        // online/offline listeners on /$SaturnExecutors/executors:
        // ExecutorOnlineOfflineTriggerShardingListener, ExecutorTrafficTriggerShardingListener
        addOnlineOfflineListener();
        // sharding listener on /$SaturnExecutors/sharding:
        // SaturnExecutorsShardingTriggerShardingListener
        addExecutorShardingListener();
        // leader election listener on /$SaturnExecutors/leader
        addLeaderElectionListener();
        // job add/remove listener on /$Jobs: AddOrRemoveJobListener
        addNewOrRemoveJobListener();
    }

All of the listeners above ultimately execute the run method of AbstractAsyncShardingTask. When we create a namespace and sharding is needed, the corresponding NamespaceShardingManager registers several nodes in ZK and listens for events such as executor online/offline and job creation. Sharding is performed by the leader for that namespace's executors: when a new namespace is created, the sharding of all of its executors is carried out by whichever node wins leadership for that namespace, and every step checks whether the current node is still the namespace leader (i.e. whether the latch node has been created and the host is the current node).

Election:

    public void leaderElection() throws Exception {
        lock.lockInterruptibly();
        try {
            if (hasLeadership()) {
                return;
            }
            log.info("{}-{} leadership election start", namespace, hostValue);
            try (LeaderLatch leaderLatch = new LeaderLatch(curatorFramework,
                    SaturnExecutorsNode.LEADER_LATCHNODE_PATH)) {
                leaderLatch.start();
                int timeoutSeconds = 60;
                if (leaderLatch.await(timeoutSeconds, TimeUnit.SECONDS)) {
                    if (!hasLeadership()) {
                        becomeLeader();
                    } else {
                        log.info("{}-{} becomes a follower", namespace, hostValue);
                    }
                } else {
                    log.error("{}-{} leadership election is timeout({}s)", namespace, hostValue, timeoutSeconds);
                }
            } catch (InterruptedException e) {
                log.info("{}-{} leadership election is interrupted", namespace, hostValue);
                throw e;
            } catch (Exception e) {
                log.error(namespace + "-" + hostValue + " leadership election error", e);
                throw e;
            }
        } finally {
            lock.unlock();
        }
    }

Election simply means writing a latch node under $SaturnExecutors/leader. After becoming leader, the node persists the $Jobs node in ZK and then writes its host into $SaturnExecutors/leader/host.

If the current instance becomes leader, it takes on job and executor management, including online/offline handling, sharding, and so on.

All console operations are ultimately executed asynchronously through the methods of AbstractAsyncShardingTask:

    public void run() {
        logStartInfo();
        boolean isAllShardingTask = this instanceof ExecuteAllShardingTask;
        try {
            // if we are no longer the leader, return immediately
            if (!namespaceShardingService.isLeadershipOnly()) {
                return;
            }
            // If a full re-sharding is needed and the current task is not the full-sharding task,
            // return; partial sharding would be pointless. Since the console sets the full-sharding
            // flag right after winning the election and immediately submits the full-sharding task
            // to the thread pool, a full sharding always runs first.
            if (namespaceShardingService.isNeedAllSharding() && !isAllShardingTask) {
                log.info("the {} will be ignored, because there will be {}", this.getClass().getSimpleName(),
                        ExecuteAllShardingTask.class.getSimpleName());
                return;
            }
            // fetch all jobs from ZK
            List<String> allJobs = getAllJobs();
            // all enabled jobs
            List<String> allEnableJobs = getAllEnableJobs(allJobs);
            // The last known online executors, from $SaturnExecutors/sharding/content,
            // which records each executor's ip, shard assignments and load.
            List<Executor> oldOnlineExecutorList = getLastOnlineExecutorList();
            // for a full sharding, pull all online executors from /$SaturnExecutors/executors; otherwise null
            List<Executor> customLastOnlineExecutorList = customLastOnlineExecutorList();
            // if this is not a full sharding, copy the last known online executor list
            List<Executor> lastOnlineExecutorList = customLastOnlineExecutorList == null
                    ? copyOnlineExecutorList(oldOnlineExecutorList) : customLastOnlineExecutorList;
            // The executors whose traffic has not been extracted;
            // $SaturnExecutors/executors/xx/noTraffic: true means traffic extracted, false otherwise.
            List<Executor> lastOnlineTrafficExecutorList = getTrafficExecutorList(lastOnlineExecutorList);
            List<Shard> shardList = new ArrayList<>();
            // pick the shards to move; this is an abstract method implemented per task type
            if (pick(allJobs, allEnableJobs, shardList, lastOnlineExecutorList, lastOnlineTrafficExecutorList)) {
                // put the picked shards back, balancing load
                putBackBalancing(allEnableJobs, shardList, lastOnlineExecutorList, lastOnlineTrafficExecutorList);
                // if we are no longer the leader, return
                if (!namespaceShardingService.isLeadershipOnly()) {
                    return;
                }
                // persist the sharding result
                if (shardingContentIsChanged(oldOnlineExecutorList, lastOnlineExecutorList)) {
                    namespaceShardingContentService.persistDirectly(lastOnlineExecutorList);
                }
                // notify the shards-changed jobs of all enabled jobs
                Map<String, Map<String, List<Integer>>> enabledAndShardsChangedJobShardContent = getEnabledAndShardsChangedJobShardContent(
                        isAllShardingTask, allEnableJobs, oldOnlineExecutorList, lastOnlineExecutorList);
                namespaceShardingContentService
                        .persistJobsNecessaryInTransaction(enabledAndShardsChangedJobShardContent);
                // sharding count ++
                increaseShardingCount();
            }
        } catch (InterruptedException e) {
            log.info("{}-{} {} is interrupted", namespaceShardingService.getNamespace(),
                    namespaceShardingService.getHostValue(), this.getClass().getSimpleName());
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            log.error(t.getMessage(), t);
            if (!isAllShardingTask) { // not a full sharding: rescue the failure with a full sharding
                namespaceShardingService.setNeedAllSharding(true);
                namespaceShardingService.shardingCountIncrementAndGet();
                executorService.submit(new ExecuteAllShardingTask(namespaceShardingService));
            } else { // the full sharding itself failed: raise an alarm and shut down, so a new leader takes over
                raiseAlarm();
                shutdownNamespaceShardingService();
            }
        } finally {
            if (isAllShardingTask) { // a completed full sharding clears the full-sharding flag
                namespaceShardingService.setNeedAllSharding(false);
            }
            namespaceShardingService.shardingCountDecrementAndGet();
        }
    }
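The error-handling strategy in the run method above (an incremental task failure falls back to a full re-sharding; a full-sharding failure escalates) can be sketched in isolation. This is a hypothetical simplification with made-up names, keeping only the needAllSharding bookkeeping:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "rescue with full re-sharding" strategy: if an incremental
// task fails, mark a full re-sharding as needed; if the full one fails too, escalate.
public class ShardingTaskRunner {
    final List<String> events = new ArrayList<>();
    boolean needAllSharding = false;

    public void run(boolean isAllShardingTask, Runnable body) {
        try {
            if (needAllSharding && !isAllShardingTask) {
                events.add("skipped"); // a full sharding is pending; incremental work is pointless
                return;
            }
            body.run();
            events.add(isAllShardingTask ? "full-ok" : "incremental-ok");
        } catch (RuntimeException t) {
            if (!isAllShardingTask) {
                needAllSharding = true; // rescue: fall back to a full re-sharding
                events.add("rescheduled-full");
            } else {
                events.add("alarm-and-shutdown"); // full sharding failed: let another leader take over
            }
        } finally {
            if (isAllShardingTask) {
                needAllSharding = false; // a completed full-sharding run clears the flag
            }
        }
    }
}
```

The design point is that a full re-sharding is idempotent over the namespace's state, so it is a safe recovery action for any partial failure.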

The pick implementations, per task type:

- ExecuteAllShardingTask: full re-shard of the namespace; remove all existing executors, re-fetch the executors, re-fetch the jobs' shards
- ExecuteExtractTrafficShardingTask: extract an executor's traffic; mark its noTraffic as true, remove all of its job shards (only non-local job shards are extracted) and set its totalLoadLevel to 0
- ExecuteJobDisableShardingTask: a job is disabled; extract that job's shards from all executors, decreasing loadLevel accordingly; no put-back is needed
- ExecuteJobEnableShardingTask: a job is enabled; fetch the job's shards, filtering out the executors that cannot run it
- ExecuteJobForceShardShardingTask: force re-shard of a job; remove the job's shards from all executors, re-fetch the job's shards, and finally delete the forceShard node
- ExecuteJobServerOfflineShardingTask: a job's executor goes offline; extract all of that job's shards from the executor, removing them if the job is local-mode
- ExecuteJobServerOnlineShardingTask: a job's executor comes online; executor-level balanced extraction, but only that job's shards may be extracted; then the new shards are added
- ExecuteOfflineShardingTask: an executor goes offline; extract all non-local-mode job shards running on it and remove the executor

In short, the end result is the content under $SaturnExecutors/sharding/content. The content is an array, split into child nodes numbered from 0, and the objects recorded in the nodes look like:

    [{
        "executorName": "aaa",
        "ip": "0.0.0.0",
        "noTraffic": false,
        "jobNameList": ["job1", "job2"],
        "shardList": [
            {"jobName": "job1", "item": 0, "loadLevel": 1},
            {"jobName": "job2", "item": 0, "loadLevel": 1}
        ]
    }]

The shardList is the resulting shard assignment.
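The "put back while balancing" idea behind putBackBalancing can be sketched as a greedy least-loaded assignment. This is a hypothetical simplification (the real code also honors preferList, local mode, traffic flags, etc.), and the class names here mirror the JSON above rather than Saturn's actual types:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: each picked shard goes to the online, traffic-taking executor
// with the lowest accumulated loadLevel.
public class PutBackBalancer {
    static class Shard {
        final String jobName; final int item; final int loadLevel;
        Shard(String jobName, int item, int loadLevel) {
            this.jobName = jobName; this.item = item; this.loadLevel = loadLevel;
        }
    }

    static class Executor {
        final String name; int totalLoadLevel; final List<Shard> shardList = new ArrayList<>();
        Executor(String name) { this.name = name; }
    }

    public static void putBack(List<Shard> shards, List<Executor> executors) {
        for (Shard shard : shards) {
            // pick the least-loaded executor for each shard
            Executor target = executors.stream()
                    .min(Comparator.comparingInt(e -> e.totalLoadLevel))
                    .orElseThrow(IllegalStateException::new);
            target.shardList.add(shard);
            target.totalLoadLevel += shard.loadLevel;
        }
    }
}
```

With equal loadLevels this spreads shards evenly; with unequal loadLevels it keeps the per-executor totals as close as a greedy pass allows.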

That is the logic involved in creating a namespace.

Namespace migration

The console can migrate a namespace to another ZK cluster. Migration essentially means updating the data in the database and in the registry center. During migration, the success and failure counts are recorded in the temporary_shared_status table. The update first reads the namespace's current ZK cluster name from the database, moves the nodes to the destination ZK cluster, deletes the corresponding nodes from the source cluster, and then refreshes the data in the database.

If, during migration, the ZK nodes have been deleted but the namespace has not yet been refreshed into the database, the data has to be repaired via the diff mechanism; no other recovery path has been found so far.

job

Creating a job

Creating and copying a job are essentially the same; one takes hand-written configuration, the other pulls it from the database. The code lives in JobServiceImpl.createJob.

    private void addOrCopyJob(String namespace, JobConfig jobConfig, String jobNameCopied, String createdBy)
            throws SaturnJobConsoleException {
        // usually empty at this point
        List<JobConfig> unSystemJobs = getUnSystemJobs(namespace);
        Set<JobConfig> streamChangedJobs = new HashSet<>();
        validateJobConfig(namespace, jobConfig, unSystemJobs, streamChangedJobs);
        // If a job with the same name exists in the database, throw.
        // Query again directly rather than using unSystemJobs, because the name
        // must not clash with system jobs either.
        String jobName = jobConfig.getJobName();
        if (currentJobConfigService.findConfigByNamespaceAndJobName(namespace, jobName) != null) {
            throw new SaturnJobConsoleException(ERROR_CODE_BAD_REQUEST, String.format("该作业(%s)已经存在", jobName));
        }
        // if the job still exists in ZK, try to remove it
        CuratorRepository.CuratorFrameworkOp curatorFrameworkOp = registryCenterService
                .getCuratorFrameworkOp(namespace);
        if (curatorFrameworkOp.checkExists(JobNodePath.getJobNodePath(jobName))) {
            if (!removeJobFromZk(jobName, curatorFrameworkOp)) {
                throw new SaturnJobConsoleException(ERROR_CODE_BAD_REQUEST,
                        String.format("该作业(%s)正在删除中,请稍后再试", jobName));
            }
        }
        // the number of jobs per namespace is capped by the MAX_JOB_NUM field in sys_config, default 100
        int maxJobNum = getMaxJobNum();
        if (jobIncExceeds(namespace, maxJobNum, 1)) {
            throw new SaturnJobConsoleException(ERROR_CODE_BAD_REQUEST,
                    String.format("总作业数超过最大限制(%d),作业名%s创建失败", maxJobNum, jobName));
        }
        // for a copy, load the copied job's configuration from the database into the new job's configuration
        JobConfig myJobConfig = jobConfig;
        if (jobNameCopied != null) {
            myJobConfig = currentJobConfigService.findConfigByNamespaceAndJobName(namespace, jobNameCopied);
            SaturnBeanUtils.copyPropertiesIgnoreNull(jobConfig, myJobConfig);
        }
        // set default values and force-correct certain fields
        correctConfigValueWhenAddJob(myJobConfig);
        // insert the job into the database
        currentJobConfigService.create(constructJobConfig4DB(namespace, myJobConfig, createdBy, createdBy));
        // update the upstream/downstream of related jobs
        for (JobConfig streamChangedJob : streamChangedJobs) {
            currentJobConfigService.updateStream(constructJobConfig4DB(namespace, streamChangedJob, null, createdBy));
        }
        // write the job configuration to ZK, updating related jobs' upstream/downstream along with it
        createJobConfigToZk(myJobConfig, streamChangedJobs, curatorFrameworkOp);
    }

Finally, all of the job's configuration is written to ZK:

    // add the job
    private void createJobConfigToZk(JobConfig jobConfig, Set<JobConfig> streamChangedJobs,
            CuratorRepository.CuratorFrameworkOp curatorFrameworkOp) throws SaturnJobConsoleException {
        // (excerpted: one transactional create per config item)
        curatorTransactionOp
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_ENABLED), jobConfig.getEnabled())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_DESCRIPTION), jobConfig.getDescription())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_CUSTOM_CONTEXT), jobConfig.getCustomContext())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_JOB_TYPE), jobConfig.getJobType())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_JOB_MODE), jobConfig.getJobMode())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_SHARDING_ITEM_PARAMETERS), jobConfig.getShardingItemParameters())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_JOB_PARAMETER), jobConfig.getJobParameter())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_QUEUE_NAME), jobConfig.getQueueName())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_CHANNEL_NAME), jobConfig.getChannelName())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_FAILOVER), jobConfig.getFailover())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_MONITOR_EXECUTION), "true")
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_TIMEOUT_4_ALARM_SECONDS), jobConfig.getTimeout4AlarmSeconds())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_TIMEOUT_SECONDS), jobConfig.getTimeoutSeconds())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_TIME_ZONE), jobConfig.getTimeZone())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_CRON), jobConfig.getCron())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_PAUSE_PERIOD_DATE), jobConfig.getPausePeriodDate())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_PAUSE_PERIOD_TIME), jobConfig.getPausePeriodTime())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_PROCESS_COUNT_INTERVAL_SECONDS), jobConfig.getProcessCountIntervalSeconds())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_SHARDING_TOTAL_COUNT), jobConfig.getShardingTotalCount())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_SHOW_NORMAL_LOG), jobConfig.getShowNormalLog())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_LOAD_LEVEL), jobConfig.getLoadLevel())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_JOB_DEGREE), jobConfig.getJobDegree())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_ENABLED_REPORT), jobConfig.getEnabledReport())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_PREFER_LIST), jobConfig.getPreferList())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_USE_DISPREFER_LIST), jobConfig.getUseDispreferList())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_LOCAL_MODE), jobConfig.getLocalMode())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_USE_SERIAL), jobConfig.getUseSerial())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_DEPENDENCIES), jobConfig.getDependencies())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_GROUPS), jobConfig.getGroups())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_RERUN), jobConfig.getRerun())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_UPSTREAM), jobConfig.getUpStream())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_DOWNSTREAM), jobConfig.getDownStream())
                .create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_JOB_CLASS), jobConfig.getJobClass());
    }

As you can see, a great deal of data is written to ZK, essentially every configuration field, with defaults filled in where nothing was set. Normally, at initial creation only the job name, the fully-qualified class name and the sharding information are involved; everything else is configured later in the job's detail page.

Executor

Executor startup

This article only covers the external (standalone) way of starting an executor, not the embedded mode.

The executor itself is a plain Java process, so finding the corresponding main method is enough to understand the startup logic.

If you start it locally with saturn:run -Dnamespace=demo1 -DexecutorName=exec01 -DVIP_SATURN_CONSOLE_URI=http://127.0.0.1:9088 -f pom.xml,

then the two required settings, namespace and the console URL, are already provided. In the plugin's SaturnJobRunMojo we can see that it downloads the executor zip and unpacks it, then passes in saturnLibDir and appLibDir: the former is used to load Saturn's own dependencies, the latter is the unpacked zip of the business code built by saturn:zip. Why Spring Boot's fat-jar packaging cannot be used here will become clear later.

In the saturn-executor startup source, the first step is initializing two class loaders: SaturnClassLoader for the executor and JobClassLoader for the job. Both are similar; they extend URLClassLoader, whose main purpose is loading classes from jars or URLs.

    public class SaturnClassLoader extends URLClassLoader {
        public SaturnClassLoader(URL[] urls, ClassLoader parent) {
            super(urls, parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> findClass = findLoadedClass(name);
                if (findClass == null) {
                    findClass = super.loadClass(name, resolve);
                }
                return findClass;
            }
        }
    }

This class loader extends URLClassLoader, which brings us back to the question raised earlier: why Spring Boot packaging cannot be used. When a jar is executed, its META-INF/MANIFEST.MF contains a Main-Class attribute. When packaging with the Spring Boot plugin, Main-Class is rewritten to org.springframework.boot.loader.JarLauncher, and the class holding our own main method is moved into the Start-Class attribute. JarLauncher in turn uses org.springframework.boot.loader.LaunchedURLClassLoader as its class loader, which knows how to scan BOOT-INF/classes and BOOT-INF/lib inside the jar. If JobClassLoader instead tries to load the original main class, its loadClass will not look inside BOOT-INF/classes or BOOT-INF/lib, so the class cannot be found. This is specific to how Spring Boot performs class loading at startup.
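The manifest rewriting described above can be shown with the JDK's own jar APIs. The attribute values below are what the Spring Boot repackaging writes; com.demo.MyMain is a made-up user class for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

// Illustration of Spring Boot's manifest rewriting: Main-Class points at JarLauncher,
// and the user's own main class is moved to the Start-Class attribute.
public class BootManifestDemo {
    public static Manifest repackagedManifest() {
        Manifest mf = new Manifest();
        Attributes main = mf.getMainAttributes();
        main.put(Attributes.Name.MANIFEST_VERSION, "1.0"); // required, or write() emits nothing
        main.put(Attributes.Name.MAIN_CLASS, "org.springframework.boot.loader.JarLauncher");
        main.putValue("Start-Class", "com.demo.MyMain"); // the user's real main class
        return mf;
    }

    public static String mainClassSeenByJavaJar(Manifest mf) {
        try {
            // round-trip through bytes, like reading META-INF/MANIFEST.MF out of the jar
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            mf.write(out);
            Manifest parsed = new Manifest(new ByteArrayInputStream(out.toByteArray()));
            // this is the class `java -jar` actually launches, not the user's main class
            return parsed.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

A plain URLClassLoader only resolves classes at the jar root, which is why Saturn needs the classes unpacked from a zip rather than nested under BOOT-INF inside a fat jar.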

Packaging with Saturn's Maven plugin essentially just puts all of the project's dependency jars into one zip.

At startup, -saturnLibDir, -appLibDir and -namespace are passed in. saturnLibDir, i.e. the saturn-executor/lib directory, is loaded by SaturnClassLoader; appLibDir, the unpacked zip mentioned above, is loaded by JobClassLoader. This separation effectively avoids jar conflicts between Saturn and the business code.

The runtime entry point is Main in the executor module, and the core startup logic is SaturnExecutor.buildExecutor. It first initializes some logging-related configuration, then initializes the job: when writing a job, we put a saturn.properties under resources containing app.class.

    private static Object validateAndLoadSaturnApplication(ClassLoader jobClassLoader) {
        try {
            Properties properties = getSaturnProperty(jobClassLoader); // load saturn.properties
            if (properties == null) {
                return null;
            }
            String appClassStr = properties.getProperty("app.class"); // read app.class
            if (StringUtils.isBlank(appClassStr)) {
                return null;
            }
            appClassStr = appClassStr.trim();
            ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(jobClassLoader);
                Class<?> appClass = jobClassLoader.loadClass(appClassStr); // load the configured app class
                // load the SaturnApplication interface
                Class<?> saturnApplicationClass = jobClassLoader.loadClass(SATURN_APPLICATION_CLASS);
                if (saturnApplicationClass.isAssignableFrom(appClass)) { // app.class must implement SaturnApplication
                    Object saturnApplication = appClass.newInstance(); // instantiate it
                    appClass.getMethod("init").invoke(saturnApplication); // call its init method
                    // returned as Object, though it is effectively a SaturnApplication
                    return saturnApplication;
                } else {
                    throw new RuntimeException(
                            "the app.class " + appClassStr + " must be instance of " + SATURN_APPLICATION_CLASS);
                }
            } finally {
                Thread.currentThread().setContextClassLoader(oldCL);
            }
        } catch (RuntimeException e) {
            LogUtils.error(log, LogEvents.ExecutorEvent.INIT, "Fail to load SaturnApplication", e);
            throw e;
        } catch (Exception e) {
            LogUtils.error(log, LogEvents.ExecutorEvent.INIT, "Fail to load SaturnApplication", e);
            throw new RuntimeException(e);
        }
    }
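The context-classloader swap in the method above is a common idiom worth isolating: set the job's class loader as the thread context loader for the duration of a call, and always restore the previous one in a finally block. A minimal sketch (the helper name is mine):

```java
import java.util.function.Supplier;

// Minimal sketch of the context-classloader swap idiom: run `body` with `cl` as the
// thread context class loader, then restore the previous loader unconditionally.
public class ContextClassLoaderSwap {
    public static <T> T callWith(ClassLoader cl, Supplier<T> body) {
        ClassLoader old = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(cl);
        try {
            return body.get();
        } finally {
            // restore even if body throws, so other code on this thread is unaffected
            Thread.currentThread().setContextClassLoader(old);
        }
    }
}
```

This matters because many frameworks (Spring included) resolve resources and classes through the thread context class loader, so the business code must see JobClassLoader while it initializes.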

If the project is a Spring Boot project, Saturn's Spring Boot support can be used directly:

    public class GenericSpringBootSaturnApplication extends AbstractSpringSaturnApplication {
        @Override
        public void init() {
            if (applicationContext != null) {
                destroy();
            }
            applicationContext = run();
        }

        @Override
        public void destroy() {
            if (applicationContext != null) {
                SpringApplication.exit(applicationContext);
                applicationContext = null;
            }
        }

        /**
         * Start Spring Boot. The default is SpringApplication.run(source()); see {@link #source()}.
         */
        protected ApplicationContext run() {
            return SpringApplication.run(source());
        }

        /**
         * The source loaded when starting Spring Boot in the default way.
         */
        protected Object source() {
            return this.getClass();
        }
    }

As you can see, the point of the Spring Boot integration is obtaining the ApplicationContext: Saturn later fetches job beans straight from the applicationContext, so our jobs must be registered as Spring beans.

Finally the SaturnExecutor object is initialized:

    private SaturnExecutor(String namespace, String executorName, ClassLoader executorClassLoader,
            ClassLoader jobClassLoader, Object saturnApplication) {
        this.executorName = executorName;
        this.namespace = namespace;
        this.executorClassLoader = executorClassLoader;
        this.jobClassLoader = jobClassLoader;
        this.saturnApplication = saturnApplication;
        this.raiseAlarmExecutorService = Executors
                .newSingleThreadExecutor(new SaturnThreadFactory(executorName + "-raise-alarm-thread", false));
        this.shutdownJobsExecutorService = Executors
                .newCachedThreadPool(new SaturnThreadFactory(executorName + "-shutdownJobSchedulers-thread", true));
        initRestartThread();
        registerShutdownHandler();
    }

initRestartThread eventually calls:

    public void execute() throws Exception {
        shutdownLock.lockInterruptibly();
        try {
            if (isShutdown) {
                return;
            }
            long startTime = System.currentTimeMillis();
            shutdown0();
            try {
                StartCheckUtil.add2CheckList(StartCheckItem.ZK, StartCheckItem.UNIQUE, StartCheckItem.JOBKILL);
                LogUtils.info(log, LogEvents.ExecutorEvent.INIT, "start to discover from saturn console");
                // ask the console for the namespace's ZK info via /rest/v1/discovery?namespace=...
                Map<String, String> discoveryInfo = discover();
                String zkConnectionString = discoveryInfo.get(DISCOVER_INFO_ZK_CONN_STR);
                if (StringUtils.isBlank(zkConnectionString)) {
                    LogUtils.error(log, LogEvents.ExecutorEvent.INIT, "zk connection string is blank!");
                    throw new RuntimeException("zk connection string is blank!");
                }
                saturnExecutorExtension.postDiscover(discoveryInfo);
                // initialize the registry center
                initRegistryCenter(zkConnectionString.trim());
                // check whether there are still SHELL jobs running, and kill them
                checkAndKillExistedShellJobs();
                // initialize the timeout scheduler
                TimeoutSchedulerExecutor.createScheduler(executorName);
                // Register the executor before starting jobs, so that if registration fails because of
                // some configuration limit, job threads have not been started and no job runs briefly
                // by mistake. Registers the current executor under $SaturnExecutors/executors,
                // including {lastBeginTime, clean, version, ip}.
                registerExecutor();
                // start the thread that periodically truncates nohup output files
                periodicTruncateNohupOutService = new PeriodicTruncateNohupOutService(executorName);
                periodicTruncateNohupOutService.start();
                // start the thread that resets success/error counters to 0 at midnight
                resetCountService = new ResetCountService(executorName);
                resetCountService.startRestCountTimer();
                // add the callback for newly created jobs, and start the jobs that already exist
                saturnExecutorService.registerJobsWatcher();
            } catch (Throwable t) {
                saturnExecutorExtension.handleExecutorStartError(t);
                shutdown0();
                throw t;
            }
        } finally {
            shutdownLock.unlock();
        }
    }

First the executor registers itself in ZK, then it watches the $Jobs node of the current namespace.

    public void registerJobsWatcher() throws Exception {
        if (initNewJobService != null) {
            initNewJobService.shutdown();
        }
        initNewJobService = new InitNewJobService(this);
        initNewJobService.start();
    }

    // InitNewJobService
    public void start() throws Exception {
        treeCache = TreeCache.newBuilder((CuratorFramework) regCenter.getRawClient(), JobNodePath.ROOT)
                .setExecutor(new CloseableExecutorService(
                        Executors.newSingleThreadExecutor(
                                new SaturnThreadFactory(executorName + "-$Jobs-watcher", false)), true))
                .setMaxDepth(1)
                .build();
        executorService = Executors
                .newSingleThreadExecutor(new SaturnThreadFactory(executorName + "-initNewJob-thread", false));
        treeCache.getListenable().addListener(new InitNewJobListener(), executorService);
        treeCache.start();
    }

InitNewJobListener watches the node using ZK's tree cache. Curator provides three cache types: NodeCache, PathChildrenCache and TreeCache. NodeCache watches a single ZNode, PathChildrenCache watches a ZNode's children, and TreeCache is an enhancement of PathChildrenCache that watches both the children and the ZNode itself.

The core handling logic for the watched events is:

    String jobName = StringUtils.substringAfterLast(path, "/");
    // /${namespace}/$Jobs/${jobName}/${jobclass}
    String jobClassPath = JobNodePath.getNodeFullPath(jobName, ConfigurationNode.JOB_CLASS);
    // wait 5 seconds at most until jobClass is created; WAIT_JOBCLASS_ADDED_COUNT = 25
    for (int i = 0; i < WAIT_JOBCLASS_ADDED_COUNT; i++) {
        // check whether the job's class node exists yet
        if (!regCenter.isExisted(jobClassPath)) {
            Thread.sleep(200L);
            continue;
        }
        // has this job already been initialized?
        if (!jobNames.contains(jobName)) {
            if (canInitTheJob(jobName) && initJobScheduler(jobName)) {
                // add to the collection of initialized jobs (curiously a List, not a Set)
                jobNames.add(jobName);
            }
        } else {
            // log
        }
        break;
    }
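The bounded wait above (up to 25 tries with a 200 ms sleep, roughly 5 seconds) is a generic pattern. A small stand-alone sketch, with illustrative names:

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the bounded wait: poll a condition up to `attempts` times,
// sleeping `sleepMillis` between tries (25 x 200ms ~ 5s in InitNewJobListener).
public class BoundedWait {
    public static boolean waitUntil(BooleanSupplier condition, int attempts, long sleepMillis) {
        for (int i = 0; i < attempts; i++) {
            if (condition.getAsBoolean()) {
                return true; // condition met, e.g. the jobClass node now exists
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status and stop waiting
                return false;
            }
        }
        return false; // gave up; the caller may log and skip this job
    }
}
```

The reason the wait exists at all: the console creates the job's child nodes one by one, so the $Jobs child event can fire before the jobClass node has been written.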

There are two cases: if the executor has VIP_SATURN_INIT_JOB_BY_GROUPS configured, i.e. groups are specified, only the jobs in those groups are initialized; otherwise all jobs are. Job initialization happens in initJobScheduler:

    private boolean initJobScheduler(String jobName) {
        try {
            // record of failed job initializations
            JOB_INIT_FAILED_RECORDS.get(executorName).putIfAbsent(jobName, new HashSet<Integer>());
            // all the parameters set when creating/updating the job in the console, pulled from the registry center
            JobConfiguration jobConfig = new JobConfiguration(regCenter, jobName);
            if (jobConfig.getSaturnJobClass() == null) {
                throw new JobException(
                        "unexpected error, the saturnJobClass cannot be null, jobName is %s, jobType is %s",
                        jobName, jobConfig.getJobType());
            }
            if (jobConfig.isDeleting()) {
                String serverNodePath = JobNodePath.getServerNodePath(jobName, executorName);
                regCenter.remove(serverNodePath);
                LogUtils.warn(log, jobName, "the job is on deleting");
                return false;
            }
            // create the JobScheduler object
            JobScheduler scheduler = new JobScheduler(regCenter, jobConfig);
            // set the saturnExecutorService, which holds the executor's registration info and relationships
            scheduler.setSaturnExecutorService(saturnExecutorService);
            // initialize
            scheduler.init();
            // on success, clear this job's failure records
            JOB_INIT_FAILED_RECORDS.get(executorName).get(jobName).clear();
            return true;
        } catch (JobInitAlarmException e) {
            if (!SystemEnvProperties.VIP_SATURN_DISABLE_JOB_INIT_FAILED_ALARM) {
                // no need to log exception stack as it should be logged in the original happen place
                raiseAlarmForJobInitFailed(jobName, e);
            }
        } catch (Throwable t) {
            LogUtils.warn(log, jobName, "job initialize failed, but will not stop the init process", t);
        }
        return false;
    }

As you can see, everything ends up in the JobScheduler class, which is one-to-one with a job.

    public JobScheduler(final CoordinatorRegistryCenter coordinatorRegistryCenter,
            final JobConfiguration jobConfiguration) {
        this.jobName = jobConfiguration.getJobName();
        this.executorName = coordinatorRegistryCenter.getExecutorName();
        this.currentConf = jobConfiguration;
        this.coordinatorRegistryCenter = coordinatorRegistryCenter;
        this.jobNodeStorage = new JobNodeStorage(coordinatorRegistryCenter, jobConfiguration);
        initExecutorService();
        JobRegistry.addJobScheduler(executorName, jobName, this);
        zkCacheManager = new ZkCacheManager((CuratorFramework) coordinatorRegistryCenter.getRawClient(), jobName,
                executorName);
        // the job's config, including cron and other settings
        configService = new ConfigurationService(this);
        // election: every job has its own leader; path /${namespace}/$Jobs/${job}/leader/election
        leaderElectionService = new LeaderElectionService(this);
        // Persists executor info under /${namespace}/$Jobs/${job}/servers, including ip and
        // success/failure counts; also carries the run-immediately marker (an ephemeral node).
        serverService = new ServerService(this);
        // sharding information
        shardingService = new ShardingService(this);
        // execution context
        executionContextService = new ExecutionContextService(this);
        // job execution service: updates the next fire time, records job completion, etc.
        executionService = new ExecutionService(this);
        // failover
        failoverService = new FailoverService(this);
        // job execution statistics
        statisticsService = new StatisticsService(this);
        // execution analysis
        analyseService = new AnalyseService(this);
        // each namespace can run at most VIP_SATURN_MAX_NUMBER_OF_JOBS (default 500) jobs
        limitMaxJobsService = new LimitMaxJobsService(this);
        // every service above has a corresponding ZK listener; this wires the services to ZK nodes
        listenerManager = new ListenerManager(this);
        // reports execution results
        reportService = new ReportService(this);
    }

The role each of these services plays during execution will be expanded on later; for now, let's move into JobScheduler.init().

    public void init() {
        try {
            startAll();
            createJob();
            serverService.persistServerOnline(job);
            // Notify job enabled or disabled after that all are ready, include job was initialized.
            configService.notifyJobEnabledOrNot();
        } catch (Throwable t) {
            shutdown(false);
            throw t;
        }
    }

Initialization involves two steps: start all of the services above, then create the job. Since the services live and die with the job, we focus on the job's lifecycle: initialization, running, state transitions and shutdown. createJob performs the job's initialization:

    private void createJob() {
        // get the job type from the JobTypeManager; this article mainly covers Java jobs
        Class<?> jobClass = currentConf.getSaturnJobClass();
        try {
            job = (AbstractElasticJob) jobClass.newInstance();
        } catch (Exception e) {
            LogUtils.error(log, jobName, "unexptected error", e);
            throw new JobException(e);
        }
        // wire the JobScheduler's services into the job
        job.setJobScheduler(this);
        job.setConfigService(configService);
        job.setShardingService(shardingService);
        job.setExecutionContextService(executionContextService);
        job.setExecutionService(executionService);
        job.setFailoverService(failoverService);
        job.setServerService(serverService);
        job.setExecutorName(executorName);
        job.setReportService(reportService);
        job.setJobName(jobName);
        job.setNamespace(coordinatorRegistryCenter.getNamespace());
        job.setSaturnExecutorService(saturnExecutorService);
        // initialize the job
        job.init();
    }

All of the job types:

    public void registerJobType() {
        JobTypeManager.register(JobTypeBuilder.newBuilder().name("JAVA_JOB").cron().java()
                .allowedShutdownGracefully().triggerClass(CronTrigger.class)
                .handlerClass(SaturnJavaJob.class).build());
        JobTypeManager.register(JobTypeBuilder.newBuilder().name("SHELL_JOB").cron().shell()
                .allowedShutdownGracefully().triggerClass(CronTrigger.class)
                .handlerClass(SaturnScriptJob.class).build());
        JobTypeManager.register(JobTypeBuilder.newBuilder().name("PASSIVE_JAVA_JOB").passive().java()
                .allowedShutdownGracefully().triggerClass(PassiveTrigger.class)
                .handlerClass(SaturnJavaJob.class).build());
        JobTypeManager.register(JobTypeBuilder.newBuilder().name("PASSIVE_SHELL_JOB").passive().shell()
                .allowedShutdownGracefully().triggerClass(PassiveTrigger.class)
                .handlerClass(SaturnScriptJob.class).build());
    }

Java jobs ultimately use SaturnJavaJob. First, here is what a concrete business job looks like:

    // AbstractSaturnJavaJob extends AbstractElasticJob
    @Component
    public class DemoJob extends AbstractSaturnJavaJob {
        private static final Logger log = LoggerFactory.getLogger(DemoJob.class);

        @Resource
        private DemoService demoService;

        @Override
        public SaturnJobReturn handleJavaJob(String jobName, Integer shardItem, String shardParam,
                SaturnJobExecutionContext shardingContext) throws InterruptedException {
            log.info("{} is running, item is {}", jobName, shardItem);
            demoService.doing();
            return new SaturnJobReturn();
        }
    }

Back to SaturnJavaJob.init:

    public void init() {
        // AbstractSaturnJavaJob.init() first calls super.init(); the block below is super.init():
        {
            Class<? extends Trigger> triggerClass = configService.getJobType().getTriggerClass();
            Trigger trigger = null;
            try {
                trigger = triggerClass.newInstance();
                trigger.init(this);
            } catch (Exception e) {
                LogUtils.error(log, jobName, "Trigger init failed", e);
                throw new JobException(e);
            }
            scheduler = new SaturnScheduler(this, trigger);
            scheduler.start();
            getExecutorService();
        }
        // create the business job instance
        createJobBusinessInstanceIfNecessary();
        // get the job's version
        getJobVersionIfNecessary();
    }

The Trigger type is obtained according to the job type; then the Trigger and the SaturnScheduler are initialized, and the SaturnScheduler is started. Finally the work is handed over to a SaturnWorker object:

// Created by SaturnScheduler as:
// new SaturnWorker(job, trigger.createTriggered(false, null), trigger.createQuartzTrigger());
// The Triggered object is mainly used to tell whether a run has already been triggered.
public SaturnWorker(AbstractElasticJob job, Triggered notTriggered, Trigger trigger) {
	this.job = job;
	this.notTriggered = notTriggered;
	this.triggered = notTriggered;
	initTrigger(trigger); // this.triggerObj = (OperableTrigger) trigger;
}
SaturnWorker.run():
while (!halted.get()) {
	try {
		synchronized (sigLock) {
			while (paused && !halted.get()) {
				try {
					sigLock.wait(1000L);
				} catch (InterruptedException ignore) {
				}
			}
			if (halted.get()) {
				break;
			}
		}
		boolean noFireTime = false; // no next fire time; initialized to false
		long timeUntilTrigger = 1000;
		// triggerObj is the trigger passed in above
		if (triggerObj != null) {
			triggerObj.updateAfterMisfire(null);
			long now = System.currentTimeMillis();
			Date nextFireTime = triggerObj.getNextFireTime();
			if (nextFireTime != null) {
				timeUntilTrigger = nextFireTime.getTime() - now;
			} else {
				noFireTime = true;
			}
		}
		while (!noFireTime && timeUntilTrigger > 2) {
			synchronized (sigLock) {
				// stopped?
				if (halted.get()) {
					break;
				}
				// already triggered (e.g. "run now")?
				if (triggered.isYes()) {
					break;
				}
				try {
					sigLock.wait(timeUntilTrigger);
				} catch (InterruptedException ignore) {
				}
				// recompute the time until the next fire
				if (triggerObj != null) {
					long now = System.currentTimeMillis();
					Date nextFireTime = triggerObj.getNextFireTime();
					if (nextFireTime != null) {
						timeUntilTrigger = nextFireTime.getTime() - now;
					} else {
						noFireTime = true;
					}
				}
			}
		}
		boolean goAhead;
		Triggered currentTriggered = notTriggered; // notTriggered = trigger.createTriggered(false, null)
		// A run fires under exactly two conditions: 1. the fire time has arrived; 2. "run now" was clicked
		synchronized (sigLock) {
			// stopped or paused?
			goAhead = !halted.get() && !paused;
			// reset the run-now flag and capture its payload
			if (triggered.isYes()) { // initialized to false
				currentTriggered = triggered;
				triggered = notTriggered;
			} else if (goAhead) { // not run-now: either the fire time arrived or there is no next fire time
				goAhead = goAhead && !noFireTime; // run the job only if there is a next fire time, i.e. it has arrived
				if (goAhead) { // the fire time arrived: advance the trigger
					if (triggerObj != null) {
						triggerObj.triggered(null); // Quartz computes the next cron fire time here
					}
				} else { // no next fire time: sleep a second to keep the loop from burning CPU
					try {
						sigLock.wait(1000L);
					} catch (InterruptedException ignore) {
					}
				}
			}
		}
		// execute the job
		if (goAhead) {
			job.execute(currentTriggered);
		}
	} catch (RuntimeException e) {
		LogUtils.error(log, job.getJobName(), e.getMessage(), e);
	}
}
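The timing decision in the loop above boils down to: given the trigger's next fire time, compute how long to wait, or flag that there is nothing to fire (noFireTime). A minimal sketch of that decision, pulled out as a pure function (the name computeWaitMillis is mine, not Saturn's):

```java
public class TriggerWait {

	// Returns the milliseconds to wait before the next fire, or -1 when the
	// trigger has no next fire time (the noFireTime case in SaturnWorker.run()).
	static long computeWaitMillis(Long nextFireTime, long now) {
		if (nextFireTime == null) {
			return -1L; // no next fire time: the caller should idle instead of spinning
		}
		return nextFireTime - now;
	}

	public static void main(String[] args) {
		System.out.println(computeWaitMillis(10_500L, 10_000L)); // 500
		System.out.println(computeWaitMillis(null, 10_000L));    // -1
	}
}
```

When the returned wait drops to a couple of milliseconds, the real loop stops waiting and fires the job; a negative sentinel maps to the one-second idle sleep.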

Since the SaturnWorker runs asynchronously in a thread pool, the main thread has already moved on to the next step of init:

private void createJobBusinessInstanceIfNecessary() {
	// read the job class from the configuration stored in ZooKeeper
	String jobClassStr = configService.getJobConfiguration().getJobClass();
	if (StringUtils.isBlank(jobClassStr)) {
		LogUtils.error(log, jobName, "jobClass is not set");
		throw new JobInitAlarmException("jobClass is not set");
	}
	jobClassStr = jobClassStr.trim();
	LogUtils.info(log, jobName, "start to create job business instance, jobClass is {}", jobClassStr);
	if (jobBusinessInstance == null) {
		ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
		// the job classloader created earlier
		ClassLoader jobClassLoader = saturnExecutorService.getJobClassLoader();
		Thread.currentThread().setContextClassLoader(jobClassLoader);
		try {
			// With a Spring Boot fat jar, classes under /BOOT-INF/classes cannot be loaded
			// here, which leads to a ClassNotFoundException.
			Class<?> jobClass = jobClassLoader.loadClass(jobClassStr);
			// For a Spring project the job bean comes from the ApplicationContext. This call
			// goes through AbstractSaturnApplication.getJobInstance: if you do not want Spring
			// to manage your jobs, override getJobInstance in your application class and
			// return the job instance for the given class. The Spring implementation is
			// applicationContext.getBean(jobClass).
			jobBusinessInstance = tryToGetSaturnBusinessInstanceFromSaturnApplication(jobClassLoader, jobClass);
			if (jobBusinessInstance == null) {
				try {
					// Not provided by the application: fall back to a static getObject() factory.
					jobBusinessInstance = jobClass.getMethod("getObject").invoke(null);
					if (jobBusinessInstance != null) {
						LogUtils.info(log, jobName, "get job instance from getObject");
					}
				} catch (NoSuchMethodException e) {
					LogUtils.info(log, jobName,
							"the jobClass hasn't the static getObject method, will initialize job by default no arguments constructor method");
				}
			}
			// If getObject is not overridden, BaseSaturnJob returns null by default,
			// so fall back to the no-argument constructor.
			if (jobBusinessInstance == null) {
				jobBusinessInstance = jobClass.newInstance();
				LogUtils.info(log, jobName, "get job instance from newInstance");
			}
			// setSaturnApi is an empty implementation by default; no obvious use so far.
			SaturnApi saturnApi = new SaturnApi(getNamespace(), executorName);
			jobClass.getMethod("setSaturnApi", Object.class).invoke(jobBusinessInstance, saturnApi);
		} catch (Throwable t) {
			throw new JobInitAlarmException(logBusinessExceptionIfNecessary(jobName, t));
		} finally {
			Thread.currentThread().setContextClassLoader(oldClassLoader);
		}
	}
	if (jobBusinessInstance == null) {
		LogUtils.error(log, jobName, "job instance is null");
		throw new JobInitAlarmException("job instance is null");
	}
}

At this point the SaturnWorker thread is running and the job instance has been created.

Job execution

The job has been initialized, and job.execute(currentTriggered) has in fact already been called, so the job has entered its execution phase. Let's walk through the execution in detail:

// AbstractElasticJob
public final void execute(final Triggered triggered) {
	LogUtils.debug(log, jobName, "Saturn start to execute job [{}]", jobName);
	// There is a single job object per JobScheduler, reused across runs,
	// so it must be reset before each run.
	reset();
	if (configService == null) {
		LogUtils.warn(log, jobName, "configService is null");
		return;
	}
	JobExecutionMultipleShardingContext shardingContext = null;
	try {
		// Reshard if the job does not report its status (cron and passive jobs report by
		// default) or if this host has no failover items assigned. While running, the job
		// registers itself under the execution node.
		if (!configService.isEnabledReport() || failoverService.getLocalHostFailoverItems().isEmpty()) {
			shardingService.shardingIfNecessary();
		}
		// is the job enabled?
		if (!configService.isJobEnabled()) {
			LogUtils.debug(log, jobName, "{} is disabled, cannot be continued, do nothing about business.",
					jobName);
			return;
		}
		// sharding context
		shardingContext = executionContextService.getJobExecutionShardingContext(triggered);
		if (shardingContext.getShardingItems() == null || shardingContext.getShardingItems().isEmpty()) {
			LogUtils.debug(log, jobName, "{} 's items of the executor is empty, do nothing about business.",
					jobName);
			callbackWhenShardingItemIsEmpty(shardingContext);
			return;
		}
		if (configService.isInPausePeriod()) {
			LogUtils.info(log, jobName,
					"the job {} current running time is in pausePeriod, do nothing about business.", jobName);
			return;
		}
		executeJobInternal(shardingContext);
		if (isFailoverSupported() && configService.isFailover() && !stopped && !forceStopped && !aborted) {
			failoverService.failoverIfNecessary();
		}
		LogUtils.debug(log, jobName, "Saturn finish to execute job [{}], sharding context:{}.", jobName,
				shardingContext);
	} catch (Exception e) {
		LogUtils.warn(log, jobName, e.getMessage(), e);
	} finally {
		running = false;
	}
}

When deciding whether to reshard, the job first checks whether any failover items are assigned to this executor:

/**
 * Get the failover items assigned to this executor.
 *
 * @return the failover items running on this executor
 */
public List<Integer> getLocalHostFailoverItems() {
	// ZK path /${namespace}/$Jobs/${jobName}/execution holds one child per sharding item
	List<String> items = getJobNodeStorage().getJobNodeChildrenKeys(ExecutionNode.ROOT);
	List<Integer> result = new ArrayList<>(items.size());
	for (String each : items) {
		int item = Integer.parseInt(each);
		// ZK path /${namespace}/$Jobs/${jobName}/execution/${item}/failover
		String node = FailoverNode.getExecutionFailoverNode(item);
		// The failover node is ephemeral; when it exists, its value is the executorName
		// that took over the item.
		if (getJobNodeStorage().isJobNodeExisted(node)
				&& executorName.equals(getJobNodeStorage().getJobNodeDataDirectly(node))) {
			result.add(item);
		}
	}
	Collections.sort(result);
	return result;
}

Since cron jobs report their status by default, sharding is in effect evaluated on every run:

/**
 * Reshard the job if sharding is necessary and this node is the leader.
 */
public synchronized void shardingIfNecessary() throws JobShuttingDownException {
	if (isShutdown) {
		return;
	}
	GetDataStat getDataStat = null;
	// read /${namespace}/$Jobs/${jobName}/leader/sharding/necessary
	if (getJobNodeStorage().isJobNodeExisted(ShardingNode.NECESSARY)) {
		getDataStat = getNecessaryDataStat();
	}
	// If the necessary node is absent or its content is "0", no sharding is needed.
	if (getDataStat == null || SHARDING_UN_NECESSARY.equals(getDataStat.getData())) {
		return;
	}
	// If this node is not the leader, wait for the leader to finish (a loop that exits when
	// 1. the job is shut down, or 2. sharding is unnecessary and not in processing state).
	if (blockUntilShardingComplatedIfNotLeader()) {
		return;
	}
	// wait (indefinitely) while any sharding item is still running
	waitingOtherJobCompleted();
	// create an ephemeral "processing" node to mark sharding in progress
	getJobNodeStorage().fillEphemeralJobNode(ShardingNode.PROCESSING, "");
	try {
		// delete the sharding nodes of all JobServers under this job
		clearShardingInfo();
		int retryCount = 3;
		while (!isShutdown) {
			boolean needRetry = false;
			int version = getDataStat.getVersion();
			// Read from job/leader/sharding/necessary first; on failure, fall back to
			// $SaturnExecutors/sharding/content. Key is the executor, value its sharding items.
			Map<String, List<Integer>> shardingItems =
					namespaceShardingContentService.getShardContent(jobName, getDataStat.getData());
			try {
				// Checking + creating the nodes of all JobServers, plus resetting the
				// necessary content to "0", happen in one transaction.
				CuratorTransactionFinal curatorTransactionFinal =
						getJobNodeStorage().getClient().inTransaction().check().forPath("/").and();
				for (Entry<String, List<Integer>> entry : shardingItems.entrySet()) {
					// create /${namespace}/$Jobs/${jobName}/servers/${executorName}/sharding
					// with the item data
					curatorTransactionFinal.create().forPath(
							JobNodePath.getNodeFullPath(jobName, ShardingNode.getShardingNode(entry.getKey())),
							ItemUtils.toItemsString(entry.getValue()).getBytes(StandardCharsets.UTF_8)).and();
				}
				curatorTransactionFinal.setData().withVersion(version)
						.forPath(JobNodePath.getNodeFullPath(jobName, ShardingNode.NECESSARY),
								SHARDING_UN_NECESSARY.getBytes(StandardCharsets.UTF_8)).and();
				curatorTransactionFinal.commit();
			} catch (BadVersionException e) {
				LogUtils.warn(log, jobName, "zookeeper bad version exception happens.", e);
				needRetry = true;
				retryCount--;
			} catch (Exception e) {
				// Concurrent sharding tasks can make the computed result stale, e.g. the
				// server node was already removed, so the commit fails. The final result
				// is usually still correct because a resharding event will follow; log at
				// warn level to avoid unnecessary alarms.
				LogUtils.warn(log, jobName, "Commit shards failed", e);
			}
			if (needRetry) {
				if (retryCount >= 0) {
					LogUtils.info(log, jobName,
							"Bad version because of concurrency, will retry to get shards later");
					Thread.sleep(200L); // NOSONAR
					getDataStat = getNecessaryDataStat();
				} else {
					LogUtils.warn(log, jobName, "Bad version because of concurrency, give up to retry");
					break;
				}
			} else {
				break;
			}
		}
	} catch (Exception e) {
		LogUtils.error(log, jobName, e.getMessage(), e);
	} finally {
		getJobNodeStorage().removeJobNodeIfExisted(ShardingNode.PROCESSING);
	}
}

After sharding completes, and before executing, the executor fetches its own sharding context:

/**
 * Build the runtime sharding context for this executor.
 *
 * @return the runtime sharding context of this executor
 */
public JobExecutionMultipleShardingContext getJobExecutionShardingContext(final Triggered triggered) {
	SaturnExecutionContext result = new SaturnExecutionContext();
	result.setJobName(configService.getJobName());
	result.setShardingTotalCount(configService.getShardingTotalCount());
	result.setTriggered(triggered);
	// Read the local items from /${namespace}/$Jobs/${jobName}/servers/${executorName}/sharding,
	// written during the sharding pass above.
	List<Integer> shardingItems = getShardingItems();
	boolean isEnabledReport = configService.isEnabledReport();
	if (isEnabledReport) {
		// remove items that are already running, i.e. those with a
		// /${namespace}/$Jobs/${jobName}/execution/${item}/running node
		removeRunningItems(shardingItems);
	}
	// the items to execute in this run
	result.setShardingItems(shardingItems);
	// job parameter
	result.setJobParameter(configService.getJobParameter());
	// business custom context
	result.setCustomContext(configService.getCustomContext());
	result.setJobConfiguration(jobConfiguration);
	if (coordinatorRegistryCenter != null) {
		result.setNamespace(coordinatorRegistryCenter.getNamespace());
		result.setExecutorName(coordinatorRegistryCenter.getExecutorName());
	}
	if (result.getShardingItems().isEmpty()) {
		return result;
	}
	// the item-to-parameter mapping table
	Map<Integer, String> shardingItemParameters = configService.getShardingItemParameters();
	if (shardingItemParameters.containsKey(-1)) { // local mode
		for (int each : result.getShardingItems()) {
			result.getShardingItemParameters().put(each, shardingItemParameters.get(-1));
		}
	} else {
		for (int each : result.getShardingItems()) {
			if (shardingItemParameters.containsKey(each)) {
				result.getShardingItemParameters().put(each, shardingItemParameters.get(each));
			}
		}
	}
	if (jobConfiguration.getTimeoutSeconds() > 0) {
		result.setTimetoutSeconds(jobConfiguration.getTimeoutSeconds());
	}
	// return the execution context
	return result;
}
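The parameter-mapping branch above is worth isolating: in local mode (a -1 key in the table) every item gets the same parameter; otherwise, an item gets a parameter only when one is configured for it. A sketch under those assumptions (class and method names are mine):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ItemParams {

	static Map<Integer, String> resolve(List<Integer> items, Map<Integer, String> configured) {
		Map<Integer, String> result = new HashMap<>();
		if (configured.containsKey(-1)) { // local mode: one parameter for every item
			for (int item : items) {
				result.put(item, configured.get(-1));
			}
		} else { // per-item mapping; unconfigured items get nothing
			for (int item : items) {
				if (configured.containsKey(item)) {
					result.put(item, configured.get(item));
				}
			}
		}
		return result;
	}
}
```

The unconfigured-item gap in non-local mode is exactly the misconfiguration case that executeJob later reports as a failed item.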

With the sharding context in hand, execution begins:

private void executeJobInternal(final JobExecutionMultipleShardingContext shardingContext) throws Exception {
	// register each sharding item as running
	executionService.registerJobBegin(shardingContext);
	try {
		// run the business logic
		executeJob(shardingContext);
	} finally {
		List<Integer> shardingItems = shardingContext.getShardingItems();
		if (!shardingItems.isEmpty()) {
			Date nextFireTimePausePeriodEffected = jobScheduler.getNextFireTimePausePeriodEffected();
			boolean isEnabledReport = configService.isEnabledReport();
			for (int item : shardingItems) {
				if (isEnabledReport && !checkIfZkLostAfterExecution(item)) {
					continue;
				}
				if (!aborted) {
					// Register completion info used by the console UI. Note that this is
					// registered whether or not the job reports its status
					// (the /config/enabledReport node).
					executionService.registerJobCompletedByItem(shardingContext, item,
							nextFireTimePausePeriodEffected);
				}
				// if this item was a failover item, remove it from the failover list
				if (isFailoverSupported() && configService.isFailover()) {
					failoverService.updateFailoverComplete(item);
				}
			}
		}
		// Trigger downstream jobs via an HTTP call to
		// consoleUri + "/rest/v1/" + namespace + "/jobs/" + jobName + "/runDownStream"
		runDownStream(shardingContext);
	}
}

Finally, executeJob runs the business code:

protected final void executeJob(final JobExecutionMultipleShardingContext shardingContext) {
	if (!(shardingContext instanceof SaturnExecutionContext)) {
		LogUtils.error(log, jobName, "!!! The context must be instance of SaturnJobExecutionContext !!!");
		return;
	}
	long start = System.currentTimeMillis();
	SaturnExecutionContext saturnContext = (SaturnExecutionContext) shardingContext;
	saturnContext.setSaturnJob(true);
	// per-item results
	Map<Integer, SaturnJobReturn> retMap = new HashMap<Integer, SaturnJobReturn>();
	// shardingItemParameters holds the key/value pairs parsed from the parameter table
	Map<Integer, String> shardingItemParameters = saturnContext.getShardingItemParameters();
	// items is the list of sharding items to process
	List<Integer> items = saturnContext.getShardingItems();
	LogUtils.info(log, jobName, "Job {} handle items: {}", jobName, items);
	for (Integer item : items) {
		// Tolerate misconfiguration: with 3 shards and a parameter table of "0=*, 2=*",
		// item 1 will not execute.
		if (!shardingItemParameters.containsKey(item)) {
			LogUtils.error(log, jobName,
					"The {} item's parameter is not valid, will not execute the business code, please check shardingItemParameters",
					items);
			SaturnJobReturn errRet = new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL,
					"Config of parameter is not valid, check shardingItemParameters", SaturnSystemErrorGroup.FAIL);
			retMap.put(item, errRet);
		}
	}
	Map<Integer, SaturnJobReturn> handleJobMap = handleJob(saturnContext);
	if (handleJobMap != null) {
		retMap.putAll(handleJobMap);
	}
	// aggregate the results
	for (Integer item : items) {
		if (item == null) {
			continue;
		}
		SaturnJobReturn saturnJobReturn = retMap.get(item);
		if (saturnJobReturn == null) {
			saturnJobReturn = new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL,
					"Can not find the corresponding SaturnJobReturn", SaturnSystemErrorGroup.FAIL);
			retMap.put(item, saturnJobReturn);
		}
		// upload the result to ProcessCountStatistics and, finally, to ZK
		updateExecuteResult(saturnJobReturn, saturnContext, item);
	}
	long end = System.currentTimeMillis();
	LogUtils.info(log, jobName, "{} finished, totalCost={}ms, return={}", jobName, (end - start), retMap);
}
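The bookkeeping above guarantees that every item ends up with some result: items without a configured parameter fail fast, handler results take priority, and anything still missing gets a default failure. Modeled as a pure function over result strings (names and strings are mine, mirroring the flow above, not Saturn's API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ResultAggregation {

	static Map<Integer, String> aggregate(List<Integer> items, Map<Integer, String> params,
			Map<Integer, String> handled) {
		Map<Integer, String> ret = new HashMap<>();
		for (Integer item : items) {
			if (!params.containsKey(item)) { // misconfigured parameter table: item fails fast
				ret.put(item, "FAIL: parameter not configured");
			}
		}
		ret.putAll(handled); // handler results win
		for (Integer item : items) {
			ret.putIfAbsent(item, "FAIL: missing SaturnJobReturn"); // default failure
		}
		return ret;
	}
}
```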

If the current job is a Java job, execution enters:

// SaturnJavaJob
protected Map<Integer, SaturnJobReturn> handleJob(final SaturnExecutionContext shardingContext) {
	final Map<Integer, SaturnJobReturn> retMap = new HashMap<Integer, SaturnJobReturn>();
	synchronized (futureTaskMap) {
		futureTaskMap.clear();
		final String jobName = shardingContext.getJobName();
		final int timeoutSeconds = getTimeoutSeconds();
		ExecutorService executorService = getExecutorService();
		// custom job parameter
		String jobParameter = shardingContext.getJobParameter();
		// key/value pairs parsed from the parameter table
		Map<Integer, String> shardingItemParameters = shardingContext.getShardingItemParameters();
		for (final Entry<Integer, String> shardingItem : shardingItemParameters.entrySet()) {
			final Integer key = shardingItem.getKey();
			try {
				String jobValue = shardingItem.getValue();
				final String itemVal = getRealItemValue(jobParameter, jobValue); // the item's resolved value
				// Submit each item to the thread pool so the sharding items run
				// (almost) concurrently.
				ShardingItemFutureTask shardingItemFutureTask = new ShardingItemFutureTask(
						createCallable(jobName, key, itemVal, timeoutSeconds, shardingContext, this), null);
				Future<?> callFuture = executorService.submit(shardingItemFutureTask);
				if (timeoutSeconds > 0) {
					TimeoutSchedulerExecutor.scheduleTimeoutJob(shardingContext.getExecutorName(),
							timeoutSeconds, shardingItemFutureTask);
				}
				shardingItemFutureTask.setCallFuture(callFuture);
				futureTaskMap.put(key, shardingItemFutureTask);
			} catch (Throwable t) {
				LogUtils.error(log, jobName, t.getMessage(), t);
				retMap.put(key, new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, t.getMessage(),
						SaturnSystemErrorGroup.FAIL));
			}
		}
	}
	// collect the results
	for (Entry<Integer, ShardingItemFutureTask> entry : futureTaskMap.entrySet()) {
		Integer item = entry.getKey();
		ShardingItemFutureTask futureTask = entry.getValue();
		try {
			futureTask.getCallFuture().get();
		} catch (Exception e) {
			LogUtils.error(log, jobName, e.getMessage(), e);
			retMap.put(item, new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, e.getMessage(),
					SaturnSystemErrorGroup.FAIL));
			continue;
		}
		retMap.put(item, futureTask.getCallable().getSaturnJobReturn());
	}
	synchronized (futureTaskMap) {
		futureTaskMap.clear();
	}
	return retMap;
}

The actual business call happens inside JavaShardingItemCallable:

public SaturnJobReturn call() {
	reset();
	SaturnSystemOutputStream.initLogger();
	currentThread = Thread.currentThread();
	SaturnJobReturn temp = null;
	try {
		beforeExecution();
		// invoke the abstract business method of the Saturn job
		temp = doExecution();
		// from here on, this thread must not be force-stopped
		breakForceStop = true;
	} catch (Throwable t) {
		// from here on, this thread must not be force-stopped
		breakForceStop = true;
		// Neither a timeout nor a forced stop: log the error and build the SaturnJobReturn.
		if (status.get() != TIMEOUT && status.get() != FORCE_STOP) {
			LogUtils.error(log, jobName, t.toString(), t);
			temp = new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, t.getMessage(),
					SaturnSystemErrorGroup.FAIL);
		}
	} finally {
		if (status.compareAndSet(INIT, SUCCESS)) {
			saturnJobReturn = temp;
		}
		if (saturnJob != null && saturnJob.getConfigService().showNormalLog()) {
			String jobLog = SaturnSystemOutputStream.clearAndGetLog();
			if (jobLog != null && jobLog.length() > SaturnConstant.MAX_JOB_LOG_DATA_LENGTH) {
				LogUtils.info(log, jobName,
						"As the job log exceed max length, only the previous {} characters will be reported",
						SaturnConstant.MAX_JOB_LOG_DATA_LENGTH);
				jobLog = jobLog.substring(0, SaturnConstant.MAX_JOB_LOG_DATA_LENGTH);
			}
			this.shardingContext.putJobLog(this.item, jobLog);
		}
	}
	return saturnJobReturn;
}
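The log-capping step at the end is easy to isolate: logs longer than the limit are cut to the first MAX_JOB_LOG_DATA_LENGTH characters before being reported. A sketch (the constant value here is illustrative only, not Saturn's actual limit):

```java
public class JobLogCap {

	static final int MAX_JOB_LOG_DATA_LENGTH = 10; // illustrative; SaturnConstant defines the real limit

	// keep only the leading characters when the captured job log is too long
	static String cap(String jobLog) {
		if (jobLog != null && jobLog.length() > MAX_JOB_LOG_DATA_LENGTH) {
			return jobLog.substring(0, MAX_JOB_LOG_DATA_LENGTH);
		}
		return jobLog;
	}
}
```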

A closer look at states

We have now walked through a complete job run. Next, let's look at the executor and job states in detail, combining the source code with the architecture diagram from the Saturn wiki.

Saturn depends heavily on ZooKeeper: executor management, job configuration distribution, and result reporting all go through ZK.

Executor states:

Going online:

As mentioned earlier, when an executor comes online it registers itself under /${namespace}/$SaturnExecutors/executors/${executorName}.

In the console, each namespace has its own NamespaceShardingManager, which installs a listener (ExecuteOnlineShardingTask) on the /${namespace}/$SaturnExecutors/executors/ path. When a TreeCacheEvent.Type.NODE_ADDED event fires, the online logic runs: it goes through the AbstractAsyncShardingTask.run method mentioned earlier, applies the sharding algorithm (the implementations of AbstractAsyncShardingTask.pick), and writes the resulting shards to /${namespace}/$Jobs/${jobName}/leader/sharding/necessary. Recall from the execution walkthrough that before each run the job checks this necessary node to decide whether to reshard: after each sharding pass, the executor leader resets its value to 0.

Going offline: much the same as going online, except the listened event is TreeCacheEvent.Type.NODE_REMOVED.

Job states:

Job state changes fall into two categories. The first is human operations: disabling, creating, enabling, or deleting a job from the console. These are driven from the console side, i.e. the NamespaceShardingManager above: during pick it removes the job from the scheduling list, so when executors reshard, they will either schedule the job or skip it accordingly.

For example:

Enabling and disabling a job: the enable config under /job/config is watched, and the change takes effect by adding or removing the job on the current executor.

The second category is problems that occur while the job is running, such as shards that need to be failed over. These transitions happen on the executor: JobScheduler.init() calls startAll, which starts all the path listeners:

@Override
public void start() {
	electionListenerManager = new ElectionListenerManager(jobScheduler);
	failoverListenerManager = new FailoverListenerManager(jobScheduler);
	jobOperationListenerManager = new JobOperationListenerManager(jobScheduler);
	configurationListenerManager = new ConfigurationListenerManager(jobScheduler);
	shardingListenerManager = new ShardingListenerManager(jobScheduler);
	analyseResetListenerManager = new AnalyseResetListenerManager(jobScheduler);
	controlListenerManager = new ControlListenerManager(jobScheduler);

	electionListenerManager.start();
	failoverListenerManager.start();
	jobOperationListenerManager.start();
	configurationListenerManager.start();
	shardingListenerManager.start();
	analyseResetListenerManager.start();
	controlListenerManager.start();
}

Several of these listeners exist mainly so the executor can react to job state changes. For example, clicking "run now" is picked up by JobOperationListenerManager, which triggers an immediate run. When enabled is set to false (the job is disabled), ConfigurationListenerManager refreshes the current config; during execution, AbstractElasticJob.execute checks whether the job is enabled, and the job's onEnabled callback is also invoked.

Failover after a job has run:

As mentioned above, when an item starts running, an ephemeral running node is registered under execution/${item}/, and completion info is registered when it finishes. While the job runs, FailoverListenerManager watches the execution path:

class RunningPathListener implements NodeCacheListener {

	private int item;

	public RunningPathListener(int item) {
		this.item = item;
	}

	@Override
	public void nodeChanged() throws Exception {
		// This listens on execution/${item}/. While running, the executor writes its
		// ephemeral node here:
		// getJobNodeStorage().fillEphemeralJobNode(ExecutionNode.getRunningNode(item), executorName);
		// so a node change means either the item completed, or the item's executor
		// crashed and the item needs failover.
		zkCacheManager.getExecutorService().execute(new Runnable() {
			@Override
			public void run() {
				try {
					if (isShutdown) {
						return;
					}
					if (!executionService.isRunning(item)) {
						failover(item);
					}
				} catch (Throwable t) {
					LogUtils.error(log, jobName, t.getMessage(), t);
				}
			}
		});
	}
}

A change to the running node means one of three things: the node was added, so the item has started running; or the node was removed, which in turn means either the item finished normally or its executor failed abnormally:

private synchronized void failover(final Integer item) {
	if (jobScheduler == null || jobScheduler.getJob() == null) {
		return;
	}
	if (!jobScheduler.getJob().isFailoverSupported() || !configService.isFailover()
			|| executionService.isCompleted(item)) {
		return;
	}
	failoverService.createCrashedFailoverFlag(item);
	if (!executionService.hasRunningItems(jobScheduler.getShardingService().getLocalHostShardingItems())) {
		failoverService.failoverIfNecessary();
	}
}

So failover checks whether the job allows failover and whether the item ended normally; if neither check blocks it, the crashed-failover flag is created and the failover logic runs:

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {

	@Override
	public void execute() {
		if (!needFailover()) {
			return;
		}
		if (jobScheduler == null) {
			return;
		}
		// an executor whose traffic has been extracted must not take over items
		if (coordinatorRegistryCenter.isExisted(SaturnExecutorsNode.getExecutorNoTrafficNodePath(executorName))) {
			return;
		}
		// not in the preferList, and fallback to non-preferred executors is disabled: give up
		if (!jobScheduler.getConfigService().getPreferList().contains(executorName)
				&& !jobScheduler.getConfigService().isUseDispreferList()) {
			return;
		}
		List<String> items = getJobNodeStorage().getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT);
		if (items != null && !items.isEmpty()) {
			int crashedItem = Integer
					.parseInt(getJobNodeStorage().getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
			LogUtils.debug(log, jobName, "Elastic job: failover job begin, crashed item:{}.", crashedItem);
			getJobNodeStorage().fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem),
					executorName);
			getJobNodeStorage().removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
			jobScheduler.triggerJob(null);
		}
	}
}

A leader election happens first. FailoverLeaderExecutionCallback then checks whether this executor has had its traffic extracted, whether a preferList is configured, and whether the job is restricted to the preferList. If a preferList is configured and this executor is not in it, failover is still allowed as long as the job is not restricted to the preferList (useDispreferList); if the job both configures a preferList and is restricted to it, and this executor is not a member, no failover happens here. Finally, the callback takes the first item from the failover list, claims it by writing its executorName to the item's failover node, and triggers the job, completing the failover.
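The eligibility logic above can be summarized as a pure predicate (class and method names are mine, mirroring the guards in FailoverLeaderExecutionCallback): an executor with extracted traffic never takes over, and an executor outside the preferList takes over only when useDispreferList allows falling back to non-preferred executors.

```java
import java.util.List;

public class FailoverEligibility {

	static boolean canTakeOver(boolean noTraffic, List<String> preferList, boolean useDispreferList,
			String executorName) {
		if (noTraffic) {
			return false; // traffic extracted: never take over failover items
		}
		if (!preferList.contains(executorName) && !useDispreferList) {
			return false; // restricted to preferList and not a member
		}
		return true;
	}
}
```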
