文章目录

  • @Scheduled源码解析
    • 解析部分
    • 执行部分
  • 小结

@Scheduled源码解析

解析部分

定时任务调度的基础是ScheduledAnnotationBeanPostProcessor类,这是一个实现了BeanPostProcessor接口的后置处理器。

关于BeanPostProcessor,最主要就是看postProcessBeforeInitialization方法和postProcessAfterInitialization方法做了什么逻辑。
postProcessBeforeInitialization方法没有实现逻辑,所以看postProcessAfterInitialization方法的逻辑。

@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) {/*** 上面省略部分代码,看下面的关键代码*/if (!this.nonAnnotatedClasses.contains(targetClass) &&AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {/*** 这里一长串代码是为了获取被@Scheduled和@Schedules注解的方法*/Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);return (!scheduledMethods.isEmpty() ? scheduledMethods : null);});//如果没有被@Scheduled和@Schedules注解的方法,当前bean加入到nonAnnotatedClasses集合中,不进行处理if (annotatedMethods.isEmpty()) {this.nonAnnotatedClasses.add(targetClass);if (logger.isTraceEnabled()) {logger.trace("No @Scheduled annotations found on bean class: " + targetClass);}}else {//如果存在被@Scheduled和@Schedules注解的方法,针对每个方法调用processScheduled方法annotatedMethods.forEach((method, scheduledMethods) ->scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));if (logger.isTraceEnabled()) {logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +"': " + annotatedMethods);}}}return bean;}

根据以上代码,总结出ScheduledAnnotationBeanPostProcessor 类做的事情:
(1)获取被@Scheduled和@Schedules注解标记的方法,若没有,将此Bean加入到nonAnnotatedClasses集合中。
(2)存在被@Scheduled和@Schedules注解的方法,针对每个方法调用processScheduled方法

所以,接下来就是分析关键在于processScheduled方法做的逻辑

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {try {//将被注解的方法封装为ScheduledMethodRunnableRunnable runnable = createRunnable(bean, method);boolean processedSchedule = false;String errorMessage ="Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";Set<ScheduledTask> tasks = new LinkedHashSet<>(4);// 解析initialDelay 的值,字符串和整型值不能同时配置long initialDelay = scheduled.initialDelay();String initialDelayString = scheduled.initialDelayString();if (StringUtils.hasText(initialDelayString)) {Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");if (this.embeddedValueResolver != null) {initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);}if (StringUtils.hasLength(initialDelayString)) {try {initialDelay = parseDelayAsLong(initialDelayString);}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");}}}// Check cron expressionString cron = scheduled.cron();if (StringUtils.hasText(cron)) {String zone = scheduled.zone();if (this.embeddedValueResolver != null) {cron = this.embeddedValueResolver.resolveStringValue(cron);zone = this.embeddedValueResolver.resolveStringValue(zone);}if (StringUtils.hasLength(cron)) {Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");processedSchedule = true;if (!Scheduled.CRON_DISABLED.equals(cron)) {TimeZone timeZone;if (StringUtils.hasText(zone)) {timeZone = StringUtils.parseTimeZoneString(zone);}else {timeZone = TimeZone.getDefault();}tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));}}}// At this point we don't need to differentiate between initial delay set or not anymoreif (initialDelay < 0) {initialDelay = 0;}// Check fixed delaylong fixedDelay = scheduled.fixedDelay();if (fixedDelay >= 0) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));}String fixedDelayString = scheduled.fixedDelayString();if (StringUtils.hasText(fixedDelayString)) {if (this.embeddedValueResolver != null) {fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);}if (StringUtils.hasLength(fixedDelayString)) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;try {fixedDelay = parseDelayAsLong(fixedDelayString);}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");}tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));}}// Check fixed ratelong fixedRate = scheduled.fixedRate();if (fixedRate >= 0) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));}String fixedRateString = scheduled.fixedRateString();if (StringUtils.hasText(fixedRateString)) {if (this.embeddedValueResolver != null) {fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);}if (StringUtils.hasLength(fixedRateString)) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;try {fixedRate = parseDelayAsLong(fixedRateString);}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");}tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));}}// Check whether we had any attribute setAssert.isTrue(processedSchedule, errorMessage);// Finally register the scheduled taskssynchronized (this.scheduledTasks) {Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));regTasks.addAll(tasks);}}catch (IllegalArgumentException ex) {throw new IllegalStateException("Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());}}

根据以上代码,大致逻辑如下:
(1)解析initialDelay的值
(2)根据@Scheduled注解的属性配置,分别将此bean的被注解//方法封装为CronTask,FixedDelayTask,FixedRateTask
(3)this.registrar 根据封装的任务类型使用对应的调度方法scheduleXXX
若此时taskScheduler局部变量还没有初始化完成,那么将会加入到一个临时的集合存起来,不进行调度,这个taskScheduler可以看做是一个调度任务专用的线程池
(4)调度方法返回的结果加入到tasks集合中
(5)然后按照bean分类,放入scheduledTasks集合(以bean为key的Map集合)

其中@Scheduled注解 的限制如下:
(1)cron表达式不能与initialDelay,fixedDelay,fixedRate一起配置
(2)fixedDelay不能与cron同时设置
(3)fixedRate不能与cron 同时配置
(4)fixedDelay 和fixedRate不能同时配置

至此postProcessAfterInitialization方法执行完成。
粗略总结一下,这个方法就是把@Scheduled注解的方法解析出来,然后转化为ScheduledTask,这大概是代表了一个定时任务的对象,然后再按bean分组存放到一个Map集合中。

执行部分

经过验证,其实上面在执行postProcessAfterInitialization方法,taskScheduler还是为null的,也就是说,各个定时任务实际上还是没办法开始调度执行。
举个例子:

@Nullablepublic ScheduledTask scheduleFixedRateTask(FixedRateTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}if (this.taskScheduler != null) {if (task.getInitialDelay() > 0) {Date startTime = new Date(System.currentTimeMillis() + task.getInitialDelay());scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());}else {scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());}}else {addFixedRateTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);}

