saturn 源码解析
saturn-console
启动
saturn console
本身为一个springboot
项目,所有的bean
都在saturn-console
下的applicationContext.xml
中定义。所以启动主要就是初始化负责各种操作的bean
。
<!-- ----- 权限管理主要针对job -------->
<bean id="authorizationService"class="com.vip.saturn.job.console.service.impl.AuthorizationServiceImpl"/> <bean id="authorizationManageServiceImpl"class="com.vip.saturn.job.console.service.impl.AuthorizationManageServiceImpl"/><!-- ----- 系统配置管理,console中的一些系统配置相关 --------><bean id="systemConfigService"class="com.vip.saturn.job.console.service.impl.SystemConfigServiceImpl"/><!-- ----- 告警 --------><bean id="alarmStatisticsService"class="com.vip.saturn.job.console.service.impl.AlarmStatisticsServiceImpl"/><!-- ----- 主页和统计信息 --------><bean id="dashboardService"class="com.vip.saturn.job.console.service.impl.DashboardServiceImpl"/><!-- ----- 执行器管理 --------><bean id="executorService"class="com.vip.saturn.job.console.service.impl.ExecutorServiceImpl"/><!-- ----- job --------><bean id="jobService"class="com.vip.saturn.job.console.service.impl.JobServiceImpl"/><!-- ----- 域和zk集群管理 --------><bean id="namespaceZkClusterMappingService"class="com.vip.saturn.job.console.service.impl.NamespaceZkClusterMappingServiceImpl"/><!-- ----- 报告告警 --------><bean id="reportAlarmService"class="com.vip.saturn.job.console.service.impl.ReportAlarmServiceImpl"/><!-- ----- 判断cron --------><bean id="utilsService"class="com.vip.saturn.job.console.service.impl.UtilsServiceImpl"/><!-- ----- 判断zk和数据库是不同 --------><bean id="zkDBDiffService"class="com.vip.saturn.job.console.service.impl.ZkDBDiffServiceImpl"/><!-- ----- 查看和管理zk中的path --------><bean id="zkTreeService"class="com.vip.saturn.job.console.service.impl.ZkTreeServiceImpl"/><!-- ----- 更新job的配置 --------><bean id="updateJobConfigService"class="com.vip.saturn.job.console.service.impl.UpdateJobConfigServiceImpl"/><!-- ----- RestApi 针对job的一些http请求 --------><bean id="restApiService" class="com.vip.saturn.job.console.service.impl.RestApiServiceImpl"/>
<!-- ----- 将执行的结果(每天执行多少,失败等等)持久化到数据库 --------><bean id="statisticPersistence"class="com.vip.saturn.job.console.service.impl.statistics.StatisticsPersistence"/>
<!-- ----- dashboard统计数据(执行多少,失败等等) --------><bean id="statisticRefreshService"class="com.vip.saturn.job.console.service.impl.statistics.StatisticsRefreshServiceImpl"/>
<!-- ----- 注册中心管理 --------><bean id="registryCenterService"class="com.vip.saturn.job.console.service.impl.RegistryCenterServiceImpl"/>
<!-- ----- 权限,登陆 --------><bean id="authenticationService"class="com.vip.saturn.job.console.service.impl.AuthenticationServiceImpl"/>
上诉的bean
中,大部分都是一些定时任务。重点关注namespace
和job
管理,包括创建,分片等等。
namespace的创建
在console
创建域,需要传入namespace
和zkCluster
,后台由RegistryCenterController
中createNamespace
进行创建。核心逻辑如下:
@Transactional(rollbackFor = {Exception.class})@Overridepublic void createNamespace(NamespaceDomainInfo namespaceDomainInfo) throws SaturnJobConsoleException {String namespace = namespaceDomainInfo.getNamespace();String zkClusterKey = namespaceDomainInfo.getZkCluster();// 从注册中心查询是否存在该zk集群ZkCluster currentCluster = getZkCluster(zkClusterKey);if (currentCluster == null) {throw new SaturnJobConsoleHttpException(HttpStatus.BAD_REQUEST.value(),String.format(ERR_MSG_TEMPLATE_FAIL_TO_CREATE, namespace, "not found zkcluster" + zkClusterKey));}// 判断当前所有的集群下是否有该namespace,也就是说namespace是所有集群唯一的if (checkNamespaceExists(namespace)) {throw new SaturnJobConsoleHttpException(HttpStatus.BAD_REQUEST.value(),String.format(ERR_MSG_NS_ALREADY_EXIST, namespace));}try {// 创建 namespaceInfoNamespaceInfo namespaceInfo = constructNamespaceInfo(namespaceDomainInfo);namespaceInfoService.create(namespaceInfo);// 创建 zkcluster 和 namespaceInfo 关系,并写入数据库,如果从现有数据库中发现有该namespace,则更新数据,否则插入新数据。namespaceZkClusterMapping4SqlService.insert(namespace, "", zkClusterKey, NAMESPACE_CREATOR_NAME);// refresh 数据到注册中心,该方法不是通过直接刷新数据到zk,而是通过刷新sys_config 中的uid来异步进行刷新notifyRefreshRegCenter();} catch (Exception e) {log.error(e.getMessage(), e);throw new SaturnJobConsoleHttpException(HttpStatus.INTERNAL_SERVER_ERROR.value(),String.format(ERR_MSG_TEMPLATE_FAIL_TO_CREATE, namespace, e.getMessage()));}}
上文中代码会和RegistryCenterServiceImpl
进行交互。
public void init() {getConsoleClusterId();localRefresh();initLocalRefreshThreadPool();startLocalRefreshTimer();startLocalRefreshIfNecessaryTimer();}private void initLocalRefreshThreadPool() {localRefreshThreadPool = Executors.newSingleThreadExecutor(new ConsoleThreadFactory("refresh-RegCenter-thread", false));}private void startLocalRefreshTimer() {//每隔5分钟执行一次localRefresh}private void startLocalRefreshIfNecessaryTimer() {/* 每一秒钟检查,当system的配置发生变化,当前的uuid 和最新的uuid 不同的时候,执行loaclRefresh */}private synchronized void localRefresh() {// 有删减// 刷新注册中心,主要包括,对比zk集群是否变化,包括域的关闭和迁移refreshRegistryCenter();// console 的选主和数据更新,在console的系统配置中设置的zk集群才能够被当前的console管理。// 该数据为mysql中的数据,dashboard的选主逻辑为在zk中注册$SaturnSlef/saturn-console/dashboard/leaderrefreshDashboardLeaderTreeCache();// 是否需要创建或者迁移namespaceShardingManager,每一个域名都对应一个namespaceShardingManagerrefreshNamespaceShardingListenerManagerMap();}
该类的初始化主要为上文的代码。所以,域名的创建逻辑是直接写入数据库,然后通过refresh
写入zk
。一个namespace
的创建就完成了,主要是在数据库中写入数据(namespace_zkcluster_mapping
),更新sys_config
中的uid
异步刷新。刷新后,注册中心会有该namespace
的节点,以及针对dashboard
的选主和分片管理,其中每一个namespace
会有一个分片管理NamespaceShardingManager
,该类主要用于管理执行器的上下线,分片等信息。
private void start0() throws Exception {//shardingTreeCacheService.start();// create ephemeral node $SaturnExecutors/leader/host & $Jobs// 主要是针对saturn的执行器选举,为console节点namespaceShardingService.leaderElection();// 针对已经存在的job增加JobServersTriggerShardingListener,用于当executor上线下后进行分片,节点为$Jobs/${jobName}/servers// JobServersTriggerShardingListener addJobListenersService.addExistJobPathListener();// 上下线Listener,节点为/$SaturnExecutors/executors//ExecutorOnlineOfflineTriggerShardingListener ExecutorTrafficTriggerShardingListeneraddOnlineOfflineListener();// 分片 节点为/$SaturnExecutors/sharding//SaturnExecutorsShardingTriggerShardingListeneraddExecutorShardingListener();// 选主// /$SaturnExecutors/leaderaddLeaderElectionListener();// 新增与删除/$Jobs// AddOrRemoveJobListeneraddNewOrRemoveJobListener();}
针对上面不同的listerner
最后的执行的方法都是AbstractAsyncShardingTask
中的run
方法。当前我们创建了一个namespace
且需要分片的时候,对应的NamespaceShardingManager
会在zk
上注册多个节点,并且监听对应的executor
上下线,job
创建等事件。分片是由当前executor
的主节点来进行的,也就是说当新建一个namespace
的时候,该namespace
所有的executor
分片由获取到该namespace
的leader
进行操作,每一步操作都会判断当前操作的对象是否为namespace
的leader
(是否已经创建latch
节点,以及host
是否就是当前节点)。
选举:
public void leaderElection() throws Exception {lock.lockInterruptibly();try {if (hasLeadership()) {return;}log.info("{}-{} leadership election start", namespace, hostValue);try (LeaderLatch leaderLatch = new LeaderLatch(curatorFramework,SaturnExecutorsNode.LEADER_LATCHNODE_PATH)) {leaderLatch.start();int timeoutSeconds = 60;if (leaderLatch.await(timeoutSeconds, TimeUnit.SECONDS)) {if (!hasLeadership()) {becomeLeader();} else {log.info("{}-{} becomes a follower", namespace, hostValue);}} else {log.error("{}-{} leadership election is timeout({}s)", namespace, hostValue, timeoutSeconds);}} catch (InterruptedException e) {log.info("{}-{} leadership election is interrupted", namespace, hostValue);throw e;} catch (Exception e) {log.error(namespace + "-" + hostValue + " leadership election error", e);throw e;}} finally {lock.unlock();}}
选举其实就是在$SaturnExecutors/leader
节点下写入latch
节点,成为leader
后,回持久化$JOB
节点在zk
,然后将host
写入$SaturnExecutors/leader/host
里。
如果当前操作实例成为leader
,则进行job
和executor
管理工作,包括上下线,分片等。
console
所有的操作最后其实都是异步执行AbstractAsyncShardingTask
里面的方法:
public void run() {logStartInfo();boolean isAllShardingTask = this instanceof ExecuteAllShardingTask;try {// 如果当前变为非leader,则直接返回if (!namespaceShardingService.isLeadershipOnly()) {return;}// 如果需要全量分片,且当前线程不是全量分片线程,则直接返回,没必要做分片,由于console在选举成功后就会设置全量分片为true,而且立马将全量分片的task//提交给线程池。所以一开始是执行全量分片的if (namespaceShardingService.isNeedAllSharding() && !isAllShardingTask) {log.info("the {} will be ignored, because there will be {}", this.getClass().getSimpleName(),ExecuteAllShardingTask.class.getSimpleName());return;}// 从zk中获取所有的jobList<String> allJobs = getAllJobs();// 获取所有enable的jobList<String> allEnableJobs = getAllEnableJobs(allJobs);//最后在线的executor。位于$SaturnExecutors/sharding/content,该节点包含了executor的ip,分片值,以及负载等信息List<Executor> oldOnlineExecutorList = getLastOnlineExecutorList();// 如果当前是全分片,则从/$SaturnExecutors/executors 下拉去所有在线的executor,否则为nullList<Executor> customLastOnlineExecutorList = customLastOnlineExecutorList();// 如果不是全量分片,则copy最后在县的executor。List<Executor> lastOnlineExecutorList = customLastOnlineExecutorList == null? copyOnlineExecutorList(oldOnlineExecutorList) : customLastOnlineExecutorList;//最后没有被摘取流量的executor,$SaturnExecutors/executors/xx/noTraffic true 已经被摘流量;false,otherwise;List<Executor> lastOnlineTrafficExecutorList = getTrafficExecutorList(lastOnlineExecutorList);List<Shard> shardList = new ArrayList<>();// 摘取,该方法为抽象方法,if (pick(allJobs, allEnableJobs, shardList, lastOnlineExecutorList, lastOnlineTrafficExecutorList)) {// 放回putBackBalancing(allEnableJobs, shardList, lastOnlineExecutorList, lastOnlineTrafficExecutorList);// 如果当前变为非leader,则返回if (!namespaceShardingService.isLeadershipOnly()) {return;}// 持久化分片结果if (shardingContentIsChanged(oldOnlineExecutorList, lastOnlineExecutorList)) {namespaceShardingContentService.persistDirectly(lastOnlineExecutorList);}// notify the shards-changed jobs of all enable jobs.Map<String, Map<String, List<Integer>>> enabledAndShardsChangedJobShardContent = getEnabledAndShardsChangedJobShardContent(isAllShardingTask, allEnableJobs, oldOnlineExecutorList, lastOnlineExecutorList);namespaceShardingContentService.persistJobsNecessaryInTransaction(enabledAndShardsChangedJobShardContent);// sharding count ++increaseShardingCount();}} catch (InterruptedException e) {log.info("{}-{} {} is interrupted", namespaceShardingService.getNamespace(),namespaceShardingService.getHostValue(), this.getClass().getSimpleName());Thread.currentThread().interrupt();} catch (Throwable t) {log.error(t.getMessage(), t);if (!isAllShardingTask) { // 如果当前不是全量分片,则需要全量分片来拯救异常namespaceShardingService.setNeedAllSharding(true);namespaceShardingService.shardingCountIncrementAndGet();executorService.submit(new ExecuteAllShardingTask(namespaceShardingService));} else { // 如果当前是全量分片,则告警并关闭当前服务,重选leader来做事情raiseAlarm();shutdownNamespaceShardingService();}} finally {if (isAllShardingTask) { // 如果是全量分片,不再进行全量分片namespaceShardingService.setNeedAllSharding(false);}namespaceShardingService.shardingCountDecrementAndGet();}}
针对上面的pick
方法:
ExecuteAllShardingTask : 域下重排,移除已经存在所有executor,重新获取executors,重新获取作业shardsExecuteExtractTrafficShardingTask : 摘取executor流量,标记该executor的noTraffic为true,并移除其所有作业分片,只摘取所有非本地作业分片,设置totalLoadLevel为0ExecuteJobDisableShardingTask : 作业禁用,摘取所有executor运行的该作业的shard,注意要相应地减loadLevel,不需要放回ExecuteJobEnableShardingTask: 作业启用,获取该作业的shards,注意要过滤不能运行该作业的executorsExecuteJobForceShardShardingTask: 作业重排,移除所有executor的该作业shard,重新获取该作业的shards,finally删除forceShard结点ExecuteJobServerOfflineShardingTask: 作业的executor下线,将该executor运行的该作业分片都摘取,如果是本地作业,则移除ExecuteJobServerOnlineShardingTask : 作业的executor上线,executor级别平衡摘取,但是只能摘取该作业的shard;添加的新的shardExecuteOfflineShardingTask : executor下线,摘取该executor运行的所有非本地模式作业,移除该executor
总的来说,其实就是完成$SaturnExecutors/sharding/content
下的内容,下面的内容是一个数组,从0
开始,节点中记录的对象为
[{"executorName":"aaa","ip":"0.0.0.0","noTraffic":false,"jobNameList":["job1","job2"],"shardList":[{"jobName":"job1","item":0,loadlevel:1},{"jobName":"job2","item":0,loadlevel:1}]}]
后面的shardList
就是分片后的结果。
上述就是一个namespace的创建过程中的逻辑。
namespace 迁移
console
中能够将namespace
迁移到其他的zk
集群中。迁移主要逻辑就是修改数据库和注册中心的数据。迁移过程过程会记录失败和成功的个数到temporary_shared_status
表中。更新过程是先从数据库中拿到当前namespace
的zk
集群名称,然后将当前的节点移动到目的zk集群,删除当前对应的节点,然后刷新数据库中的数据。
如果在迁移过程中,删除了zk
节点,但是namespace
还没有刷新到数据库中,那么需要通过diff
方法将数据补齐。暂时没有发现其他的途径
job
job的创建
job
的创建和复制本质上是一样的,只是一个是手写配置,一个是从数据库中拉数据。具体的代码在JobServiceImpl
的createJob
中。
private void addOrCopyJob(String namespace, JobConfig jobConfig, String jobNameCopied, String createdBy)throws SaturnJobConsoleException {// 一半情况下为空List<JobConfig> unSystemJobs = getUnSystemJobs(namespace);Set<JobConfig> streamChangedJobs = new HashSet<>();validateJobConfig(namespace, jobConfig, unSystemJobs, streamChangedJobs);// 如果数据存在相同作业名,则抛异常// 直接再查一次,不使用unSystemJobs,因为也不能与系统作业名相同String jobName = jobConfig.getJobName();if (currentJobConfigService.findConfigByNamespaceAndJobName(namespace, jobName) != null) {throw new SaturnJobConsoleException(ERROR_CODE_BAD_REQUEST, String.format("该作业(%s)已经存在", jobName));}// 如果zk存在该作业,则尝试删除CuratorRepository.CuratorFrameworkOp curatorFrameworkOp = registryCenterService.getCuratorFrameworkOp(namespace);if (curatorFrameworkOp.checkExists(JobNodePath.getJobNodePath(jobName))) {if (!removeJobFromZk(jobName, curatorFrameworkOp)) {throw new SaturnJobConsoleException(ERROR_CODE_BAD_REQUEST,String.format("该作业(%s)正在删除中,请稍后再试", jobName));}}// 该域作业总数不能超过一定数量,sys_config 表中MAX_JOB_NUM字段,默认为100int maxJobNum = getMaxJobNum();if (jobIncExceeds(namespace, maxJobNum, 1)) {throw new SaturnJobConsoleException(ERROR_CODE_BAD_REQUEST,String.format("总作业数超过最大限制(%d),作业名%s创建失败", maxJobNum, jobName));}// 如果是copy作业,则从数据库中复制被拷贝的作业的配置到新的作业配置JobConfig myJobConfig = jobConfig;if (jobNameCopied != null) {myJobConfig = currentJobConfigService.findConfigByNamespaceAndJobName(namespace, jobNameCopied);SaturnBeanUtils.copyPropertiesIgnoreNull(jobConfig, myJobConfig);}// 设置作业配置字段默认值,并且强制纠正某些字段correctConfigValueWhenAddJob(myJobConfig);// 添加该作业到数据库currentJobConfigService.create(constructJobConfig4DB(namespace, myJobConfig, createdBy, createdBy));// 更新关联作业的上下游for (JobConfig streamChangedJob : streamChangedJobs) {currentJobConfigService.updateStream(constructJobConfig4DB(namespace, streamChangedJob, null, createdBy));}// 添加该作业配置到zk,并联动更新关联作业的上下游createJobConfigToZk(myJobConfig, streamChangedJobs, curatorFrameworkOp);}
最后将作业各种配置写入zk
// 添加作业
private void createJobConfigToZk(JobConfig jobConfig, Set<JobConfig> streamChangedJobs,CuratorRepository.CuratorFrameworkOp curatorFrameworkOp) throws SaturnJobConsoleException {curatorTransactionOp.create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_ENABLED), jobConfig.getEnabled()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_DESCRIPTION), jobConfig.getDescription()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_CUSTOM_CONTEXT),jobConfig.getCustomContext()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_JOB_TYPE), jobConfig.getJobType()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_JOB_MODE), jobConfig.getJobMode()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_SHARDING_ITEM_PARAMETERS),jobConfig.getShardingItemParameters()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_JOB_PARAMETER),jobConfig.getJobParameter()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_QUEUE_NAME), jobConfig.getQueueName()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_CHANNEL_NAME),jobConfig.getChannelName()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_FAILOVER), jobConfig.getFailover()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_MONITOR_EXECUTION), "true").create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_TIMEOUT_4_ALARM_SECONDS),jobConfig.getTimeout4AlarmSeconds()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_TIMEOUT_SECONDS),jobConfig.getTimeoutSeconds()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_TIME_ZONE), jobConfig.getTimeZone()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_CRON), jobConfig.getCron()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_PAUSE_PERIOD_DATE),jobConfig.getPausePeriodDate()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_PAUSE_PERIOD_TIME),jobConfig.getPausePeriodTime()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_PROCESS_COUNT_INTERVAL_SECONDS),jobConfig.getProcessCountIntervalSeconds()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_SHARDING_TOTAL_COUNT),jobConfig.getShardingTotalCount()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_SHOW_NORMAL_LOG),jobConfig.getShowNormalLog()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_LOAD_LEVEL), jobConfig.getLoadLevel()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_JOB_DEGREE), jobConfig.getJobDegree()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_ENABLED_REPORT),jobConfig.getEnabledReport()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_PREFER_LIST), jobConfig.getPreferList()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_USE_DISPREFER_LIST),jobConfig.getUseDispreferList()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_LOCAL_MODE), jobConfig.getLocalMode()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_USE_SERIAL), jobConfig.getUseSerial()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_DEPENDENCIES),jobConfig.getDependencies()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_GROUPS), jobConfig.getGroups()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_RERUN), jobConfig.getRerun()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_UPSTREAM), jobConfig.getUpStream()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_DOWNSTREAM), jobConfig.getDownStream()).create(JobNodePath.getConfigNodePath(jobName, CONFIG_ITEM_JOB_CLASS), jobConfig.getJobClass());
}
可以看到,写入zk
中的数据特别的多,基本上是配置的所有数据,如果没有则都使用默认值。一半情况下,最开始初始化的时候,只会涉及到job
的名称,类全名和分片信息。其他的数据都是在详情里面配置的。
Executor
executor启动
本文只涉及到executor
外部启动方式,不涉及到内嵌模式。
executor
本身也是个java
进程,所以仅仅需要找到对应的main
方法就可以知道了解到启动的逻辑。
如果在本地使用saturn:run -Dnamespace=demo1 -DexecutorName=exec01 -DVIP_SATURN_CONSOLE_URI=http://127.0.0.1:9088 -f pom.xml
的方式启动,
那么已经配置了namespace
,console-url
两个必须填入的,在插件的SaturnJobRunMojo
中,我们发现他帮我们下载了executor
的zip
然后解压缩,最后传入了saturnLibDir
和appLibDir
,一个用于加载saturn
依赖的包,一个是业务代码通过saturn:zip
得到的zip
包解压缩。为什么不能使用springboot
,我们后续会了解到。
在saturn-executor
启动源码中,首先是初始化classLoader
,包含两个,一个executor
的SaturnClassLoader
,一个为job
的JobClassLoader
,两者的实现都是差不多的,主要都是继承URLClassLoader
,该classloader
的主要作用就是可以通过jar
或者url
等方式获取到class
。
public class SaturnClassLoader extends URLClassLoader {public SaturnClassLoader(URL[] urls, ClassLoader parent) {super(urls, parent);}@Overrideprotected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {synchronized (getClassLoadingLock(name)) {Class<?> findClass = findLoadedClass(name);if (findClass == null) {findClass = super.loadClass(name, resolve);}return findClass;}}
该classLoader
继承了URLClassLoader
,上文抛出一个问题,就是为什么不能用springboot
的打包方式。当我们需要执行一个jar
的时候,jar
包中对应会有个META-INF/MANIFEST.MF
文件,里面有个参数就是Main-Class
当我们使用springboot
插件打包的时候,该参数被修改为org.springframework.boot.loader.JarLauncher
,而我们定义的main
方法的class
最终变成了start-class
也就是说.而Jarlauncher
最后使用的classloader
为org.springframework.boot.loader.LaunchedURLClassLoader
,他会去扫描当前jar
包里的BOOT-INF/classes
和BOOT-INF/lib
,所以如果使用JobClassLoader
的去加载原本写好的Main
方法,而loadClass
是不会去包里面查看BOOT-INF/classes
或者BOOT-INF/lib
下的内容的。这里涉及到springboot
启动的时候的时候进行的类加载。
使用saturn
的maven
插件进行打包的实现莫过于将当前所有依赖的jar
放入一个zip
中。
启动的时候,传入-saturnLibDir -appLibDir -namespace
,其中saturnLibDir
也就是saturn
的executor/lib
包将由SaturnClassLoader
加载,appLibDir
是上文提到的zip
解压缩后文件夹,是由JobClassLoader
加载。这种分开的方式可以有效去掉jar
冲突。
运行启动人口为executor
下的Main
,核心启动为SaturnExecutor.buildExecutor
。首先是初始化一些和日志相关配置。然后初始化Job
,由于我们在写Job
的时候,会在resources
下配置saturn.properties
,里面会写入app.class
.
private static Object validateAndLoadSaturnApplication(ClassLoader jobClassLoader) {try {Properties properties = getSaturnProperty(jobClassLoader); //获取saturn.propertiesif (properties == null) {return null;}String appClassStr = properties.getProperty("app.class"); // 获取app.classif (StringUtils.isBlank(appClassStr)) {return null;}appClassStr = appClassStr.trim();ClassLoader oldCL = Thread.currentThread().getContextClassLoader();try {Thread.currentThread().setContextClassLoader(jobClassLoader);Class<?> appClass = jobClassLoader.loadClass(appClassStr); // 加载当前的appclassClass<?> saturnApplicationClass = jobClassLoader.loadClass(SATURN_APPLICATION_CLASS); // 加载实现SaturnApplication的类if (saturnApplicationClass.isAssignableFrom(appClass)) { // 判断初始化class 是否实现saturn的SaturnApplication借口Object saturnApplication = appClass.newInstance(); //初始化对象appClass.getMethod("init").invoke(saturnApplication);//调用初始化方法。return saturnApplication; // 这里返回的是Object,但是其实可以返回为SaturnApplication} else {throw new RuntimeException("the app.class " + appClassStr + " must be instance of " + SATURN_APPLICATION_CLASS);}} finally {Thread.currentThread().setContextClassLoader(oldCL);}} catch (RuntimeException e) {LogUtils.error(log, LogEvents.ExecutorEvent.INIT, "Fail to load SaturnApplication", e);throw e;} catch (Exception e) {LogUtils.error(log, LogEvents.ExecutorEvent.INIT, "Fail to load SaturnApplication", e);throw new RuntimeException(e);}}
如果是springboot
项目,则可以直接使用saturn
的springboot
。
public class GenericSpringBootSaturnApplication extends AbstractSpringSaturnApplication {@Overridepublic void init() {if (applicationContext != null) {destroy();}applicationContext = run();}@Overridepublic void destroy() {if (applicationContext != null) {SpringApplication.exit(applicationContext);applicationContext = null;}}/*** 启动SpringBoot,默认启动方式为SpringApplication.run(source()),其中source()可查看{@link #source()}方法*/protected ApplicationContext run() {return SpringApplication.run(source());}/*** 使用默认方式启动SpringBoot时,加载的source*/protected Object source() {return this.getClass();}}
其实可以看出,springboot
项目主要就是为了获得这个ApplicationContext
。后面saturn
的Job
是从applicationContext
中直接获取bean
的,所以我们的job
需要注册为spring
的bean
.
最后初始化SaturnExecutor
的对象
private SaturnExecutor(String namespace, String executorName, ClassLoader executorClassLoader,ClassLoader jobClassLoader, Object saturnApplication) {this.executorName = executorName;this.namespace = namespace;this.executorClassLoader = executorClassLoader;this.jobClassLoader = jobClassLoader;this.saturnApplication = saturnApplication;this.raiseAlarmExecutorService = Executors.newSingleThreadExecutor(new SaturnThreadFactory(executorName + "-raise-alarm-thread", false));this.shutdownJobsExecutorService = Executors.newCachedThreadPool(new SaturnThreadFactory(executorName + "-shutdownJobSchedulers-thread", true));initRestartThread();registerShutdownHandler();}
其中initRestartThead
会调用到
public void execute() throws Exception {shutdownLock.lockInterruptibly();try {if (isShutdown) {return;}long startTime = System.currentTimeMillis();shutdown0();try {StartCheckUtil.add2CheckList(StartCheckItem.ZK, StartCheckItem.UNIQUE, StartCheckItem.JOBKILL);LogUtils.info(log, LogEvents.ExecutorEvent.INIT, "start to discover from saturn console");// 通过console的url,通过/rest/v1/discovery?namespace 获取到namespace的zkMap<String, String> discoveryInfo = discover();String zkConnectionString = discoveryInfo.get(DISCOVER_INFO_ZK_CONN_STR);if (StringUtils.isBlank(zkConnectionString)) {LogUtils.error(log, LogEvents.ExecutorEvent.INIT, "zk connection string is blank!");throw new RuntimeException("zk connection string is blank!");}saturnExecutorExtension.postDiscover(discoveryInfo);// 初始化注册中心initRegistryCenter(zkConnectionString.trim());// 检测是否存在仍然有正在运行的SHELL作业checkAndKillExistedShellJobs();// 初始化timeout schedulerTimeoutSchedulerExecutor.createScheduler(executorName);// 先注册Executor再启动作业,防止Executor因为一些配置限制而抛异常了,而作业线程已启动,导致作业还运行了一会// 在$SaturnExecutors/executors下注册当前的executor,包括{lastBeginTime,clean,version,ip}registerExecutor();// 启动定时清空nohup文件的线程periodicTruncateNohupOutService = new PeriodicTruncateNohupOutService(executorName);periodicTruncateNohupOutService.start();// 启动零点清0成功数错误数的线程resetCountService = new ResetCountService(executorName);resetCountService.startRestCountTimer();// 添加新增作业时的回调方法,启动已经存在的作业saturnExecutorService.registerJobsWatcher();} catch (Throwable t) {saturnExecutorExtension.handleExecutorStartError(t);shutdown0();throw t;}} finally {shutdownLock.unlock();}}
首先将当前的executor
注册到zk
上,然后对当前namespace
下的$JOB
节点进行监听。
public void registerJobsWatcher() throws Exception {if (initNewJobService != null) {initNewJobService.shutdown();}initNewJobService = new InitNewJobService(this);initNewJobService.start();}
//InitNewJobService
public void start() throws Exception {treeCache = TreeCache.newBuilder((CuratorFramework) regCenter.getRawClient(), JobNodePath.ROOT).setExecutor(new CloseableExecutorService(Executors.newSingleThreadExecutor(new SaturnThreadFactory(executorName + "-$Jobs-watcher", false)),true)).setMaxDepth(1).build();executorService = Executors.newSingleThreadExecutor(new SaturnThreadFactory(executorName + "-initNewJob-thread", false));treeCache.getListenable().addListener(new InitNewJobListener(), executorService);treeCache.start();}
在InitNewJobListener
中是负责对当前节点node
的监听,使用的是zk
的treecache
,Curator
包括了Node Cache 、Path Cache、Tree Cache
三类。其中Node Cache
节点缓存可以用于ZNode
节点的监听,Path Cache
子节点缓存用于ZNode
的子节点的监听,而Tree Cache
树缓存是Path Cache
的增强,不光能监听子节点,也能监听ZNode
节点自身。
监听的事件核心处理逻辑为:
String jobName = StringUtils.substringAfterLast(path, "/");// /${namespace}/$Jobs/${jobName}/${jobclass}String jobClassPath = JobNodePath.getNodeFullPath(jobName, ConfigurationNode.JOB_CLASS);// wait 5 seconds at most until jobClass created .WAIT_JOBCLASS_ADDED_COUNT=25for (int i = 0; i < WAIT_JOBCLASS_ADDED_COUNT; i++) {// 判断是否已经存在当前job的完整classif (!regCenter.isExisted(jobClassPath)) {Thread.sleep(200L);continue;}// 是否已经创建if (!jobNames.contains(jobName)) {if (canInitTheJob(jobName) && initJobScheduler(jobName)) {//加入已经创建的list,为什么不用set也很奇怪。jobNames.add(jobName);}} else {//logbreak;}
创建分为两种,如果当前的executor
配置了VIP_SATURN_INIT_JOB_BY_GROUPS
的值,也就是配置了group
则只初始group
中的数据。否则初始化所由的job
。而初始化job
位于initJobScheduler
:
private boolean initJobScheduler(String jobName) {try {// job 创建失败的记录JOB_INIT_FAILED_RECORDS.get(executorName).putIfAbsent(jobName, new HashSet<Integer>());// 所有在console创建job或者update job的参数,从注册中心拉JobConfiguration jobConfig = new JobConfiguration(regCenter, jobName);if (jobConfig.getSaturnJobClass() == null) {throw new JobException("unexpected error, the saturnJobClass cannot be null, jobName is %s, jobType is %s",jobName, jobConfig.getJobType());}if (jobConfig.isDeleting()) {String serverNodePath = JobNodePath.getServerNodePath(jobName, executorName);regCenter.remove(serverNodePath);LogUtils.warn(log, jobName, "the job is on deleting");return false;}//新建JobScheduler 对象JobScheduler scheduler = new JobScheduler(regCenter, jobConfig);// 设置saturnExecutorService,主要包含了executor的注册信息和关系scheduler.setSaturnExecutorService(saturnExecutorService);//初始化scheduler.init();// 如果创建成功,则从当前失败记录里移除JOB_INIT_FAILED_RECORDS.get(executorName).get(jobName).clear();return true;} catch (JobInitAlarmException e) {if (!SystemEnvProperties.VIP_SATURN_DISABLE_JOB_INIT_FAILED_ALARM) {// no need to log exception stack as it should be logged in the original happen placeraiseAlarmForJobInitFailed(jobName, e);}} catch (Throwable t) {LogUtils.warn(log, jobName, "job initialize failed, but will not stop the init process", t);}return false;}
可以看到,重点最后都在JobScheduler
这个类中,而这个类应该是和Job
一对一的。
public JobScheduler(final CoordinatorRegistryCenter coordinatorRegistryCenter,final JobConfiguration jobConfiguration) {this.jobName = jobConfiguration.getJobName();this.executorName = coordinatorRegistryCenter.getExecutorName();this.currentConf = jobConfiguration;this.coordinatorRegistryCenter = coordinatorRegistryCenter;this.jobNodeStorage = new JobNodeStorage(coordinatorRegistryCenter, jobConfiguration);initExecutorService();JobRegistry.addJobScheduler(executorName, jobName, this);zkCacheManager = new ZkCacheManager((CuratorFramework) coordinatorRegistryCenter.getRawClient(), jobName,executorName);// 包含一些job的config信息,包括cron等信息configService = new ConfigurationService(this);//选举,每一个job都有自己的leader,路径 /${namespace}/$Jobs/${job}/leader/election/leaderElectionService = new LeaderElectionService(this);//主要用于 持久化/${namespace}/$Jobs/${job}/servers下面的executor信息,包括ip,执行的成功失败等信息// 还会有标记是否立即执行,该节点为临时节点serverService = new ServerService(this);//用于分片信息shardingService = new ShardingService(this);//执行上下文executionContextService = new ExecutionContextService(this);//执行作业的服务,主要包含更新下次启动时间,注册作业完成情况等信息executionService = new ExecutionService(this);// 失败转移failoverService = new FailoverService(this);//统计作业执行情况statisticsService = new StatisticsService(this);//执行情况统计analyseService = new AnalyseService(this);// 每一个namespace 最多执行500个job VIP_SATURN_MAX_NUMBER_OF_JOBS=500limitMaxJobsService = new LimitMaxJobsService(this);//针对上面的所有service,都会有对应的zklisterner,这里主要是将所有的service和zk节点进行注册listenerManager = new ListenerManager(this);//上传结果的信息reportService = new ReportService(this);}
后面针对上诉的service
在执行的过程中的作用在展开,现在进入JobScheduler.init()
.
public void init() {try {startAll();createJob();serverService.persistServerOnline(job);// Notify job enabled or disabled after that all are ready, include job was initialized.configService.notifyJobEnabledOrNot();} catch (Throwable t) {shutdown(false);throw t;}}
初始化涉及到两步,第一步就是开始前面所有的service
,然后就是创建job
。由于service
是跟随着job
的生命周期来运行的,所以关注于job
的生命周期,也就是它的初始化,运行,运行状态转换和停止等。其中createJob
就是涉及到job
的初始化:
private void createJob() {//从typemanager 中获取到当前的job类型,本文主要涉及为javaClass<?> jobClass = currentConf.getSaturnJobClass();try {job = (AbstractElasticJob) jobClass.newInstance();} catch (Exception e) {LogUtils.error(log, jobName, "unexptected error", e);throw new JobException(e);}//设置JobScheduler 中的servicejob.setJobScheduler(this);job.setConfigService(configService);job.setShardingService(shardingService);job.setExecutionContextService(executionContextService);job.setExecutionService(executionService);job.setFailoverService(failoverService);job.setServerService(serverService);job.setExecutorName(executorName);job.setReportService(reportService);job.setJobName(jobName);job.setNamespace(coordinatorRegistryCenter.getNamespace());job.setSaturnExecutorService(saturnExecutorService);// 初始化jobjob.init();}
所有的jobType
:
public void registerJobType() {JobTypeManager.register( //JobTypeBuilder.newBuilder().name("JAVA_JOB").cron().java().allowedShutdownGracefully().triggerClass(CronTrigger.class).handlerClass(SaturnJavaJob.class).build());JobTypeManager.register( //JobTypeBuilder.newBuilder().name("SHELL_JOB").cron().shell().allowedShutdownGracefully().triggerClass(CronTrigger.class).handlerClass(SaturnScriptJob.class).build());JobTypeManager.register( //JobTypeBuilder.newBuilder().name("PASSIVE_JAVA_JOB").passive().java().allowedShutdownGracefully().triggerClass(PassiveTrigger.class).handlerClass(SaturnJavaJob.class).build());JobTypeManager.register( //JobTypeBuilder.newBuilder().name("PASSIVE_SHELL_JOB").passive().shell().allowedShutdownGracefully().triggerClass(PassiveTrigger.class).handlerClass(SaturnScriptJob.class).build());
}
java
相关的job
最后使用的对象为SaturnJavaJob
。首先看下具体的一个业务任务代码。
// AbstractSaturnJavaJob extends AbstractElasticJob
@Component
public class DemoJob extends AbstractSaturnJavaJob {private static final Logger log = LoggerFactory.getLogger(DemoJob.class);@Resourceprivate DemoService demoService;@Overridepublic SaturnJobReturn handleJavaJob(String jobName, Integer shardItem, String shardParam,SaturnJobExecutionContext shardingContext) throws InterruptedException {log.info("{} is running, item is {}", jobName, shardItem);demoService.doing();return new SaturnJobReturn();}}
回到SaturnJavaJob.init
public void init() {//AbstractSaturnJavaJob.init(super.init()):{ //代码块为super.init()Class<? extends Trigger> triggerClass = configService.getJobType().getTriggerClass();Trigger trigger = null;try {trigger = triggerClass.newInstance();trigger.init(this);} catch (Exception e) {LogUtils.error(log, jobName, "Trigger init failed", e);throw new JobException(e);}scheduler = new SaturnScheduler(this, trigger);scheduler.start();getExecutorService();}// 创建job的对象createJobBusinessInstanceIfNecessary();// 获取job的versiongetJobVersionIfNecessary();
}
更具对应的job
类型获取到Trigger
类型。然后初始化Trigger
和SaturnScheduler
。并且启动SaturnScheduler
。最后是将任务交给SaturnWorker
对象执行:
//new SaturnWorker(job, trigger.createTriggered(false, null), trigger.createQuartzTrigger()); SaturnScheduler创建方法Triggered 方法其实主要用于判断是否已经执行
public SaturnWorker(AbstractElasticJob job, Triggered notTriggered, Trigger trigger) {this.job = job;this.notTriggered = notTriggered;this.triggered = notTriggered;initTrigger(trigger);// this.triggerObj = (OperableTrigger) trigger; 初始化triggerObj
}
SaturnWorker.run()
while (!halted.get()) {try {synchronized (sigLock) {while (paused && !halted.get()) {try {sigLock.wait(1000L);} catch (InterruptedException ignore) {}}if (halted.get()) {break;}}boolean noFireTime = false; // 没有下次执行时间,初始化为falselong timeUntilTrigger = 1000;// 即上文传入的trigger,if (triggerObj != null) {triggerObj.updateAfterMisfire(null);long now = System.currentTimeMillis();Date nextFireTime = triggerObj.getNextFireTime();if (nextFireTime != null) {timeUntilTrigger = nextFireTime.getTime() - now;} else {noFireTime = true;}}while (!noFireTime && timeUntilTrigger > 2) {synchronized (sigLock) {// 是否停止if (halted.get()) {break;}//是否已经执行if (triggered.isYes()) {break;}try {sigLock.wait(timeUntilTrigger);} catch (InterruptedException ignore) {}// 计算下次执行时间if (triggerObj != null) {long now = System.currentTimeMillis();Date nextFireTime = triggerObj.getNextFireTime();if (nextFireTime != null) {timeUntilTrigger = nextFireTime.getTime() - now;} else {noFireTime = true;}}}}boolean goAhead;Triggered currentTriggered = notTriggered; //notTriggered = trigger.createTriggered(false, null)// 触发执行只有两个条件:1.时间到了 2.点立即执行synchronized (sigLock) {// 是否停止或者暂停goAhead = !halted.get() && !paused;// 重置立即执行标志,赋值当前立即执行数据if (triggered.isYes()) { // 初始化为falsecurrentTriggered = triggered;triggered = notTriggered;} else if (goAhead) { // 非立即执行。即,执行时间到了,或者没有下次执行时间goAhead = goAhead && !noFireTime; // 有下次执行时间,即执行时间到了,才执行作业if (goAhead) { // 执行时间到了,更新执行时间if (triggerObj != null) {triggerObj.triggered(null);//这里使用的是quartz,传入null用于计算下次cron的时间}} else { // 没有下次执行时间,则尝试睡一秒,防止不停的循环导致CPU使用率过高(如果cron不再改为周期性执行)try {sigLock.wait(1000L);} catch (InterruptedException ignore) {}}}}// job开始执行if (goAhead) {job.execute(currentTriggered);}} catch (RuntimeException e) {LogUtils.error(log, job.getJobName(), e.getMessage(), e);}}
由于saturnworker
是异步放入线程池进行运行,所以此时主线程已经进入了下一步init
方法:
private void createJobBusinessInstanceIfNecessary() {// 从configuration也就是zk中获取到job的classString jobClassStr = configService.getJobConfiguration().getJobClass();if (StringUtils.isBlank(jobClassStr)) {LogUtils.error(log, jobName, "jobClass is not set");throw new JobInitAlarmException("jobClass is not set");}jobClassStr = jobClassStr.trim();LogUtils.info(log, jobName, "start to create job business instance, jobClass is {}", jobClassStr);if (jobBusinessInstance == null) {ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();ClassLoader jobClassLoader = saturnExecutorService.getJobClassLoader();// 还是上文中的JobclassloaderThread.currentThread().setContextClassLoader(jobClassLoader);try {// 如果说,我们使用springboot的fatjar,那么在这里,就不能够获取到springbootjar包中的/BOOT-INF/Classes,从而爆出CLassNotfound的一场Class<?> jobClass = jobClassLoader.loadClass(jobClassStr);//从上文中,如果是spring项目,就会有applicationcontext中获取到对应的jobbean,该方法调用的其实是AbstractSaturnApplication,下的getJobInstance。也就是说,如果不想通过spring来管理自己的job,那么可以在对应的application中实现getJobInstance 方法,该方法需要返回对应class的job实例。spring中的实现 applicationContext.getBean(jobClass) 。jobBusinessInstance = tryToGetSaturnBusinessInstanceFromSaturnApplication(jobClassLoader, jobClass);if (jobBusinessInstance == null) {try {// 如果没有从application中获取到,那么可以调用当前class的静态方法getObject,来初始化job。jobBusinessInstance = jobClass.getMethod("getObject").invoke(null);if (jobBusinessInstance != null) {LogUtils.info(log, jobName, "get job instance from getObject");}} catch (NoSuchMethodException e) {LogUtils.info(log, jobName,"the jobClass hasn't the static getObject method, will initialize job by default no arguments constructor method");}}// 业务没有重写getObject方法,BaseSaturnJob会默认返回nullif (jobBusinessInstance == null) {jobBusinessInstance = jobClass.newInstance();LogUtils.info(log, jobName, "get job instance from newInstance");}// 该类暂时没发现有啥用处,setSaturnApi也是为空实现SaturnApi saturnApi = new SaturnApi(getNamespace(), executorName);jobClass.getMethod("setSaturnApi", Object.class).invoke(jobBusinessInstance, saturnApi);} catch (Throwable t) {throw new JobInitAlarmException(logBusinessExceptionIfNecessary(jobName, t));} finally {Thread.currentThread().setContextClassLoader(oldClassLoader);}}if (jobBusinessInstance == null) {LogUtils.error(log, jobName, "job instance is null");throw new JobInitAlarmException("job instance is null");}
}
现在,saturn
的worker
线程已经开始运行,而job
也已经初始化完成。
Job的执行
上文已经将job初始化,而且其实已经调用了job.execute(currentTriggered);
,也就是job
其实已经开始进入执行阶段,接下来会看下job
的具体执行过程:
//AbstractElasticJob
public final void execute(final Triggered triggered) {LogUtils.debug(log, jobName, "Saturn start to execute job [{}]", jobName);// 对每一个jobScheduler,作业对象只有一份,多次使用,所以每次开始执行前先要resetreset();if (configService == null) {LogUtils.warn(log, jobName, "configService is null");return;}JobExecutionMultipleShardingContext shardingContext = null;try {// 如果当前的job运行上报,cron和passive作业默认上报,而且当前host下没有分配failover的,则进行分片// job在执行的过程中,会在execution下if (!configService.isEnabledReport() || failoverService.getLocalHostFailoverItems().isEmpty()) {// 如果当前允许上报,或者当前executor的失败转移分片为空,则进行分片shardingService.shardingIfNecessary();}// 判断当前job是否执行if (!configService.isJobEnabled()) {LogUtils.debug(log, jobName, "{} is disabled, cannot be continued, do nothing about business.",jobName);return;}//分片上下文shardingContext = executionContextService.getJobExecutionShardingContext(triggered);if (shardingContext.getShardingItems() == null || shardingContext.getShardingItems().isEmpty()) {LogUtils.debug(log, jobName, "{} 's items of the executor is empty, do nothing about business.",jobName);callbackWhenShardingItemIsEmpty(shardingContext);return;}if (configService.isInPausePeriod()) {LogUtils.info(log, jobName,"the job {} current running time is in pausePeriod, do nothing about business.", jobName);return;}executeJobInternal(shardingContext);if (isFailoverSupported() && configService.isFailover() && !stopped && !forceStopped && !aborted) {failoverService.failoverIfNecessary();}LogUtils.debug(log, jobName, "Saturn finish to execute job [{}], sharding context:{}.", jobName,shardingContext);} catch (Exception e) {LogUtils.warn(log, jobName, e.getMessage(), e);} finally {running = false;}
}
判断是否需要分片中,会有判断是否有运行本作业服务器的失败转移序列号
/*** 获取运行在本作业服务器的失效转移序列号.** @return 运行在本作业服务器的失效转移序列号*/public List<Integer> getLocalHostFailoverItems() {//zk中的路径为:/${namespace}/$Jobs/${jonName}/execution 里面是分片数组List<String> items = getJobNodeStorage().getJobNodeChildrenKeys(ExecutionNode.ROOT);List<Integer> result = new ArrayList<>(items.size());for (String each : items) {int item = Integer.parseInt(each);// zk路径为/${namespace}/$Jobs/${jonName}/execution/${item}/failoverString node = FailoverNode.getExecutionFailoverNode(item);// 判断是否存在/${namespace}/$Jobs/${jonName}/execution/${item}/failover//上诉路径为一个零时节点,如果该节点存在,那么failover中的值为executorNameif (getJobNodeStorage().isJobNodeExisted(node) && executorName.equals(getJobNodeStorage().getJobNodeDataDirectly(node))) {result.add(item);}}Collections.sort(result);return result;}
由于cron
是默认需要分片的,所以其实从某种意义上来说每次执行的时候都会进行分片
/*** 如果需要分片且当前节点为主节点, 则作业分片.*/public synchronized void shardingIfNecessary() throws JobShuttingDownException {if (isShutdown) {return;}GetDataStat getDataStat = null;// 从/${namespace}/$Jobs/${jonName}/leader/sharding/necessary中获取数据if (getJobNodeStorage().isJobNodeExisted(ShardingNode.NECESSARY)) {getDataStat = getNecessaryDataStat();}// sharding necessary内容为空,或者内容是"0"则返回,否则,需要进行sharding处理if (getDataStat == null || SHARDING_UN_NECESSARY.equals(getDataStat.getData())) {return;}// 如果不是leader,则等待leader处理完成(这也是一个死循环,知道满足跳出循环的条件:1. 被shutdown 2. 无须sharding而且不处于processing状态)if (blockUntilShardingComplatedIfNotLeader()) {return;}// 如果有作业分片处于running状态则等待(无限期)waitingOtherJobCompleted();// 建立一个临时节点,标记shardig处理中//processinggetJobNodeStorage().fillEphemeralJobNode(ShardingNode.PROCESSING, "");try {// 删除作业下面的所有JobServer的sharding节点clearShardingInfo();int retryCount = 3;while (!isShutdown) {boolean needRetry = false;int version = getDataStat.getVersion();// 首先尝试从job/leader/sharding/neccessary节点获取,如果失败,会从$SaturnExecutors/sharding/content下面获取// key is executor, value is sharding itemsMap<String, List<Integer>> shardingItems = namespaceShardingContentService.getShardContent(jobName, getDataStat.getData());try {// 所有jobserver的(检查+创建),加上设置sharding necessary内容为0,都是一个事务CuratorTransactionFinal curatorTransactionFinal = getJobNodeStorage().getClient().inTransaction().check().forPath("/").and();for (Entry<String, List<Integer>> entry : shardingItems.entrySet()) {//创建/${namespace}/$Jobs/${jonName}/servers/${executorName}/sharding,写入一些必要的数据curatorTransactionFinal.create().forPath(JobNodePath.getNodeFullPath(jobName, ShardingNode.getShardingNode(entry.getKey())),ItemUtils.toItemsString(entry.getValue()).getBytes(StandardCharsets.UTF_8)).and();}curatorTransactionFinal.setData().withVersion(version).forPath(JobNodePath.getNodeFullPath(jobName, ShardingNode.NECESSARY),SHARDING_UN_NECESSARY.getBytes(StandardCharsets.UTF_8)).and();curatorTransactionFinal.commit();} catch (BadVersionException e) {LogUtils.warn(log, jobName, "zookeeper bad version exception happens.", e);needRetry = true;retryCount--;} catch (Exception e) {// 可能多个sharding task导致计算结果有滞后,但是server机器已经被删除,导致commit失败// 实际上可能不影响最终结果,仍然能正常分配分片,因为还会有resharding事件被响应// 修改日志级别为warn级别,避免不必要的告警LogUtils.warn(log, jobName, "Commit shards failed", e);}if (needRetry) {if (retryCount >= 0) {LogUtils.info(log, jobName,"Bad version because of concurrency, will retry to get shards later");Thread.sleep(200L); // NOSONARgetDataStat = getNecessaryDataStat();} else {LogUtils.warn(log, jobName, "Bad version because of concurrency, give up to retry");break;}} else {break;}}} catch (Exception e) {LogUtils.error(log, jobName, e.getMessage(), e);} finally {getJobNodeStorage().removeJobNodeIfExisted(ShardingNode.PROCESSING);}}
分片完成以后,执行之前去获取当前executor
的分片信息:
/*** 获取当前作业服务器运行时分片上下文.** @return 当前作业服务器运行时分片上下文*/public JobExecutionMultipleShardingContext getJobExecutionShardingContext(final Triggered triggered) {SaturnExecutionContext result = new SaturnExecutionContext();result.setJobName(configService.getJobName());result.setShardingTotalCount(configService.getShardingTotalCount());result.setTriggered(triggered);//从/${namespace}/$Jobs/${jonName}/servers/${executorName}/sharding中获取本地的,这个值是在上文中的分片中写入的List<Integer> shardingItems = getShardingItems();boolean isEnabledReport = configService.isEnabledReport();if (isEnabledReport) {//从${namespace}/$Jobs/${jonName}/execution/${item}/running 获取到正在执行的itemremoveRunningItems(shardingItems);}// 设置当前需要执行的分片result.setShardingItems(shardingItems);// 传入的参数result.setJobParameter(configService.getJobParameter());// 业务上下文result.setCustomContext(configService.getCustomContext());result.setJobConfiguration(jobConfiguration);if (coordinatorRegistryCenter != null) {result.setNamespace(coordinatorRegistryCenter.getNamespace());result.setExecutorName(coordinatorRegistryCenter.getExecutorName());}if (result.getShardingItems().isEmpty()) {return result;}// 获取参数对照表Map<Integer, String> shardingItemParameters = configService.getShardingItemParameters();if (shardingItemParameters.containsKey(-1)) { // 本地模式for (int each : result.getShardingItems()) {result.getShardingItemParameters().put(each, shardingItemParameters.get(-1));}} else {for (int each : result.getShardingItems()) {if (shardingItemParameters.containsKey(each)) {result.getShardingItemParameters().put(each, shardingItemParameters.get(each));}}}if (jobConfiguration.getTimeoutSeconds() > 0) {result.setTimetoutSeconds(jobConfiguration.getTimeoutSeconds());}// 返回执行上下文return result;}
获取到分片信息,开始执行:
private void executeJobInternal(final JobExecutionMultipleShardingContext shardingContext) throws Exception {//注册分片执行状态为runningexecutionService.registerJobBegin(shardingContext);try {//进入执行逻辑executeJob(shardingContext);} finally {List<Integer> shardingItems = shardingContext.getShardingItems();if (!shardingItems.isEmpty()) {Date nextFireTimePausePeriodEffected = jobScheduler.getNextFireTimePausePeriodEffected();boolean isEnabledReport = configService.isEnabledReport();for (int item : shardingItems) {if (isEnabledReport && !checkIfZkLostAfterExecution(item)) {continue;}if (!aborted) {//作业完成信息注册,此信息用于页面展现。注意,无论作业是否上报状态(对应/config/enabledReport/节点),都会注册此信息。executionService.registerJobCompletedByItem(shardingContext, item, nextFireTimePausePeriodEffected);}// 如果当前为失败转移的分片,则从失败转移列表中移除当前的分片节点if (isFailoverSupported() && configService.isFailover()) {failoverService.updateFailoverComplete(item);}}}// 执行依赖作业,是通过http接口调用consoleUri + "/rest/v1/" + namespace + "/jobs/" + jobName + "/runDownStream";完成runDownStream(shardingContext);}
}
最后执行executeJob
:
protected final void executeJob(final JobExecutionMultipleShardingContext shardingContext) {if (!(shardingContext instanceof SaturnExecutionContext)) {LogUtils.error(log, jobName, "!!! The context must be instance of SaturnJobExecutionContext !!!");return;}long start = System.currentTimeMillis();SaturnExecutionContext saturnContext = (SaturnExecutionContext) shardingContext;saturnContext.setSaturnJob(true);// 针对分片返回结果Map<Integer, SaturnJobReturn> retMap = new HashMap<Integer, SaturnJobReturn>();// shardingItemParameters为参数表解析出来的Key/Value值Map<Integer, String> shardingItemParameters = saturnContext.getShardingItemParameters();// items为需要处理的作业分片List<Integer> items = saturnContext.getShardingItems();LogUtils.info(log, jobName, "Job {} handle items: {}", jobName, items);for (Integer item : items) {// 兼容配置错误,如配置3个分片, 参数表配置为0=*, 2=*, 则1分片不会执行if (!shardingItemParameters.containsKey(item)) {LogUtils.error(log, jobName,"The {} item's parameter is not valid, will not execute the business code, please check shardingItemParameters",items);SaturnJobReturn errRet = new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL,"Config of parameter is not valid, check shardingItemParameters", SaturnSystemErrorGroup.FAIL);retMap.put(item, errRet);}}Map<Integer, SaturnJobReturn> handleJobMap = handleJob(saturnContext);if (handleJobMap != null) {retMap.putAll(handleJobMap);}// 汇总修改for (Integer item : items) {if (item == null) {continue;}SaturnJobReturn saturnJobReturn = retMap.get(item);if (saturnJobReturn == null) {saturnJobReturn = new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL,"Can not find the corresponding SaturnJobReturn", SaturnSystemErrorGroup.FAIL);retMap.put(item, saturnJobReturn);}// 将结果上传到ProcessCountStatistics,最后传入zkupdateExecuteResult(saturnJobReturn, saturnContext, item);}long end = System.currentTimeMillis();LogUtils.info(log, jobName, "{} finished, totalCost={}ms, return={}", jobName, (end - start), retMap);}
如果当前的job
为java
,则进入:
//SaturnJavaJob
protected Map<Integer, SaturnJobReturn> handleJob(final SaturnExecutionContext shardingContext) {final Map<Integer, SaturnJobReturn> retMap = new HashMap<Integer, SaturnJobReturn>();synchronized (futureTaskMap) {futureTaskMap.clear();final String jobName = shardingContext.getJobName();final int timeoutSeconds = getTimeoutSeconds();ExecutorService executorService = getExecutorService();// 处理自定义参数String jobParameter = shardingContext.getJobParameter();// shardingItemParameters为参数表解析出来的Key/Value值Map<Integer, String> shardingItemParameters = shardingContext.getShardingItemParameters();for (final Entry<Integer, String> shardingItem : shardingItemParameters.entrySet()) {final Integer key = shardingItem.getKey();try {String jobValue = shardingItem.getValue();final String itemVal = getRealItemValue(jobParameter, jobValue); // 作业分片的对应值// 将当前的job放入异步现场池进行处理,让分片可以几乎并发执行的ShardingItemFutureTask shardingItemFutureTask = new ShardingItemFutureTask(createCallable(jobName, key, itemVal, timeoutSeconds, shardingContext, this), null);Future<?> callFuture = executorService.submit(shardingItemFutureTask);if (timeoutSeconds > 0) {TimeoutSchedulerExecutor.scheduleTimeoutJob(shardingContext.getExecutorName(), timeoutSeconds,shardingItemFutureTask);}shardingItemFutureTask.setCallFuture(callFuture);futureTaskMap.put(key, shardingItemFutureTask);} catch (Throwable t) {LogUtils.error(log, jobName, t.getMessage(), t);retMap.put(key, new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, t.getMessage(),SaturnSystemErrorGroup.FAIL));}}}// 汇总执行的过程for (Entry<Integer, ShardingItemFutureTask> entry : futureTaskMap.entrySet()) {Integer item = entry.getKey();ShardingItemFutureTask futureTask = entry.getValue();try {futureTask.getCallFuture().get();} catch (Exception e) {LogUtils.error(log, jobName, e.getMessage(), e);retMap.put(item, new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, e.getMessage(),SaturnSystemErrorGroup.FAIL));continue;}retMap.put(item, futureTask.getCallable().getSaturnJobReturn());}synchronized (futureTaskMap) {futureTaskMap.clear();}return retMap;}
最后是放入JavaShardingItemCallable
进行执行的:
public SaturnJobReturn call() {reset();SaturnSystemOutputStream.initLogger();currentThread = Thread.currentThread();SaturnJobReturn temp = null;try {beforeExecution();// 调用saturn的抽象方法temp = doExecution();// 在此之后,不能再强制停止本线程breakForceStop = true;} catch (Throwable t) {// 在此之后,不能再强制停止本线程breakForceStop = true;// 不是超时,不是强制停止。 打印错误日志,设置SaturnJobReturn。if (status.get() != TIMEOUT && status.get() != FORCE_STOP) {LogUtils.error(log, jobName, t.toString(), t);temp = new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, t.getMessage(),SaturnSystemErrorGroup.FAIL);}} finally {if (status.compareAndSet(INIT, SUCCESS)) {saturnJobReturn = temp;}if (saturnJob != null && saturnJob.getConfigService().showNormalLog()) {String jobLog = SaturnSystemOutputStream.clearAndGetLog();if (jobLog != null && jobLog.length() > SaturnConstant.MAX_JOB_LOG_DATA_LENGTH) {LogUtils.info(log, jobName,"As the job log exceed max length, only the previous {} characters will be reported",SaturnConstant.MAX_JOB_LOG_DATA_LENGTH);jobLog = jobLog.substring(0, SaturnConstant.MAX_JOB_LOG_DATA_LENGTH);}this.shardingContext.putJobLog(this.item, jobLog);}}return saturnJobReturn;}
细说状态
上文已经完整的走了一遍job
的运行,接下来就来详细看下executor
和job
的状态,通过源码结合saturn
的wiki
。架构图:
saturn
是重度依赖zk
的,无论是executor
的管理,job
的配置下发以及结果都是通过zk
来做的。
executor
的状态:
上线:
上文就已经提到,当上线的时候会在对应的/${namespace}/$SaturnExecutors/executors/${executorName}
。
而在console
中,每一个namespace
都会有对应的NamespaceShardingManager
,里面会针对/${namespace}/$SaturnExecutors/executors/
路径有监听器ExecuteOnlineShardingTask
。当触发事件为TreeCacheEvent.Type.NODE_ADDED
的时候,就会执行上线的逻辑,该逻辑会走到上文提到的AbstractAsyncShardingTask.run
方法,会进行对应的分片算法(实现了AbstractAsyncShardingTask.pick
)并且充分片后,将分片后的数据写入/${namespace}/$Jobs/${jonName}/leader/sharding/necessary
,结合前文,每次job
执行前回去判断是否需要分片,依据就是这个necessary
是否为0,因为每次分片完成后,executor
的leader
会将这个值写为0.
下线:和上线差不多,只是监听的事件为TreeCacheEvent.Type.NODE_REMOVED
。
job
的状态:
job
的状态分为两种,一种是认为操作,也就是从console
上点击作业禁用,新增,启用,删除等等。这些操作其实都是从console
的角度做的,也就是上面的NamespaceShardingManager
,他在pick
的时候,会将当前的作业从调度列表删除,然后executor
在进行数据分片的时候就会针对当前的job
进行调度或者不调度。
比如:
job
的启用和禁用,console
通过监听/job/config
下的enable
配置文件,最后通过增加或者移除当前executor
上的job
来实现。
还有一种就是job
在运行过程中出现问题。需要转移分片等等,这些状态的变化是在executor
上进行转换的,也就是JobScheduler.init()
里面有个startAll
会将所有的路径监听器启动:
@Overridepublic void start() {electionListenerManager = new ElectionListenerManager(jobScheduler);failoverListenerManager = new FailoverListenerManager(jobScheduler);jobOperationListenerManager = new JobOperationListenerManager(jobScheduler);configurationListenerManager = new ConfigurationListenerManager(jobScheduler);shardingListenerManager = new ShardingListenerManager(jobScheduler);analyseResetListenerManager = new AnalyseResetListenerManager(jobScheduler);controlListenerManager = new ControlListenerManager(jobScheduler);electionListenerManager.start();failoverListenerManager.start();jobOperationListenerManager.start();configurationListenerManager.start();shardingListenerManager.start();analyseResetListenerManager.start();controlListenerManager.start();}
这里面部分监听器的实现,主要是为了让客户端感知到当前job
的状态,如点击立即执行的时候,会被JobOperationListenerManager
监听到,从而进行立即调度。当修改了状态如enable
修改为false
也就是禁用job
的时候,ConfigurationListenerManager
会将当前的config
更新,然后在运行过程中AbstractElasticJob.execute
会判断当前的job
是否为enable
状态。而且会调用job
的onEnabled
方法。
job
运行后的failover
:
上文提到,当作业开始运行的时候,会在对应的execution/${item}/
上注册running
的临时节点。当作业完成后,也会在注册completed
信息。当作业开始运行的时候,FailoverListenerManager
会监听execution
路径,分为两种:
class RunningPathListener implements NodeCacheListener {private int item;public RunningPathListener(int item) {this.item = item;}@Overridepublic void nodeChanged() throws Exception {// 由于当前的监听为execution/${item}/ 该节点下在运行的时候会将executor的节点写入,如果发生节点变化,则说明两种情况:当前的分片依据完成,当前的分片异常关闭需要failover。//getJobNodeStorage().fillEphemeralJobNode(ExecutionNode.getRunningNode(item), executorName);zkCacheManager.getExecutorService().execute(new Runnable() {@Overridepublic void run() {try {if (isShutdown) {return;}if (!executionService.isRunning(item)) {failover(item);}} catch (Throwable t) {LogUtils.error(log, jobName, t.getMessage(), t);}}});}}
当running
节点数据发生变化分为三种情况,第一个是新增了running
节点,说明分片正在运行,或者删除了running
节点,那么说明两种情况,第一就是正常运行结束,还有一种就是异常失败:
private synchronized void failover(final Integer item) {if (jobScheduler == null || jobScheduler.getJob() == null) {return;}if (!jobScheduler.getJob().isFailoverSupported() || !configService.isFailover() || executionService.isCompleted(item)) {return;}failoverService.createCrashedFailoverFlag(item);if (!executionService.hasRunningItems(jobScheduler.getShardingService().getLocalHostShardingItems())) {failoverService.failoverIfNecessary();}
}
所以failover
会判断当前是否运行异常转移和是否正常借宿,如果都不是,则创建failover
,然后执行failover
的逻辑:
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {@Overridepublic void execute() {if (!needFailover()) {return;}if (jobScheduler == null) {return;}if (coordinatorRegistryCenter.isExisted(SaturnExecutorsNode.getExecutorNoTrafficNodePath(executorName))) {return;}if (!jobScheduler.getConfigService().getPreferList().contains(executorName) && !jobScheduler.getConfigService().isUseDispreferList()) {return;}List<String> items = getJobNodeStorage().getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT);if (items != null && !items.isEmpty()) {int crashedItem = Integer.parseInt(getJobNodeStorage().getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));LogUtils.debug(log, jobName, "Elastic job: failover job begin, crashed item:{}.", crashedItem);getJobNodeStorage().fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), executorName);getJobNodeStorage().removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));jobScheduler.triggerJob(null);}}
}
首先是在选主人。然后在FailoverLeaderExecutionCallback
判断当前的节点是否被摘除流量,是否存在优先队列,是否有配置只使用优先地队列,如果说当前的配置了优先队列,但是当前的executor
没有在此节点上,但是没有配置只使用优先队列,则可以进行失败转移,如果既设置了优先队列,又配置了只使用优先队列,而且当前的executor
不属于优先队列,则不会进行失败转移。然后会先获取failover
的列表,执行对应的分片,从而完成对应的failover
saturn 源码解析相关推荐
- 谷歌BERT预训练源码解析(二):模型构建
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/weixin_39470744/arti ...
- 谷歌BERT预训练源码解析(三):训练过程
目录 前言 源码解析 主函数 自定义模型 遮蔽词预测 下一句预测 规范化数据集 前言 本部分介绍BERT训练过程,BERT模型训练过程是在自己的TPU上进行的,这部分我没做过研究所以不做深入探讨.BE ...
- 谷歌BERT预训练源码解析(一):训练数据生成
目录 预训练源码结构简介 输入输出 源码解析 参数 主函数 创建训练实例 下一句预测&实例生成 随机遮蔽 输出 结果一览 预训练源码结构简介 关于BERT,简单来说,它是一个基于Transfo ...
- Gin源码解析和例子——中间件(middleware)
在<Gin源码解析和例子--路由>一文中,我们已经初识中间件.本文将继续探讨这个技术.(转载请指明出于breaksoftware的csdn博客) Gin的中间件,本质是一个匿名回调函数.这 ...
- Colly源码解析——结合例子分析底层实现
通过<Colly源码解析--框架>分析,我们可以知道Colly执行的主要流程.本文将结合http://go-colly.org上的例子分析一些高级设置的底层实现.(转载请指明出于break ...
- libev源码解析——定时器监视器和组织形式
我们先看下定时器监视器的数据结构.(转载请指明出于breaksoftware的csdn博客) /* invoked after a specific time, repeatable (based o ...
- libev源码解析——定时器原理
本文将回答<libev源码解析--I/O模型>中抛出的两个问题.(转载请指明出于breaksoftware的csdn博客) 对于问题1:为什么backend_poll函数需要指定超时?我们 ...
- libev源码解析——I/O模型
在<libev源码解析--总览>一文中,我们介绍过,libev是一个基于事件的循环库.本文将介绍其和事件及循环之间的关系.(转载请指明出于breaksoftware的csdn博客) 目前i ...
- libev源码解析——调度策略
在<libev源码解析--监视器(watcher)结构和组织形式>中介绍过,监视器分为[2,-2]区间5个等级的优先级.等级为2的监视器最高优,然后依次递减.不区分监视器类型和关联的文件描 ...
最新文章
- Servlet中的配置 web.xml
- 爬虫必备工具,掌握它就解决了一半的问题
- java静态链表_用Java实现一个静态链表
- SuperMap.IS.AjaxScript缓冲区分析及专题图制作
- Stream流中的常用方法_limit
- java hibernate 分页查询_4 Hibernate HQL查询,分页查询
- KLT(Kanade-Lucas-Tomasi )
- SQLi LABS Less-6 报错注入+布尔盲注
- LayuiAdmin 滚动条设置问题解决
- 用HTML简单制作一个网页
- 14.拉格朗日插值公式
- pyodbc 测试连接 SQL Server 数据库
- 下载新版火狐后无法同步书签_firefox无法同步书签,恢复备份文件失败的解决办法...
- 操作员或系统管理员_什么是系统管理员?
- python写诗代码_牛逼了,用Python写个会做诗的机器人
- 使用Tiled编辑铁锈战争自定义地图
- php unlink没有权限,php中删除文件用unlink函数权限判断_PHP教程
- 显卡天梯图2020年10月完整版
- FileZilla 下载
- springboot+vue基本微信小程序的疫情防控平台系统 计算机毕业设计
热门文章
- 2022年秋季期寒假总结
- ptapython答案第四章_ptapython答案
- DSPE-PEG-VAP/DCDX/LyP-1/M2pep/GLP-1/HP2/FNB/CPPs/CGKRKb 磷脂-聚乙二醇-多肽
- 247、(案例)javaScript对象--注册表单升级,添加表单校验(利用正则表达式)
- 微信7.0.16 for iOS测试版新增隐藏会话功能
- 李雅普诺夫稳定性判别方法
- Java对中文进行排序
- Vue - 实现微信扫码登录功能(项目植入微信扫码登录功能)超详细完整流程详解及详细代码及注释,附带完整功能源码、常见问题解决方案
- sh脚本中一些命令使用总结及sed命令
- MIKE 21 教程 1.2 网格搭建界面介绍之点线面要素的高阶处理 (Mesh Generator 工具)