xxl-job源码解读:调度器schedule

本文基于xxl-job的2.3.1版本

基本说明

基本原理概述

调用器主要的用于判断定时任务的执行时间,按时调用触发器(trigger),再由触发器去获取任务信息,执行预先定义好的程序。

调度器的基本原理为启用一个线程,循环去查询所有任务状态,获取一个时间段内,已经到执行时间或者即将执行的任务,通知触发器去执行任务。

任务信息存储

定时任务的信息存储大致可分为两类:

  1. 应用内存:应用启动时将任务信息加载到内存里,调度器通过读取这些信息进行调度,这种功能一般比较单一,且不支持集群(即集群情况下任务会出现每个节点都执行一次的情况),例如spring scheduled-task
  2. 数据库存储:将任务信息存储于数据库或某中间件中,调度器通过锁的方式进行集群调度,能实现的功能更为丰富,xxl-job归属于此类

调度器源码解读

xxl-job调度器代码主要在 com.xxl.job.admin.core.thread.JobScheduleHelper

代码逻辑流程图

调度器执行主流程

调度器线程启动加载完毕之后,在收到调度器终止命令之前(一般为应用关闭),会一直循环执行:

  1. 通过xxl_job_lock表,获取数据库锁,防止集群下调度器任务调度并发冲突

    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
    connAutoCommit = conn.getAutoCommit();
    conn.setAutoCommit(false);preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
    preparedStatement.execute();
    
  2. 读取xxl_job_info 信息,获取未来5秒内执行的任务信息

    // 1、pre read 预读任务执行时间,获取可执行任务集合
    long nowTime = System.currentTimeMillis();
    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
    
        <select id="scheduleJobQuery" parameterType="java.util.HashMap" resultMap="XxlJobInfo">SELECT <include refid="Base_Column_List" />FROM xxl_job_info AS tWHERE t.trigger_status = 1and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}ORDER BY id ASCLIMIT #{pagesize}</select>
    
  3. 根据任务的触发时间与当前时间的对比,判断进入三种处理方式:

    1. 过期处理策略:跳过执行(默认跳过,可配置为执行一次),根据当前时间,重新计算下次触发时间并更新

      // 过期处理策略: 过期超5s, 本次忽略, 当前时间开始计算下次触发时间
      logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = {}", jobInfo.getId());// 1、misfire match
      MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
      if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW 》 trigger
      JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
      logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = {}", jobInfo.getId());
      }// 2、fresh next
      refreshNextValidTime(jobInfo, new Date());
      
    2. 交给触发器立即执行,并更新下次执行时间。如果下次执行时间在5秒以内,计算更新下下次执行时间,并将任务加入预执行集合ringData

      // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
      // 1、trigger
      JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
      logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = {}", jobInfo.getId());// 2、fresh next
      refreshNextValidTime(jobInfo, new Date());// next-trigger-time in 5s, pre-read again
      if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {// 1、make ring second
      int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);// 2、push time ring
      pushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next
      refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
      
    3. 加入预执行集合ringData,并更新下次执行时间

          /*** 预提取执行的任务集合 触发秒->任务ID集合*/private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
      
      // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time// 1、make ring second
      int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);// 2、push time ring
      pushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next
      refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
      
          private void pushTimeRing(int ringSecond, int jobId) {// push async ringList<Integer> ringItemData = ringData.computeIfAbsent(ringSecond, k -> new ArrayList<>());ringItemData.add(jobId);logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : {} = {}" , ringSecond, Collections.singletonList(ringItemData));}
      
  4. 根据当次循环耗时,选择睡眠时长,同时进行秒数对齐,保证下次循环开始时间为整秒(毫秒级)

    while (!scheduleThreadToStop) {// Scan Joblong start = System.currentTimeMillis();// 此处省略流程代码long cost = System.currentTimeMillis() - start;// Wait seconds, align secondif (cost < 1000) {  // scan-overtime, not waittry {// pre-read period: success > scan each second; fail > skip this period;TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);Thread.currentThread().interrupt();}}}}
    

预执行线程

用于处理被调度器抓取,但未到执行时间的任务,循环根据触发的秒数进行判断,对到达执行秒数的任务,调用触发器进行任务执行

        // ring threadringThread = new Thread(() -> {while (!ringThreadToStop) {// align secondtry {TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!ringThreadToStop) {logger.error(e.getMessage(), e);Thread.currentThread().interrupt();}}try {// second dataList<Integer> ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);if (tmpData != null) {ringItemData.addAll(tmpData);}}// ring triggerlogger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData));if (!ringItemData.isEmpty()) {// do triggerfor (int jobId : ringItemData) {// do triggerJobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);}// clearringItemData.clear();}} catch (Exception e) {if (!ringThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");});

xxl-job源码解读:调度器schedule相关推荐

  1. xxl-job源码解读(调度器调度执行器)

    一.执行器与调度器流程图 二.调度器调度执行器时序图 三.执行器接收到调度器的调度信息执行job时序图 四.调度器远程调用执行器部分代码片段 1.配置类实现InitializingBean接口,初始化 ...

  2. MySQL内核源码解读-SQL解析之解析器浅析

    MYSQL服务器接收SQL格式的查询,首先要对sql进行解析,内部将文本格式转换为二进制结构,这个转换就是解析器,解析的目的是为了让优化器更好的处理指令,以便以最优的路径,最少的耗时返回我们想要的结果 ...

  3. 【JVM】Java类加载器设计原理(ClassLoader源码解读/ SPI机制/ 绕开双亲委派/ 常见Java虚拟机)

    目录 1. 什么是类加载器 2. 类加载器加载的过程 3. Class文件读取来源 4. 类加载器的分类 5. 那些操作会初始化类加载器 6. 类加载器的双亲委派机制 6.1 双亲委派机制机制的好处 ...

  4. 【Android】OkHttp源码解读逐字稿(1)-拦截器

    目录 0.前言 1.OkHttp的简单使用 2.浅析开始 拦截器 链式调用流程示意图 第 0 个拦截器 第一个 拦截器 RetryAndFollowUpInterceptor 第二个拦截器 Bridg ...

  5. Zeus源码解读之定时任务执行与手动执行任务的过程分析

    Zeus源码解读之定时任务执行与手动执行任务的过程分析 zeus集群依赖任务执行模式  宙斯中任务出去任务独立调度之外,支持任务直接的复杂依赖调度,如下图一所示: 图1  A为根任务,B,C依赖A任务 ...

  6. golang源码分析-调度概述

    golang源码分析-调度过程概述 本文主要概述一下golang的调度器的大概工作的流程,众所周知golang是基于用户态的协程的调度来完成多任务的执行.在Linux操作系统中,以往的多线程执行都是通 ...

  7. mobx 源码解读(四):讲讲 autorun 和 reaction

    原文地址:mobx autorun 文本是 mobx 源码解读系列 第四篇 本系列文章全部采用 mobx 较新版本:v5.13.0 mobx 源码解读 issue,欢迎讨论 技术前提 在阅读之前,希望 ...

  8. CesiumJS 2022^ 源码解读[7] - 3DTiles 的请求、加载处理流程解析

    3DTiles 与 I3S 是竞争关系,可是比起生态开放性.数据定义的灵活性与易读性来说,3DTiles 比 I3S 好太多了.由于数据生产工具的开发者水平参差不齐,且数据并不存在极致的.万能的优化方 ...

  9. 约2万字-Vue源码解读汇总篇(续更)

    约2万字-Vue源码解读汇总篇(续更) 一.前言 1.系列汇总 未完待续... Vue源码解读:06Vue3探索篇 Vue源码解读:05生命周期篇 Vue源码解读:04模板编译篇 Vue源码解读:03 ...

最新文章

  1. 扩展欧几里得 POJ 1061
  2. 如何在Swift 3中创建调度队列
  3. html5电影在线看,HTML5-电影影评网
  4. 2020-12-14 Matlab 模糊控制 车辆泊车 案例分享
  5. 信息系统项目管理师-第5章:项目范围管理-重点汇总
  6. [二叉树建树] 后序遍历与中序遍历建立二叉树
  7. 有关计算机发展的英语作文,写一篇英语短文,介绍电脑的发展变化,并谈谈你对电脑的看法并翻译...
  8. 微信已停止访问该网页怎么解决
  9. webpack4.x加vue模板文件简单还原vue-cli
  10. mysql-5.7.24 linux下载_Linux下安装mysql-5.7.24
  11. Laravel 项目开发规范
  12. 计算机三角函数习惯原创的音乐,三角函数 UNIT版
  13. paip.cpu占用高解决方案---ThreadMast 跟Process Lasso的使用
  14. 《产品经理面试攻略》PART 6:产品群面题
  15. 卷积码原理及基本概念
  16. windows 批量创建文件夹
  17. 非正常关闭vim编辑器后提示错误的解决方法
  18. 病毒制作实践小记:运行关机、蓝屏炸弹、进程关闭、拓展名病毒
  19. dnf剑魂buff等级上限_DNF:95职业加强前瞻,爆发能力明显提升,瞎子大冰可点TP!...
  20. HDU6438-Buy and Resell(贪心、思维题)

热门文章

  1. leetcode刷题方法
  2. 使用SSM为学校医务室开发一套管理系统
  3. 使用百度文字识别API进行图片中文字的识别
  4. 有便宜又好用的云服务器推荐?
  5. 乒乓球比赛赛程_2020年乒乓球比赛赛事赛程表(优个网独家整理)
  6. 广域网技术——iFIT
  7. 一文详解!你真的了解商业智能BI吗?
  8. 【机器学习】泛化误差上界
  9. 开发手机游戏的步骤是怎样的?
  10. window下xmind-pro-8破解版