Queue常用类解析之PriorityQueue
Queue常用类解析之ConcurrentLinkedQueue
Queue常用类解析之BlockingQueue(一):PriorityBlockingQueue、DelayQueue和DelayedWorkQueue

接着上文对BlockingQueue的介绍继续向下

五、ArrayBlockingQueue

从命名可以看出,这是一个循环数组表示的的阻塞队列。
与前面介绍的BlockingQueue不同,ArrayBlockingQueue在入队和出队时都有可能会陷入阻塞。

1. 属性

/** The queued items */
final Object[] items;/** items index for next take, poll, peek or remove */
int takeIndex;/** items index for next put, offer, or add */
int putIndex;/** Number of elements in the queue */
int count;/** Concurrency control uses the classic two-condition algorithm* found in any textbook.*//** Main lock guarding all access */
final ReentrantLock lock;/** Condition for waiting takes */
private final Condition notEmpty;/** Condition for waiting puts */
private final Condition notFull;/*** Shared state for currently active iterators, or null if there* are known not to be any.  Allows queue operations to update* iterator state.*/
transient Itrs itrs = null;

putIndex和takeIndex分别表示入队和出队的数组索引。
notEmpty和notFull分别表示空队列和满队列时的阻塞condition。
Itrs 时迭代器的链表形式的集合。

2. 构造器

public ArrayBlockingQueue(int capacity) {this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();
}

ArrayBlockingQueue由两个构造器方法,不支持无参构造器。至少需要传入队列的容量,并初始化对于长度的数组。后续数组的长度无法修改。另外,ArrayBlockingQueue还支持在构造器方法中传入是否是公平锁的参数,默认是非公平锁。

3. ArrayBlockingQueue#put(Object)

public void put(E e) throws InterruptedException {//元素不能为nullcheckNotNull(e);//加锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {//满队列,线程阻塞while (count == items.length)notFull.await();//入队操作enqueue(e);} finally {lock.unlock();}
}
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;//putIndex入队的数组索引items[putIndex] = x;//循环数组,到达数组末尾后的下一个索引为0if (++putIndex == items.length)putIndex = 0;count++;//发送notEmpty信号唤醒notEmpty.signal();
}

4. ArrayBlockingQueue#poll(long, TimeUnit)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {//计算阻塞时间long nanos = unit.toNanos(timeout);//加锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {//空队列,线程阻塞while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}//执行出队操作return dequeue();} finally {lock.unlock();}
}
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")//takeIndex出队的数组索引E x = (E) items[takeIndex];items[takeIndex] = null;//循环数组,达到数组末尾的下一个元素是0if (++takeIndex == items.length)takeIndex = 0;count--;//itrs操作if (itrs != null)itrs.elementDequeued();//发送notFull信号唤醒线程notFull.signal();return x;
}

5. ArrayBlockingQueue#remove(Object)

public boolean remove(Object o) {if (o == null) return false;final Object[] items = this.items;//加锁final ReentrantLock lock = this.lock;lock.lock();try {if (count > 0) {final int putIndex = this.putIndex;int i = takeIndex;//从takeIndex开始遍历,直到putIndex - 1do {//相等,执行删除逻辑if (o.equals(items[i])) {removeAt(i);return true;}if (++i == items.length)i = 0;} while (i != putIndex);}return false;} finally {lock.unlock();}
}
void removeAt(final int removeIndex) {// assert lock.getHoldCount() == 1;// assert items[removeIndex] != null;// assert removeIndex >= 0 && removeIndex < items.length;final Object[] items = this.items;//相当于pollif (removeIndex == takeIndex) {// removing front item; just advanceitems[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();} else {// an "interior" remove// slide over all others up through putIndex.final int putIndex = this.putIndex;//从removeIndex到putIndex - 1的元素依次向前挪动一位,并putIndex = putIndex - 1for (int i = removeIndex;;) {int next = i + 1;if (next == items.length)next = 0;if (next != putIndex) {items[i] = items[next];i = next;} else {items[i] = null;this.putIndex = i;break;}}count--;//itrs处理if (itrs != null)itrs.removedAt(removeIndex);}//发送信号notFull.signal();
}

