xxl-job源码解读:调度器schedule
xxl-job源码解读:调度器schedule
本文基于xxl-job的2.3.1版本
基本说明
基本原理概述
调用器主要的用于判断定时任务的执行时间,按时调用触发器(trigger),再由触发器去获取任务信息,执行预先定义好的程序。
调度器的基本原理为启用一个线程,循环去查询所有任务状态,获取一个时间段内,已经到执行时间或者即将执行的任务,通知触发器去执行任务。
任务信息存储
定时任务的信息存储大致可分为两类:
- 应用内存:应用启动时将任务信息加载到内存里,调度器通过读取这些信息进行调度,这种功能一般比较单一,且不支持集群(即集群情况下任务会出现每个节点都执行一次的情况),例如spring scheduled-task
- 数据库存储:将任务信息存储于数据库或某中间件中,调度器通过锁的方式进行集群调度,能实现的功能更为丰富,xxl-job归属于此类
调度器源码解读
xxl-job调度器代码主要在 com.xxl.job.admin.core.thread.JobScheduleHelper
中
代码逻辑流程图
调度器执行主流程
调度器线程启动加载完毕之后,在收到调度器终止命令之前(一般为应用关闭),会一直循环执行:
通过
xxl_job_lock
表,获取数据库锁,防止集群下调度器任务调度并发冲突conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false);preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update"); preparedStatement.execute();
读取
xxl_job_info
信息,获取未来5秒内执行的任务信息// 1、pre read 预读任务执行时间,获取可执行任务集合 long nowTime = System.currentTimeMillis(); List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
<select id="scheduleJobQuery" parameterType="java.util.HashMap" resultMap="XxlJobInfo">SELECT <include refid="Base_Column_List" />FROM xxl_job_info AS tWHERE t.trigger_status = 1and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}ORDER BY id ASCLIMIT #{pagesize}</select>
根据任务的触发时间与当前时间的对比,判断进入三种处理方式:
过期处理策略:跳过执行(默认跳过,可配置为执行一次),根据当前时间,重新计算下次触发时间并更新
// 过期处理策略: 过期超5s, 本次忽略, 当前时间开始计算下次触发时间 logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = {}", jobInfo.getId());// 1、misfire match MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING); if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW 》 trigger JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = {}", jobInfo.getId()); }// 2、fresh next refreshNextValidTime(jobInfo, new Date());
交给触发器立即执行,并更新下次执行时间。如果下次执行时间在5秒以内,计算更新下下次执行时间,并将任务加入预执行集合
ringData
中// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time // 1、trigger JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = {}", jobInfo.getId());// 2、fresh next refreshNextValidTime(jobInfo, new Date());// next-trigger-time in 5s, pre-read again if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {// 1、make ring second int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);// 2、push time ring pushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
加入预执行集合
ringData
,并更新下次执行时间/*** 预提取执行的任务集合 触发秒->任务ID集合*/private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time// 1、make ring second int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);// 2、push time ring pushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
private void pushTimeRing(int ringSecond, int jobId) {// push async ringList<Integer> ringItemData = ringData.computeIfAbsent(ringSecond, k -> new ArrayList<>());ringItemData.add(jobId);logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : {} = {}" , ringSecond, Collections.singletonList(ringItemData));}
根据当次循环耗时,选择睡眠时长,同时进行秒数对齐,保证下次循环开始时间为整秒(毫秒级)
while (!scheduleThreadToStop) {// Scan Joblong start = System.currentTimeMillis();// 此处省略流程代码long cost = System.currentTimeMillis() - start;// Wait seconds, align secondif (cost < 1000) { // scan-overtime, not waittry {// pre-read period: success > scan each second; fail > skip this period;TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);Thread.currentThread().interrupt();}}}}
预执行线程
用于处理被调度器抓取,但未到执行时间的任务,循环根据触发的秒数进行判断,对到达执行秒数的任务,调用触发器进行任务执行
// ring threadringThread = new Thread(() -> {while (!ringThreadToStop) {// align secondtry {TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!ringThreadToStop) {logger.error(e.getMessage(), e);Thread.currentThread().interrupt();}}try {// second dataList<Integer> ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);if (tmpData != null) {ringItemData.addAll(tmpData);}}// ring triggerlogger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData));if (!ringItemData.isEmpty()) {// do triggerfor (int jobId : ringItemData) {// do triggerJobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);}// clearringItemData.clear();}} catch (Exception e) {if (!ringThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");});
xxl-job源码解读:调度器schedule相关推荐
- xxl-job源码解读(调度器调度执行器)
一.执行器与调度器流程图 二.调度器调度执行器时序图 三.执行器接收到调度器的调度信息执行job时序图 四.调度器远程调用执行器部分代码片段 1.配置类实现InitializingBean接口,初始化 ...
- MySQL内核源码解读-SQL解析之解析器浅析
MYSQL服务器接收SQL格式的查询,首先要对sql进行解析,内部将文本格式转换为二进制结构,这个转换就是解析器,解析的目的是为了让优化器更好的处理指令,以便以最优的路径,最少的耗时返回我们想要的结果 ...
- 【JVM】Java类加载器设计原理(ClassLoader源码解读/ SPI机制/ 绕开双亲委派/ 常见Java虚拟机)
目录 1. 什么是类加载器 2. 类加载器加载的过程 3. Class文件读取来源 4. 类加载器的分类 5. 那些操作会初始化类加载器 6. 类加载器的双亲委派机制 6.1 双亲委派机制机制的好处 ...
- 【Android】OkHttp源码解读逐字稿(1)-拦截器
目录 0.前言 1.OkHttp的简单使用 2.浅析开始 拦截器 链式调用流程示意图 第 0 个拦截器 第一个 拦截器 RetryAndFollowUpInterceptor 第二个拦截器 Bridg ...
- Zeus源码解读之定时任务执行与手动执行任务的过程分析
Zeus源码解读之定时任务执行与手动执行任务的过程分析 zeus集群依赖任务执行模式 宙斯中任务出去任务独立调度之外,支持任务直接的复杂依赖调度,如下图一所示: 图1 A为根任务,B,C依赖A任务 ...
- golang源码分析-调度概述
golang源码分析-调度过程概述 本文主要概述一下golang的调度器的大概工作的流程,众所周知golang是基于用户态的协程的调度来完成多任务的执行.在Linux操作系统中,以往的多线程执行都是通 ...
- mobx 源码解读(四):讲讲 autorun 和 reaction
原文地址:mobx autorun 文本是 mobx 源码解读系列 第四篇 本系列文章全部采用 mobx 较新版本:v5.13.0 mobx 源码解读 issue,欢迎讨论 技术前提 在阅读之前,希望 ...
- CesiumJS 2022^ 源码解读[7] - 3DTiles 的请求、加载处理流程解析
3DTiles 与 I3S 是竞争关系,可是比起生态开放性.数据定义的灵活性与易读性来说,3DTiles 比 I3S 好太多了.由于数据生产工具的开发者水平参差不齐,且数据并不存在极致的.万能的优化方 ...
- 约2万字-Vue源码解读汇总篇(续更)
约2万字-Vue源码解读汇总篇(续更) 一.前言 1.系列汇总 未完待续... Vue源码解读:06Vue3探索篇 Vue源码解读:05生命周期篇 Vue源码解读:04模板编译篇 Vue源码解读:03 ...
最新文章
- 扩展欧几里得 POJ 1061
- 如何在Swift 3中创建调度队列
- html5电影在线看,HTML5-电影影评网
- 2020-12-14 Matlab 模糊控制 车辆泊车 案例分享
- 信息系统项目管理师-第5章:项目范围管理-重点汇总
- [二叉树建树] 后序遍历与中序遍历建立二叉树
- 有关计算机发展的英语作文,写一篇英语短文,介绍电脑的发展变化,并谈谈你对电脑的看法并翻译...
- 微信已停止访问该网页怎么解决
- webpack4.x加vue模板文件简单还原vue-cli
- mysql-5.7.24 linux下载_Linux下安装mysql-5.7.24
- Laravel 项目开发规范
- 计算机三角函数习惯原创的音乐,三角函数 UNIT版
- paip.cpu占用高解决方案---ThreadMast 跟Process Lasso的使用
- 《产品经理面试攻略》PART 6:产品群面题
- 卷积码原理及基本概念
- windows 批量创建文件夹
- 非正常关闭vim编辑器后提示错误的解决方法
- 病毒制作实践小记:运行关机、蓝屏炸弹、进程关闭、拓展名病毒
- dnf剑魂buff等级上限_DNF:95职业加强前瞻,爆发能力明显提升,瞎子大冰可点TP!...
- HDU6438-Buy and Resell(贪心、思维题)