个人思考:

当时最开始的时候,想到的是通过直接sleep(60),然后push到redis队列,然后redis的blpop,但是这样会导致线程阻塞,

DelayQueue相当于把这么多的线程阻塞放到了一个容器中,通过lock的condition来实现排序以及精准的唤醒,减少了很多系统的开销。

-------------------------------------------------------------------------------------------------------------------------------------------

一、DelayQueue是什么

  DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

二、DelayQueue能做什么

 1. 淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单。 
 2. 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。

 3.关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。

 4.缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。

 5.任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求等。

三、实例展示

 定义元素类,作为队列的元素

 DelayQueue只能添加(offer/put/add)实现了Delayed接口的对象,意思是说我们不能想往DelayQueue里添加什么就添加什么,不能添加int、也不能添加String进去,必须添加我们自己的实现了Delayed接口的类的对象,来代码:

/***  compareTo 方法必须提供与 getDelay 方法一致的排序*/
class MyDelayedTask implements Delayed{private String name ;private long start = System.currentTimeMillis();private long time ;public MyDelayedTask(String name,long time) {this.name = name;this.time = time;}/*** 需要实现的接口,获得延迟时间   用过期时间-当前时间* @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);}/*** 用于延迟队列内部比较排序   当前时间的延迟时间 - 比较对象的延迟时间* @param o* @return*/@Overridepublic int compareTo(Delayed o) {MyDelayedTask o1 = (MyDelayedTask) o;return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic String toString() {return "MyDelayedTask{" +"name='" + name + '\'' +", time=" + time +'}';}
}

  其中,compareTo 方法 getDelay 方法 就是Delayed接口的方法,我们必须实现,而且按照JAVASE文档,compareTo 方法必须提供与 getDelay 方法一致的排序,也就是说compareTo方法里可以按照getDelay方法的返回值大小排序,即在compareTo方法里比较getDelay方法返回值大小

写main方法测试

  定义一个DelayQueue,添加几个元素,while循环获取元素

private static DelayQueue delayQueue  = new DelayQueue();public static void main(String[] args) throws InterruptedException {new Thread(new Runnable() {@Overridepublic void run() {delayQueue.offer(new MyDelayedTask("task1",10000));delayQueue.offer(new MyDelayedTask("task2",3900));delayQueue.offer(new MyDelayedTask("task3",1900));delayQueue.offer(new MyDelayedTask("task4",5900));delayQueue.offer(new MyDelayedTask("task5",6900));delayQueue.offer(new MyDelayedTask("task6",7900));delayQueue.offer(new MyDelayedTask("task7",4900));}}).start();while (true) {Delayed take = delayQueue.take();System.out.println(take);}}

执行结果

1

2

3

4

5

6

7

MyDelayedTask{name='task3', time=1900}

MyDelayedTask{name='task2', time=3900}

MyDelayedTask{name='task7', time=4900}

MyDelayedTask{name='task4', time=5900}

MyDelayedTask{name='task5', time=6900}

MyDelayedTask{name='task6', time=7900}

MyDelayedTask{name='task1', time=10000}

 DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法。

static class Task implements Delayed{@Override//比较延时,队列里元素的排序依据public int compareTo(Delayed o) {return 0;}@Override//获取剩余时间public long getDelay(TimeUnit unit) {return 0;}}

  元素进入队列后,先进行排序,然后,只有getDelay也就是剩余时间为0的时候,该元素才有资格被消费者从队列中取出来,所以构造函数一般都有一个时间传入。

具体另一个实例:

import java.sql.Time;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;public class Delayquue {public static void main(String[] args) throws Exception {BlockingQueue<Task> delayqueue = new DelayQueue<>();long now = System.currentTimeMillis();delayqueue.put(new Task(now+3000));delayqueue.put(new Task(now+4000));delayqueue.put(new Task(now+6000));delayqueue.put(new Task(now+1000));System.out.println(delayqueue);for(int i=0; i<4; i++) {System.out.println(delayqueue.take());}}static class Task implements Delayed{long time = System.currentTimeMillis();public Task(long time) {this.time = time;}@Overridepublic int compareTo(Delayed o) {if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))return -1;else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) return 1;else return 0;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);}@Overridepublic String toString() {return "" + time;}}
}

输出结果:

  可以看出来,每隔一段时间就会输出一个元素,这个间隔时间就是由构造函数定义的秒数来决定的。