6. Itrs

Itr是ArrayBlockingQueue的迭代器类,而Itrs则是由Itr组成的链表集合类。
对于Itrs和Itr,因为循环数组和移除元素时会使迭代器丢失他们的位置,为了保证迭代器和队列数据的一致性,需要注意
(1)记录所有takeIndex循环到0的次数
(2)每次删除元素都需要通过removeAt方法通知到所有的迭代器。
Itrs中使用的是Itr的弱引用类,因此需要针对Itrs中的过期迭代器进行清理。清理过期节点主要有3个时机:
(1)建立了新的迭代器,调用doSomeSweep
(2)takeIndex循环到0,调用takeIndexWrapped
(3)队列变为空队列时,调用queueIsEmpty

6.1 属性

/** Incremented whenever takeIndex wraps around to 0 */
//takeIndex 循环到0的次数
int cycles = 0;/** Linked list of weak iterator references */
//头结点
private Node head;/** Used to expunge stale iterators */
//清理过期迭代器的开始节点
private Node sweeper = null;//清理过期迭代器时的节点检查数量,harder模式时16,否则4
private static final int SHORT_SWEEP_PROBES = 4;
private static final int LONG_SWEEP_PROBES = 16;

6.2 Itrs#register(Itr)

void register(Itr itr) {// assert lock.getHoldCount() == 1;head = new Node(itr, head);
}

迭代器加入Itrs链表,头插法。

6.3 Itrs#doSomeSweep(boolean)

void doSomeSweeping(boolean tryHarder) {// assert lock.getHoldCount() == 1;// assert head != null;int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;Node o, p;final Node sweeper = this.sweeper;//passGo表示已经从头结点开始检查boolean passedGo;   // to limit search to one full sweepif (sweeper == null) {o = null;p = head;passedGo = true;} else {o = sweeper;p = o.next;passedGo = false;}for (; probes > 0; probes--) {if (p == null) {//已经检查到尾节点,并且检查是从头节点开始的,跳出循环if (passedGo)break;//已经检查到尾节点,但没有检查过头结点,从头节点开始再次检查,并记录passedGo=trueo = null;p = head;passedGo = true;}final Itr it = p.get();final Node next = p.next;//过期的无效节点if (it == null || it.isDetached()) {// found a discarded/exhausted iterator//一旦找到一个过期的节点,就会采用harder模式检查更多的节点。probes = LONG_SWEEP_PROBES; // "try harder"// unlink pp.clear();p.next = null;if (o == null) {head = next;//链表中已经没有迭代器if (next == null) {// We've run out of iterators to track; retireitrs = null;return;}}elseo.next = next;} else {o = p;}p = next;}//记录sweeper节点,下一次清理直接从sweeper 开始this.sweeper = (p == null) ? null : o;
}

该方法用于检查并清理过期节点,参数为是否采用harder模式,harder模式检查16个节点,非harder模式检查4个节点。
一旦找到一个过期的节点,就会采用harder模式检查更多的节点。
检查并清理的动作在循环中进行,检查结束的条件由以下3个,满足其一就可以离开循环。
(1)检查的节点数量达到要求
(2)链表已经完整了检查了一遍。
(3)链表中已经没有迭代器

6.4 Itrs#takeIndexWrapped()

void takeIndexWrapped() {// assert lock.getHoldCount() == 1;cycles++;for (Node o = null, p = head; p != null;) {final Itr it = p.get();final Node next = p.next;if (it == null || it.takeIndexWrapped()) {// unlink p// assert it == null || it.isDetached();p.clear();p.next = null;if (o == null)head = next;elseo.next = next;} else {o = p;}p = next;}if (head == null)   // no more iterators to trackitrs = null;
}

takeIndex每次循环到0时会调用该方法。
cycle计数增加,遍历链表检查并清理过期的无效节点。