此时由于taskScheduler 为null,因此没有执行this.taskScheduler.scheduleAtFixedRate方法,而是调用了addFixedRateTask(task)。(经过测试,就算自定义了taskScheduler,也不会在这时候赋值的)
那上面的this.taskScheduler.scheduleAtFixedRate方法 在什么执行?带着这个疑问,调试打点,最终发现在onApplicationEvent方法中它才会执行调度,此时taskScheduler不为空。

ScheduledAnnotationBeanPostProcessor 类实现了ApplicationListener接口,监听ContextRefreshedEvent 事件。根据以前学习的Spirng加载流程,ContextRefreshedEvent 事件是Spring容器加载完成之后,执行finishRefesh方法时发布的。
在监听方法里面主要执行了finishRegistration()方法

private void finishRegistration() {//片段1if (this.beanFactory instanceof ListableBeanFactory) {Map<String, SchedulingConfigurer> beans =((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());AnnotationAwareOrderComparator.sort(configurers);for (SchedulingConfigurer configurer : configurers) {configurer.configureTasks(this.registrar);}}//省略若干代码....this.registrar.afterPropertiesSet();}

有一个地方,个人觉得值得了解的:
片段1:自定义调度线程池时实现了SchedulingConfigurer接口 的configureTasks方法,这个方法就是在片段1执行的。

然后之后比较重要的。主要看this.registrar.afterPropertiesSet方法
this.registrar.afterPropertiesSet方法里面调用了scheduleTasks()方法

protected void scheduleTasks() {//片段1if (this.taskScheduler == null) {this.localExecutor = Executors.newSingleThreadScheduledExecutor();this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);}//片段2if (this.triggerTasks != null) {for (TriggerTask task : this.triggerTasks) {addScheduledTask(scheduleTriggerTask(task));}}if (this.cronTasks != null) {for (CronTask task : this.cronTasks) {addScheduledTask(scheduleCronTask(task));}}if (this.fixedRateTasks != null) {for (IntervalTask task : this.fixedRateTasks) {addScheduledTask(scheduleFixedRateTask(task));}}if (this.fixedDelayTasks != null) {for (IntervalTask task : this.fixedDelayTasks) {addScheduledTask(scheduleFixedDelayTask(task));}}}

片段1:this.taskScheduler 如果为null,则使用Executors.newSingleThreadScheduledExecutor()。如果是自定义线程池,则不会执行,因为此时已经赋值了。
片段2:根据不同的定时任务类型,分别调用不同的调度API

这里的this.cronTasks,this.fixedRateTasks,this.fixedDelayTasks 就是上面执行processScheduled方法时,因为this.taskScheduler 为null而把定时任务临时存放的地方。
因为现在已经有this.taskScheduler ,因此正式将它们加入调度,并放入scheduledTasks 集合中(已经参与调度的不会重复加入)。

小结

(1)从这个源码分析,可以知道通过实现SchedulingConfigurer接口自定义调度线程池的配置
(2)@Scheduled注解 的限制,不能同时配置多种任务类型

@Scheduled源码解析相关推荐

  1. dubbo源码解析(十)远程通信——Exchange层

    远程通讯--Exchange层 目标:介绍Exchange层的相关设计和逻辑.介绍dubbo-remoting-api中的exchange包内的源码解析. 前言 上一篇文章我讲的是dubbo框架设计中 ...

  2. dubbo(5) Dubbo源码解析之服务调用过程

    来源:https://juejin.im/post/5ca4a1286fb9a05e731fc042 Dubbo源码解析之服务调用过程 简介 在前面的文章中,我们分析了 Dubbo SPI.服务导出与 ...

  3. React深入学习与源码解析笔记

    ***当前阶段的笔记 *** 「面向实习生阶段」https://www.aliyundrive.com/s/VTME123M4T9 提取码: 8s6v 点击链接保存,或者复制本段内容,打开「阿里云盘」 ...

  4. [源码解析] PyTorch 流水线并行实现 (6)--并行计算

    [源码解析] PyTorch 流水线并行实现 (6)–并行计算 文章目录 [源码解析] PyTorch 流水线并行实现 (6)--并行计算 0x00 摘要 0x01 总体架构 1.1 使用 1.2 前 ...

  5. 基于8.0源码解析:startService 启动过程

    基于8.0源码解析:startService 启动过程 首先看一张startService的图,心里有个大概的预估,跟Activity启动流程比,Service的启动稍微简单点,并且我把Service ...

  6. [源码解析] 从TimeoutException看Flink的心跳机制

    [源码解析] 从TimeoutException看Flink的心跳机制 文章目录 [源码解析] 从TimeoutException看Flink的心跳机制 0x00 摘要 0x01 缘由 0x02 背景 ...

  7. 4、Eureka 源码解析 之 Eureka Client 启动原理分析

    在前面的一篇文章 3.Eureka 源码解析 之 Eureka Server 启动原理分析当中我们分析了一下 Eureka Server 的启动.在集群环境下 Eureka Server 相互之前需要 ...

  8. Eureka 源码解析 —— EndPoint 与 解析器

    1. 概述 本文主要分享 EndPoint 与 解析器. EndPoint ,服务端点.例如,Eureka-Server 的访问地址. EndPoint 解析器,将配置的 Eureka-Server ...

  9. openxr runtime Monado 源码解析 源码分析:CreateSwapchain 画布 HardwareBuffer共享纹理 渲染线程 xrEndeFrame comp_renderer

    monado系列文章索引汇总: openxr runtime Monado 源码解析 源码分析:源码编译 准备工作说明 hello_xr解读 openxr runtime Monado 源码解析 源码 ...

最新文章

  1. springboot 日志 log4j
  2. html中如何修改href,在html中设置href链接中的变量
  3. openssl 模块 安装 centso Ubuntu
  4. loadrunner脚本练习
  5. Laravel 用户认证与登陆
  6. js中select下拉框重置_Web测试中需要注意的16个小细节
  7. java long类型转string_JavaSE的学习——数据类型
  8. 《我的第一本算法书》读书笔记
  9. onload 事件、DOMContentLoaded事件、DOM加载顺序
  10. 前端操作复制粘贴板(clicpboardData )
  11. linux 向日葵教程,远程控制工具——Centos7上向日葵安装使用(转)
  12. 神经网络学习笔记(1)——神经元与激活函数简介
  13. 对三极管特性曲线的理解
  14. Voldemort的RoutedStore
  15. 电脑计算机找不到指定的程序,光驱双击显示找不到应用程序。怎么办?
  16. 分立式数码管循环显示数字0到9.
  17. 数学建模-层次分析法
  18. 简化电脑操作,不让多余操作浪费你的生命
  19. Qt windows端的蓝牙串口服务
  20. nrf52 iic使用

热门文章

  1. struts2+ibatis+spring框架整合(二)
  2. Linux云服务器下配置Scrapy并抓取数据
  3. 有一种高级的情商,叫“不抬杠”
  4. 2017-2018-2 20155228 《网络对抗技术》 实验二:后门原理与实践
  5. CRM——企业内外部管理的重要手段
  6. java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializ
  7. 人大计算机语言学,人大语言学考研经——心之所向,终会抵达
  8. 在命令行输入ipconfig/flushdns,显示不是内部文件
  9. 送人玫瑰 手留余香!(上星期日为张玉朵募捐.义卖)
  10. “被标记为恶意拓展程序并已被系统阻止安装”解决方法