原理分析:

 内部结构

  • 可重入锁
  • 用于根据delay时间排序的优先级队列
  • 用于优化阻塞通知的线程元素leader
  • 用于实现阻塞和通知的Condition对象

delayed和PriorityQueue

 在理解delayQueue原理之前我们需要先了解两个东西,delayed和PriorityQueue.

  • delayed是一个具有过期时间的元素
  • PriorityQueue是一个根据队列里元素某些属性排列先后的顺序队列

  delayQueue其实就是在每次往优先级队列中添加元素,然后以元素的delay/过期值作为排序的因素,以此来达到先过期的元素会拍在队首,每次从队列里取出来都是最先要过期的元素

offer方法

  1. 执行加锁操作
  2. 吧元素添加到优先级队列中
  3. 查看元素是否为队首
  4. 如果是队首的话,设置leader为空,唤醒所有等待的队列
  5. 释放锁

代码如下:

public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}

take方法

  1. 执行加锁操作
  2. 取出优先级队列元素q的队首
  3. 如果元素q的队首/队列为空,阻塞请求
  4. 如果元素q的队首(first)不为空,获得这个元素的delay时间值
  5. 如果first的延迟delay时间值为0的话,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素,跳出方法
  6. 如果first的延迟delay时间值不为0的话,释放元素first的引用,避免内存泄露
  7. 判断leader元素是否为空,不为空的话阻塞当前线程
  8. 如果leader元素为空的话,把当前线程赋值给leader元素,然后阻塞delay的时间,即等待队首到达可以出队的时间,在finally块中释放leader元素的引用
  9. 循环执行从1~8的步骤
  10. 如果leader为空并且优先级队列不为空的情况下(判断还有没有其他后续节点),调用signal通知其他的线程
  11. 执行解锁操作
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return q.poll();first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}}  

get点

 整个代码的过程中并没有使用上太难理解的地方,但是有几个比较难以理解他为什么这么做的地方

leader元素的使用

 大家可能看到在我们的DelayQueue中有一个Thread类型的元素leader,那么他是做什么的呢,有什么用呢?

 让我们先看一下元素注解上的doc描述:

Thread designated to wait for the element at the head of the queue.
This variant of the Leader-Follower pattern serves to minimize unnecessary timed waiting.
when a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.
The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim.
Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled.
So waiting threads must be prepared to acquire and lose leadership while waiting.

 上面主要的意思就是说用leader来减少不必要的等待时间,那么这里我们的DelayQueue是怎么利用leader来做到这一点的呢:

 这里我们想象着我们有个多个消费者线程用take方法去取,内部先加锁,然后每个线程都去peek第一个节点.
 如果leader不为空说明已经有线程在取了,设置当前线程等待

    if (leader != null)available.await();
//如果为空说明没有其他线程去取这个节点,设置leader并等待delay延时到期,直到poll后结束循环else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}

take方法中为什么释放first元素

first = null; // don't retain ref while waiting

 我们可以看到doug lea后面写的注释,那么这段代码有什么用呢?

 想想假设现在延迟队列里面有三个对象。

  • 线程A进来获取first,然后进入 else 的else ,设置了leader为当前线程A
  • 线程B进来获取first,进入else的阻塞操作,然后无限期等待
  • 这时在JDK 1.7下面他是持有first引用的
  • 如果线程A阻塞完毕,获取对象成功,出队,这个对象理应被GC回收,但是他还被线程B持有着,GC链可达,所以不能回收这个first.
  • 假设还有线程C 、D、E.. 持有对象1引用,那么无限期的不能回收该对象1引用了,那么就会造成内存泄露.

链接:

  https://www.jianshu.com/p/e0bcc9eae0ae

  https://www.jianshu.com/p/bf9f6b08ba5b

  https://blog.csdn.net/toocruel/article/details/82769595

参考:https://www.cnblogs.com/myseries/p/10944211.html