6.5 Itrs#queueIsEmpty()

void queueIsEmpty() {// assert lock.getHoldCount() == 1;for (Node p = head; p != null; p = p.next) {Itr it = p.get();if (it != null) {p.clear();it.shutdown();}}head = null;itrs = null;
}

队列每次变为空队列时调用,清空所有有效的迭代器。

6.6 Itrs#elementDequeued()

void elementDequeued() {// assert lock.getHoldCount() == 1;if (count == 0)queueIsEmpty();else if (takeIndex == 0)takeIndexWrapped();
}

在ArrayBlockingQueue中移除元素(包括出队和remove)时调用,保证迭代器和队列数据的一致性。

7. Itr

Itr是ArrayBlockingQueue的迭代器,所有迭代器都会记录到对应ArrayBlockingQueue的itrs属性中。
Itr是一个先读后写的迭代器,会先预读到next的值进行保存,即使后续next对应的元素被移除也能通过迭代器的next方法访问到。这是为了防止hasNext方法执行和next方法执行的之间的某个时刻,该节点被删除,导致hasNext返回true而next返回值却为null的情形,因此ArrayBlockingQueue的迭代器Itr是弱一致性的。
当迭代器的所有下标都为负数或者hasNext第一次返回false时进入detach状态,表示迭代器已经无用,可以从itrs中移除。

7.1 属性

/** Index to look for new nextItem; NONE at end */
//游标,下一个next节点对应的下标,到达putIndex结束的位置为NONE
private int cursor;/** Element to be returned by next call to next(); null if none */
//next节点的元素值
private E nextItem;/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
//next节点的下标
private int nextIndex;/** Last element returned; null if none or not detached. */
//上一个节点的元素值
private E lastItem;/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
//上一个节点的下标
private int lastRet;/** Previous value of takeIndex, or DETACHED when detached */
//记录takeIndex
private int prevTakeIndex;/** Previous value of iters.cycles */
//记录cycles
private int prevCycles;/** Special index value indicating "not available" or "undefined" */
private static final int NONE = -1;/*** Special index value indicating "removed elsewhere", that is,* removed by some operation other than a call to this.remove().*/
private static final int REMOVED = -2;/** Special value for prevTakeIndex indicating "detached mode" */
private static final int DETACHED = -3;

可以看到,每次都会预读到next的值存储到nextItem和nextIndex,并且保存上一次的返回值lastItem和lastRet。
由于ArrayBlockingQueue时一个循环数组,takeIndex和putIndex一直在循环移动。这样子就有可能出现,迭代器创建时位置已经确定,但是随着ArrayBlockingQueue数组不断循环,位置一直在变化,导致迭代器的结果有误。因此需要记录旧的takeIndex和cycles,在遍历时比较这两个值与当前值,以修正下标索引,保证遍历的正确性。

7.2 Itr#Itr()

Itr() {// assert lock.getHoldCount() == 0;lastRet = NONE;final ReentrantLock lock = ArrayBlockingQueue.this.lock;lock.lock();try {//没有元素,无用的迭代器,直接进入detach模式if (count == 0) {// assert itrs == null;cursor = NONE;nextIndex = NONE;prevTakeIndex = DETACHED;} else {//保存各项属性final int takeIndex = ArrayBlockingQueue.this.takeIndex;prevTakeIndex = takeIndex;nextItem = itemAt(nextIndex = takeIndex);cursor = incCursor(takeIndex);//迭代器加入itrsif (itrs == null) {itrs = new Itrs(this);} else {itrs.register(this); // in this orderitrs.doSomeSweeping(false);}prevCycles = itrs.cycles;// assert takeIndex >= 0;// assert prevTakeIndex == takeIndex;// assert nextIndex >= 0;// assert nextItem != null;}} finally {lock.unlock();}
}

7.3 Itr#incorporateDequeues()

