Github地址:https://github.com/lhj502819/netty/tree/v502819-main,示例代码在example模块中

系列文章

  • 你知道都有哪些I/O模型吗?
  • Java NIO三大角色Channel、Buffer、Selector
  • Doug lea《Scalable IO in Java》翻译
  • Reactor模型你知道都有哪些吗?
  • Netty服务端创建源码流程解析
  • EventLoopGroup到底是个啥?
  • 深入剖析Netty之EventLoop刨根问底
  • 深入剖析Netty之NioEventLoop寻根究底
  • 未完待续…

在上篇文章中我们讲解NioEventLoop时,我们提到了Netty中的任务分为普通任务和定时任务,我们可以通过EventLoop创建一个定时任务,使用方法我们就不在这里讲解了,没使用过的小伙伴可以去度娘找一个Demo看一看,今天这篇文章我们主要讲解Netty的定时任务实现机制。

Java中的定时任务

我们知道在JDK1.5之后提供了定时任务的接口抽象ScheduledExecutorService以及实现ScheduledThreadPoolExecutor,Netty并没有直接使用JDK提供的定时任务实现,而是基于ScheduledExecutorService接口进行了自定义实现。首先我们先来看下ScheduledExecutorService都定义了哪些基础方法。

/*** 延迟执行,并不会周期性执行* @param callable 延迟执行的任务* @param delay 延迟时间* @param unit 延迟时间单位*/
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);/*** 延迟执行,并不会周期性执行* @param callable 延迟执行的任务* @param delay 延迟时间* @param unit 延迟时间单位*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);/*** 在延迟initialDelay后定时执行任务* @param command 要执行的任务* @param initialDelay 初始化延迟* @param period 两次任务开始执行的延迟* @param unit 时间单位* @return*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
/*** 在延迟initialDelay后定时执行任务* @param command 要执行的任务* @param initialDelay 初始化延迟* @param delay 下一次任务开始距前一次任务结束的时间* @param unit 时间单位* @return*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

我们就不对ScheduledThreadPoolExecutor进行讲解了,不能跑题,还是来讲解Netty是如何做的。

核心类

  • ScheduledFutureTask:实现定时任务的核心类
  • AbstractScheduledEventExecutor:集成ScheduledFutureTaskScheduledExecutorService进行实现,进行行为控制

Netty实现定时任务的核心类就是这两个,我们先来分析下ScheduledFutureTask

ScheduledFutureTask

ScheduledFutureTask的主要职责就是判断是否应该要执行定时任务以及执行定时任务。

构造方法

/*** @param executor 执行任务的Executpr* @param runnable 任务* @param nanoTime 任务的执行时间*/
ScheduledFutureTask(AbstractScheduledEventExecutor executor,Runnable runnable, long nanoTime) {super(executor, runnable);deadlineNanos = nanoTime;//默认两个任务之间没有延迟periodNanos = 0;
}/*** @param executor 执行任务的Executor* @param runnable 任务* @param nanoTime 任务的执行时间* @param period 两个任务之间的执行延迟*/
ScheduledFutureTask(AbstractScheduledEventExecutor executor,Runnable runnable, long nanoTime, long period) {super(executor, runnable);deadlineNanos = nanoTime;periodNanos = validatePeriod(period);
}

成员变量

/*** 定时任务的初始化时间*/
private static final long START_TIME = System.nanoTime();/*** 获得当前距初始化时间的间隔*/
static long nanoTime() {return System.nanoTime() - START_TIME;
}/*** 获得任务的执行时间,相对START_TIME的*/
static long deadlineNanos(long delay) {long deadlineNanos = nanoTime() + delay;// Guard against overflowreturn deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}static long initialNanoTime() {return START_TIME;
}// set once when added to priority queue
/*** 任务ID*/
private long id;/*** 任务的执行时间*/
private long deadlineNanos;
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
/*** 两个任务之间的执行延迟*/
private final long periodNanos;

主要方法

取消任务

public boolean cancel(boolean mayInterruptIfRunning) {boolean canceled = super.cancel(mayInterruptIfRunning);if (canceled) {scheduledExecutor().removeScheduled(this);}return canceled;
}

具体的细节这里就不展开讲解了,主要逻辑就是进行事件通知并将任务从任务队列中移除。

核心逻辑

核心逻辑在#run方法中,主要逻辑就是判断自身是否达到了执行的时间,并执行,需具体代码如下:

public void run() {assert executor().inEventLoop();try {if (delayNanos() > 0L) {// 大于零表示没过期// Not yet expired, need to add or remove from queueif (isCancelled()) {//如果已经取消,则从定时任务队列中移除当前任务scheduledExecutor().scheduledTaskQueue().removeTyped(this);} else {//则将当前任务添加到定时任务队列中//后续会有EventLoop轮询队列中的定时任务是否该执行,我们在前边的文章中讲过scheduledExecutor().scheduleFromEventLoop(this);}return;}任务过期////如果执行两个任务的执行延迟为0if (periodNanos == 0) {if (setUncancellableInternal()) {V result = runTask();setSuccessInternal(result);}} else {//两个任务的执行延迟大于0 ,判断任务是否已经取消// check if is done as it may was cancelledif (!isCancelled()) {//如果任务没有被取消,则执行任务runTask();if (!executor().isShutdown()) {if (periodNanos > 0) {//重新设置下一次的执行时间,任务开始执行时间 + 两个任务的延迟时间deadlineNanos += periodNanos;} else {//如果两次执行间隔小于0,负负得正获得新的执行时间//设置执行时间为当前时间deadlineNanos = nanoTime() - periodNanos;}if (!isCancelled()) {//如果没有被取消,将当前任务添加到定时任务队列中scheduledExecutor().scheduledTaskQueue().add(this);}}}}} catch (Throwable cause) {setFailureInternal(cause);}}

AbstractScheduledEventExecutor

该类的主要责任是对定时任务的执行进行行为封装,比如定时任务的定义,定时任务的调度,定时任务队列的存储。大致都比较简单,接下来我们对其API进行简单讲解。

成员变量

/*** 定时任务队列*/
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

该变量是一个存储定时任务的队列,通过名称可以看出来是有序的,通过IDEA的快捷键查看该变量引用的位置,可以看到有取消和取两个操作。

既然有取的位置,那肯定得有加的位置,那任务是从哪里添加到队列里的呢?

主要方法

构造定时任务

AbstractScheduledEventExecutor对Java的ScheduledExecutorService进行了实现

/*** 延迟执行* @param command 延迟执行的任务* @param delay 延迟时间* @param unit 延迟时间单位*/@Overridepublic ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {ObjectUtil.checkNotNull(command, "command");ObjectUtil.checkNotNull(unit, "unit");if (delay < 0) {delay = 0;}validateScheduled0(delay, unit);return schedule(new ScheduledFutureTask<Void>(this,command,deadlineNanos(unit.toNanos(delay))));}/*** 延迟执行* @param callable 延迟执行的任务* @param delay 延迟时间* @param unit 延迟时间单位*/@Overridepublic <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {ObjectUtil.checkNotNull(callable, "callable");ObjectUtil.checkNotNull(unit, "unit");if (delay < 0) {delay = 0;}validateScheduled0(delay, unit);return schedule(new ScheduledFutureTask<V>(this, callable, deadlineNanos(unit.toNanos(delay))));}/*** 在延迟initialDelay后定时执行任务* @param command 要执行的任务* @param initialDelay 初始化延迟* @param period 两次任务开始执行的延迟* @param unit 时间单位*/@Overridepublic ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {ObjectUtil.checkNotNull(command, "command");ObjectUtil.checkNotNull(unit, "unit");if (initialDelay < 0) {throw new IllegalArgumentException(String.format("initialDelay: %d (expected: >= 0)", initialDelay));}if (period <= 0) {throw new IllegalArgumentException(String.format("period: %d (expected: > 0)", period));}validateScheduled0(initialDelay, unit);validateScheduled0(period, unit);return schedule(new ScheduledFutureTask<Void>(this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));}/*** 在延迟initialDelay后定时执行任务* @param command 要执行的任务* @param initialDelay 初始化延迟* @param delay 下一次任务开始距前一次任务结束的时间* @param unit 时间单位* @return*/@Overridepublic ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {ObjectUtil.checkNotNull(command, "command");ObjectUtil.checkNotNull(unit, "unit");if (initialDelay < 0) {throw new IllegalArgumentException(String.format("initialDelay: %d (expected: >= 0)", initialDelay));}if (delay <= 0) {throw new IllegalArgumentException(String.format("delay: %d (expected: > 0)", delay));}validateScheduled0(initialDelay, unit);validateScheduled0(delay, unit);return schedule(new ScheduledFutureTask<Void>(this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));}

其实四个方法只是一个门面,对细节进行了封装,最终都是创建了一个ScheduledFutureTask,只是构造的参数不同。

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {if (inEventLoop()) {//如果在EventLoop线程中则直接将任务添加到定时任务队列中scheduleFromEventLoop(task);} else {final long deadlineNanos = task.deadlineNanos();// task will add itself to scheduled task queue when run if not expiredif (beforeScheduledTaskSubmitted(deadlineNanos)) {//添加一个异步任务execute(task);} else {//懒添加一个异步任务lazyExecute(task);// Second hook after scheduling to facilitate race-avoidanceif (afterScheduledTaskSubmitted(deadlineNanos)) {//任务已经提交完成后execute(WAKEUP_TASK);}}}return task;
}

其实核心逻辑就是构造一个定时任务添加到任务队列中等待执行。

总结

今天我们讲解了Netty的定时任务机制,先介绍了Java的定时任务相关内容,后又讲解了Netty的定时任务类ScheduledFutureTask和定时任务执行器类AbstractScheduledEventExecutor,都比较简单,我们就不过多阐述了,滋滋。。

深入剖析Netty之定时任务实现相关推荐

  1. 源码剖析 Netty 服务启动 NIO

    如果这个文章看不懂的话 , 建议反复阅读 Netty 与 Reactor 开篇立意.引用网友好的建议.看源码要针对性的看,最佳实践就是,带着明确的目的去看源码.抓主要问题,放弃小问题.主要的逻辑理解了 ...

  2. 原理剖析-Netty之服务端启动工作原理分析(上)

    一.大致介绍 1.Netty这个词,对于熟悉并发的童鞋一点都不陌生,它是一个异步事件驱动型的网络通信框架: 2.使用Netty不需要我们关注过多NIO的API操作,简简单单的使用即可,非常方便,开发门 ...

  3. Netty HashedWheelTimer 定时任务调用

    2019独角兽企业重金招聘Python工程师标准>>> 一个Hash Wheel Timer是一个环形结构,可以想象成时钟,分为很多格子,一个格子代表一段时间(越短Timer精度越高 ...

  4. 原理剖析-Netty之无锁队列

    一.大致介绍 1.了解过netty原理的童鞋,其实应该知道工作线程组的每个子线程都维护了一个任务队列: 2.细心的童鞋会发现netty的队列是重写了队列的实现方法,覆盖了父类中的LinkedBlock ...

  5. 深入剖析Netty源码设计(一)——深入理解select poll epoll机制

    本文首发于: http://www.6aiq.com/article/1548222475606 前言 打算输出一系列Netty源码分析与实践的文章,也作为后端开发学习过程中的沉淀,此文章为第一篇,从 ...

  6. 二、 剖析Netty的工作机制之Buffer、Channel、Selector分析

    一.缓冲区 1.1 基本介绍 缓冲区(buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松的使用内存块,缓冲区对象内置了一些机制 ...

  7. Dubbo剖析-Netty粘包与半包问题(一)

    一.前言 在客户端与服务端进行通信时候都会约定一个通讯协议,协议一般包含一个header和body,一个header和body组成了一次通讯的内容,一个通讯包.正常情况下客户端通过socket发送一个 ...

  8. Netty消息接收类故障案例分析

    <Netty 进阶之路>.<分布式服务框架原理与实践>作者李林锋深入剖析Netty消息接收类故障案例.李林锋此后还将在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的同 ...

  9. Netty和RPC框架线程模型分析

    <Netty 进阶之路>.<分布式服务框架原理与实践>作者李林锋深入剖析Netty和RPC框架线程模型.李林锋已在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的同学可 ...

最新文章

  1. 从opensuse 12.3 升级到 opensuse13.1体验
  2. 互联网协议 — BGP 边界网关协议 — Route(路由信息)
  3. 2020年快手校招JAVA岗笔试第三题
  4. Clinicast让癌症治疗不那么昂贵
  5. 牛客题霸 NC27 集合的所有子集
  6. linux中,项目生成的文件的权限为-rw-r-----
  7. 漫游Kafka入门篇之简单介绍
  8. ExtJs学习笔记(21)-使用XTemplate结合WCF显示数据
  9. vim 文本一些行注释,替换
  10. 德国可能在年底前决定特斯拉柏林电池工厂将获得多少国家补贴
  11. CentOS虚拟机 Device eth0 does not seem to be present
  12. JavaScript中的事件与异常捕获解析
  13. 嵌入式应用软件开发的步骤流程
  14. FontAwesome动态旋转图标类(fa-spinfa-pulse)
  15. 摄像头驱动CAMERA SENSOR调试流程
  16. 解决笔记本屏幕颜色偏变白问题
  17. 统计一个字符串中单词的个数(C语言)
  18. matlab2016与VS2019混合编程
  19. jQuery+Ajax+全解析
  20. cdn perl_用perl对CDN节点日志进行统计

热门文章

  1. 问卷星刷问卷python_Python+Selenium自动刷问卷星问卷
  2. 最新92kaifa开发的帝国cms7.5美nv主播视频网站源码 自适应手机端
  3. UDA/语义分割:Feature Re-Representation and Reliable Pseudo Label Retraining for Cross-Domain Semantic
  4. 我糟糕的2019年:虽流年不利,但我心仍坚定
  5. 云堡垒机相关概念汇总说明
  6. 仿起点中文网的小说网站——JavaEE大作业
  7. linux下的elf结构,ELF结构详细分析(1)---elf32_hdr
  8. 长连接心跳问题解决总结
  9. 四、SpringMVC文件上传
  10. Python学习 Day43 数据解析-BeautifulSoup 07