DelayQueue实例和源码解析相关推荐

  1. Google Test(GTest)使用方法和源码解析——结果统计机制分析

    在分析源码之前,我们先看一个例子.以<Google Test(GTest)使用方法和源码解析--概况 >一文中最后一个实例代码为基准,修改最后一个"局部测试"结果为错误 ...

  2. Google Test(GTest)使用方法和源码解析——自动调度机制分析

    在<Google Test(GTest)使用方法和源码解析--概况 >一文中,我们简单介绍了下GTest的使用和特性.从这篇博文开始,我们将深入代码,研究这些特性的实现.(转载请指明出于b ...

  3. 解析并符号 读取dll_Spring IOC容器之XmlBeanFactory启动流程分析和源码解析

    一. 前言 Spring容器主要分为两类BeanFactory和ApplicationContext,后者是基于前者的功能扩展,也就是一个基础容器和一个高级容器的区别.本篇就以BeanFactory基 ...

  4. 状态模式的介绍及状态机模型的函数库javascript-state-machine的用法和源码解析

    文章大体就两部分: 状态模式 状态机模型的函数库javascript-state-machine的用法和源码解析 场景及问题背景: 我们平时开发时本质上就是对应用程序的各种状态进行切换并作出相应处理. ...

  5. Google Test(GTest)使用方法和源码解析——模板类测试技术分析和应用

    写C++难免会遇到模板问题,如果要针对一个模板类进行测试,似乎之前博文中介绍的方式只能傻乎乎的一个一个特化类型后再进行测试.其实GTest提供了两种测试模板类的方法,本文我们将介绍方法的使用,并分析其 ...

  6. Google Test(GTest)使用方法和源码解析——参数自动填充技术分析和应用

    在我们设计测试用例时,我们需要考虑很多场景.每个场景都可能要细致地考虑到到各个参数的选择.比如我们希望使用函数IsPrime检测10000以内字的数字,难道我们要写一万行代码么?(转载请指明出于bre ...

  7. Google Test(GTest)使用方法和源码解析——私有属性代码测试技术分析

    有些时候,我们不仅要测试类暴露出来的公有方法,还要测试其受保护的或者私有方法.GTest测试框架提供了一种方法,让我们可以测试类的私有方法.但是这是一种侵入式的,会破坏原来代码的结构,所以我觉得还是谨 ...

  8. Google Test(GTest)使用方法和源码解析——预处理技术分析和应用

    预处理 在<Google Test(GTest)使用方法和源码解析--概况>最后一部分,我们介绍了GTest的预处理特性.现在我们就详细介绍该特性的使用和相关源码.(转载请指明出于brea ...

  9. Google Test(GTest)使用方法和源码解析——断言的使用方法和解析

    在之前博文的基础上,我们将介绍部分断言的使用,同时穿插一些源码.(转载请指明出于breaksoftware的csdn博客) 断言(Assertions) 断言是GTest局部测试中最简单的使用方法,我 ...

最新文章

  1. Oracle---对字段进行修改(varchar2修改clob)+ 索引失效
  2. scala的foreach和for
  3. SpringBoot系列: 使用 flyway 管理数据库版本
  4. 浅谈推荐系统中的图神经网络
  5. linux系统中配置NFS实现文件共享
  6. oracle常用操作命令总结
  7. c 生成html的div,createElement动态创建HTML对象脚本代码
  8. JS 简易控制台插件 [供 博客, 论坛 运行js用]
  9. 深度 | 人工智能究竟能否实现?
  10. windows如何设置新建文本文档快捷键
  11. 大众点评产品分析报告
  12. 时域,频域与傅立叶变换 - 慕水 - CSDNBlog
  13. C# 使用Panel控件实现窗体嵌套
  14. Linux下的hostid
  15. 比特大陆招股书曝光;阿里否认阻挠滴滴收购OFO;Uber巨资了结数据泄露案| 雷锋早报...
  16. 使用Flink Metric Reporter 对flink任务指标进行监控
  17. 去健身房健身戴什么耳机好、最适合运动健身的健身房耳机推荐
  18. 1226. The Dining Philosophers (Leetcode 1226)
  19. 初入职场,菜鸟北漂记
  20. Jsp实验0:调试运行eBuy电子商城系统

热门文章

  1. 拓嘉辰丰电商:什么是拼多多上货助手?有什么作用?
  2. zb加密插件报错Call to undefined function app\admin\model\parse_attr()
  3. 13-ESP8266连接MQTT服务器发送数据
  4. 使用FontLab生成字体文件
  5. Python学习,用python制作一个专属有声小说,每天学习一个小技能
  6. 计算机毕业设计Python+uniapp+安卓基于Android健康饮食搭配的设计与实现(WEB+APP+LW)
  7. AHB、APB、AXI三种协议对比分析(AMBA总线)
  8. 动画学习之Animate.css的使用与解析
  9. WPS表格 JSA 学习笔记 - 批量设置图片
  10. 电力系统短期负荷预测(Python代码实现)