private void incorporateDequeues() {// assert lock.getHoldCount() == 1;// assert itrs != null;// assert !isDetached();// assert count > 0;final int cycles = itrs.cycles;final int takeIndex = ArrayBlockingQueue.this.takeIndex;final int prevCycles = this.prevCycles;final int prevTakeIndex = this.prevTakeIndex;//cycles和takeIndex存在不一致,需要修正if (cycles != prevCycles || takeIndex != prevTakeIndex) {final int len = items.length;// how far takeIndex has advanced since the previous// operation of this iterator//计算出队了元素数量long dequeues = (cycles - prevCycles) * len+ (takeIndex - prevTakeIndex);// Check indices for invalidation//校验下标合法性,lastRet 和 nextIndex 无效记录被移除,cursor无效那么下一个next读取从takeIndex重新开始if (invalidated(lastRet, prevTakeIndex, dequeues, len))lastRet = REMOVED;if (invalidated(nextIndex, prevTakeIndex, dequeues, len))nextIndex = REMOVED;if (invalidated(cursor, prevTakeIndex, dequeues, len))cursor = takeIndex;//进入detach模式if (cursor < 0 && nextIndex < 0 && lastRet < 0)detach();else {//记录新的cycles和takeIndexthis.prevCycles = cycles;this.prevTakeIndex = takeIndex;}}
}
//判断下标合法性,无效返回true
//判断逻辑,下标距离的prevTakeIndex元素数量 和 出队元素数量 比较
private boolean invalidated(int index, int prevTakeIndex,long dequeues, int length) {if (index < 0)return false;int distance = index - prevTakeIndex;if (distance < 0)distance += length;return dequeues > distance;
}

用于检查并修正Itr的下标属性,在noNext、next和remove方法中会被调用。

7.4 Itr#detach()

private void detach() {// Switch to detached mode// assert lock.getHoldCount() == 1;// assert cursor == NONE;// assert nextIndex < 0;// assert lastRet < 0 || nextItem == null;// assert lastRet < 0 ^ lastItem != null;if (prevTakeIndex >= 0) {// assert itrs != null;prevTakeIndex = DETACHED;// try to unlink from itrs (but not too hard)itrs.doSomeSweeping(true);}
}

修改detach的标志字段,并且启动itrs的清理逻辑。

7.5 Itr#hasNext()

public boolean hasNext() {// assert lock.getHoldCount() == 0;if (nextItem != null)return true;noNext();return false;
}
private void noNext() {final ReentrantLock lock = ArrayBlockingQueue.this.lock;lock.lock();try {// assert cursor == NONE;// assert nextIndex == NONE;if (!isDetached()) {// assert lastRet >= 0;incorporateDequeues(); // might update lastRetif (lastRet >= 0) {lastItem = itemAt(lastRet);// assert lastItem != null;detach();}}// assert isDetached();// assert lastRet < 0 ^ lastItem != null;} finally {lock.unlock();}
}

判断下一个节点是否存在,由于下一个元素已经预读取并保存在nextItem,判断nextItem是否为null即可。
对于没有下一个节点的迭代器,需要修正下标属性并进入detach模式。

7.6 Itr#next()

public E next() {// assert lock.getHoldCount() == 0;final E x = nextItem;if (x == null)throw new NoSuchElementException();final ReentrantLock lock = ArrayBlockingQueue.this.lock;lock.lock();try {if (!isDetached())incorporateDequeues();// assert nextIndex != NONE;// assert lastItem == null;lastRet = nextIndex;final int cursor = this.cursor;if (cursor >= 0) {nextItem = itemAt(nextIndex = cursor);// assert nextItem != null;this.cursor = incCursor(cursor);} else {nextIndex = NONE;nextItem = null;}} finally {lock.unlock();}return x;
}

由于下一个元素已经与读取,因此next方法的主要逻辑不是读取下一个元素,而是预读取并保存下一个元素的下一个元素。

7.7 Itr#remove()

public void remove() {// assert lock.getHoldCount() == 0;final ReentrantLock lock = ArrayBlockingQueue.this.lock;lock.lock();try {//非detach模式修正下标属性if (!isDetached())incorporateDequeues(); // might update lastRet or detachfinal int lastRet = this.lastRet;//lastRest清空this.lastRet = NONE;if (lastRet >= 0) {//非detach模式直接移除if (!isDetached())removeAt(lastRet);else {//detach需要判断lastItem是否发生了改变,才能移除final E lastItem = this.lastItem;// assert lastItem != null;this.lastItem = null;if (itemAt(lastRet) == lastItem)removeAt(lastRet);}} else if (lastRet == NONE)throw new IllegalStateException();// else lastRet == REMOVED and the last returned element was// previously asynchronously removed via an operation other// than this.remove(), so nothing to do.if (cursor < 0 && nextIndex < 0)//进入detach模式detach();} finally {lock.unlock();// assert lastRet == NONE;// assert lastItem == null;}
}

7.8 Itr#removeAt(int)

boolean removedAt(int removedIndex) {// assert lock.getHoldCount() == 1;if (isDetached())return true;final int cycles = itrs.cycles;final int takeIndex = ArrayBlockingQueue.this.takeIndex;final int prevCycles = this.prevCycles;final int prevTakeIndex = this.prevTakeIndex;final int len = items.length;int cycleDiff = cycles - prevCycles;if (removedIndex < takeIndex)cycleDiff++;final int removedDistance =(cycleDiff * len) + (removedIndex - prevTakeIndex);// assert removedDistance >= 0;int cursor = this.cursor;if (cursor >= 0) {int x = distance(cursor, prevTakeIndex, len);if (x == removedDistance) {if (cursor == putIndex)this.cursor = cursor = NONE;}else if (x > removedDistance) {// assert cursor != prevTakeIndex;this.cursor = cursor = dec(cursor);}}int lastRet = this.lastRet;if (lastRet >= 0) {int x = distance(lastRet, prevTakeIndex, len);if (x == removedDistance)this.lastRet = lastRet = REMOVED;else if (x > removedDistance)this.lastRet = lastRet = dec(lastRet);}int nextIndex = this.nextIndex;if (nextIndex >= 0) {int x = distance(nextIndex, prevTakeIndex, len);if (x == removedDistance)this.nextIndex = nextIndex = REMOVED;else if (x > removedDistance)this.nextIndex = nextIndex = dec(nextIndex);}else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {this.prevTakeIndex = DETACHED;return true;}return false;
}

所有队列移除非takeIndex下标处元素的方法都会调用迭代器的removeAt方法以通知其修正下标索引值。

7.9 Itr#takeIndexWrapped()

boolean takeIndexWrapped() {// assert lock.getHoldCount() == 1;if (isDetached())return true;//已经循环至少两圈,迭代器的所有元素都已经无效if (itrs.cycles - prevCycles > 1) {// All the elements that existed at the time of the last// operation are gone, so abandon further iteration.shutdown();return true;}return false;
}
void shutdown() {// assert lock.getHoldCount() == 1;cursor = NONE;if (nextIndex >= 0)nextIndex = REMOVED;if (lastRet >= 0) {lastRet = REMOVED;lastItem = null;}prevTakeIndex = DETACHED;// Don't set nextItem to null because we must continue to be// able to return it on next().//// Caller will unlink from itrs when convenient.
}

当takeIndex循环到0时调用,对迭代器做一次检查,如果必要则进入detach状态,返回迭代器是否是detach状态。

可以看到,进入detach模式虽然有两种情况,标志字段为prevTakeIndex。
(1)所有索引都为负数
(2)hasNext第一次返回false,关键是cursor < 0。
所以进入detach模式的关键有3种情况:
(1)cursor == putIndex,这时候cursor =NONE
(2)空队列
(3)cycle - preCycles > 1

Queue常用类解析之BlockingQueue(二):ArrayBlockingQueue相关推荐

  1. 常用类详解(二)StringBuffer

    StringBuffer类: 基本介绍: java.lang.StringBuffer代表可变的字符序列,可以对字符串内容进行增删 很多方法与String相同,但StringBuffer是可变长度的. ...

  2. java常用类解析十:Date类和Calendar类示例

    1.Date类实例:格式化输出当前日期 [java] view plaincopy <span style="font-size:16px;">package demo ...

  3. java常用类解析五:IO系统File类及文件搜索工具类

    1.先看一个File类的简单的例子 [java] view plaincopy <span style="font-size:16px;">package test; ...

  4. Java常用类全面解析(含部分源码)

    常用类 文章目录 常用类 字符串相关的类 String 类 说明 案例 String 的实例方式 String 中的常用方法 案例一 案例二 案例三 String 类与其它结构之间的转换 小复习-与基 ...

  5. 常用类 (二) ----- Math类

    相关文章: <常用类 (一) ----- Arrays数组工具类> <常用类 (二) ----- Math类> <常用类 (三) ----- BigDecimal和Big ...

  6. Linux常用基本命令详解(二)-------磁盘分区和磁盘管理类命令

    Linux常用基本命令详解(一) Linux常用基本命令详解(二)-------磁盘分区和磁盘管理类命令 Linux常用基本命令详解(三) 1.磁盘分区 磁盘分区(系统分区)是使用分区编辑器(part ...

  7. Apache入门 篇(二)之apache 2.2.x常用配置解析

    一.httpd 2.2.x目录结构 Cnetos 6.10 YUM安装httpd 2.2.x # yum install -y httpd 程序环境主配置文件:/etc/httpd/conf/http ...

  8. strongswan常用命令解析(二)

    strongswan常用命令解析 0 > ipsec reload //重新加载 ipsec.conf文件 1 > ipsec rereadsecrets //重新加载ipsec.secr ...

  9. java笔记:常用类-反射

    目录 1 常用类 1.1 内部类 1.1.1 概念 1.1.2 成员内部类 1.1.3 静态内部类 1.1.4 局部内部类 1.1.4 匿名内部类 1.2 Object类 1.2.1 getClass ...

最新文章

  1. 网络空间安全:社会工程学之信息追踪——学习笔记 利用搜索引擎追踪!
  2. BZOJ 3626: [LNOI2014]LCA
  3. 算法---------两个数的交集
  4. Kubernetes — 系统架构
  5. PHP中文简繁互转代码 完美支持大陆、香港、台湾及新加坡
  6. mininet 应用实践
  7. day44-前端知识之HTML内容
  8. PAT (Basic Level) Practice 1006 换个格式输出整数
  9. .NET组件和COM组件之间的相互操作方法
  10. Objective-C 继承新的认识以及作用
  11. 使用JAVA实现邮件发送功能
  12. TeeChart插入
  13. 五分钟就能上手的Android APP开发入门教程!!!
  14. IP6826无线充电底座方案IC芯片,兼容WPC Qi v1.2.4
  15. PDP context激活的大致原理
  16. c语言判断闰年次数,C语言判断闰年,即判断年份是否为闰年
  17. 网络:access和trunk端口和hybird端口的区别
  18. ERP项目应该由谁来主导?
  19. gcc怎么设置默认的include,lib路径
  20. 聊天功能获取聊天记录列表并展示最新一条聊天记录

热门文章

  1. C52单片机中断定时系统思路和实现代码
  2. 【微信小程序】(三)订阅消息实现
  3. 计算机考研211大学排名,计算机考研:4所好考的211院校!
  4. GGA的经纬度的度分转换函数
  5. Python实现快速幂取余算法
  6. Intel与ARM处理器对比分析
  7. 杜芬方程的倍周期分岔matlab,杜芬方程的倍周期分岔-read.ppt
  8. 什么是人工智能,揭穿许多关于人工智能和机器学习的各种误区
  9. 使用计算机教学的效果,如何提高计算机课堂教学效果
  10. AEIA2018 | 四维图新白新平:智慧赋能,共建生态