作者:chenssy

来源:Java技术驿站

我们知道线程Thread可以调用setPriority(int newPriority)来设置优先级的,线程优先级高的线程先执行,优先级低的后执行。而前面介绍的ArrayBlockingQueue、LinkedBlockingQueue都是采用FIFO原则来确定线程执行的先后顺序,那么有没有一个队列可以支持优先级呢? PriorityBlockingQueue 。

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。

二叉堆

由于PriorityBlockingQueue底层采用二叉堆来实现的,所以有必要先介绍下二叉堆。

二叉堆是一种特殊的堆,就结构性而言就是完全二叉树或者是近似完全二叉树,满足树结构性和堆序性。树机构特性就是完全二叉树应该有的结构,堆序性则是:父节点的键值总是保持固定的序关系于任何一个子节点的键值,且每个节点的左子树和右子树都是一个二叉堆。它有两种表现形式:最大堆、最小堆。

最大堆:父节点的键值总是大于或等于任何一个子节点的键值(下右图)

最小堆:父节点的键值总是小于或等于任何一个子节点的键值(下走图)

二叉堆一般用数组表示,如果父节点的节点位置在n处,那么其左孩子节点为:2 * n + 1 ,其右孩子节点为2 * (n + 1),其父节点为(n - 1) / 2 处。上左图的数组表现形式为:

二叉堆的基本结构了解了,下面来看看二叉堆的添加和删除节点。二叉堆的添加和删除相对于二叉树来说会简单很多。

添加元素

首先将要添加的元素N插添加到堆的末尾位置(在二叉堆中我们称之为空穴)。如果元素N放入空穴中而不破坏堆的序(其值大于跟父节点值(最大堆是小于父节点)),那么插入完成。否则,我们则将该元素N的节点与其父节点进行交换,然后与其新父节点进行比较直到它的父节点不在比它小(最大堆是大)或者到达根节点。

假如有如下一个二叉堆

这是一个最小堆,其父节点总是小于等于任一一个子节点。现在我们添加一个元素2。

第一步:在末尾添加一个元素2,如下:

第二步:元素2比其父节点6小,进行替换,如下:

第三步:继续与其父节点5比较,小于,替换:

第四步:继续比较其跟节点1,发现跟节点比自己小,则完成,到这里元素2插入完毕。所以整个添加元素过程可以概括为:在元素末尾插入元素,然后不断比较替换直到不能移动为止。

删除元素

删除元素与增加元素一样,需要维护整个二叉堆的序。删除位置1的元素(数组下标0),则把最后一个元素空出来移到最前边,然后和它的两个子节点比较,如果两个子节点中较小的节点小于该节点,就将他们交换,知道两个子节点都比该元素大为止。

就上面二叉堆而言,删除的元素为元素1。

第一步:删掉元素1,元素6空出来,如下:

第二步:与其两个子节点(元素2、元素3)比较,都小,将其中较小的元素(元素2)放入到该空穴中:

第三步:继续比较两个子节点(元素5、元素7),还是都小,则将较小的元素(元素5)放入到该空穴中:!

第四步:比较其子节点(元素8),比该节点小,则元素6放入该空穴位置不会影响二叉堆的树结构,放入:

到这里整个删除操作就已经完成了。

二叉堆的添加、删除操作还是比较简单的,很容易就理解了。下面我们就参考该内容来开启PriorityBlockingQueue的源代码研究。

PriorityBlockingQueue


PriorityBlockingQueue继承AbstractQueue,实现BlockingQueue接口。

  1. public class PriorityBlockingQueue<E> extends AbstractQueue<E>

  2.    implements BlockingQueue<E>, java.io.Serializable

定义了一些属性:

  1.    // 默认容量

  2.    private static final int DEFAULT_INITIAL_CAPACITY = 11;

  3.    // 最大容量

  4.    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

  5.    // 二叉堆数组

  6.    private transient Object[] queue;

  7.    // 队列元素的个数

  8.    private transient int size;

  9.    // 比较器,如果为空,则为自然顺序

  10.    private transient Comparator<? super E> comparator;

  11.    // 内部锁

  12.    private final ReentrantLock lock;

  13.    private final Condition notEmpty;

  14.    //

  15.    private transient volatile int allocationSpinLock;

  16.    // 优先队列:主要用于序列化,这是为了兼容之前的版本。只有在序列化和反序列化才非空

  17.    private PriorityQueue<E> q;

内部仍然采用可重入锁ReentrantLock来实现同步机制,但是这里只有一个notEmpty的Condition,了解了ArrayBlockingQueue我们知道它定义了两个Condition,之类为何只有一个呢?原因就在于PriorityBlockingQueue是一个无界队列,插入总是会成功,除非消耗尽了资源导致服务器挂。

入列

PriorityBlockingQueue提供put()、add()、offer()方法向队列中加入元素。我们这里从put()入手:put(E e) :将指定元素插入此优先级队列。

  1.    public void put(E e) {

  2.        offer(e); // never need to block

  3.    }

PriorityBlockingQueue是无界的,所以不可能会阻塞。内部调用offer(E e):

  1.    public boolean offer(E e) {

  2.        // 不能为null

  3.        if (e == null)

  4.            throw new NullPointerException();

  5.        // 获取锁

  6.        final ReentrantLock lock = this.lock;

  7.        lock.lock();

  8.        int n, cap;

  9.        Object[] array;

  10.        // 扩容

  11.        while ((n = size) >= (cap = (array = queue).length))

  12.            tryGrow(array, cap);

  13.        try {

  14.            Comparator<? super E> cmp = comparator;

  15.            // 根据比较器是否为null,做不同的处理

  16.            if (cmp == null)

  17.                siftUpComparable(n, e, array);

  18.            else

  19.                siftUpUsingComparator(n, e, array, cmp);

  20.            size = n + 1;

  21.            // 唤醒正在等待的消费者线程

  22.            notEmpty.signal();

  23.        } finally {

  24.            lock.unlock();

  25.        }

  26.        return true;

  27.    }

siftUpComparable当比较器comparator为null时,采用自然排序,调用siftUpComparable方法:

  1.    private static <T> void siftUpComparable(int k, T x, Object[] array) {

  2.        Comparable<? super T> key = (Comparable<? super T>) x;

  3.        // “上冒”过程

  4.        while (k > 0) {

  5.            // 父级节点 (n - ) / 2

  6.            int parent = (k - 1) >>> 1;

  7.            Object e = array[parent];

  8.            // key >= parent 完成(最大堆)

  9.            if (key.compareTo((T) e) >= 0)

  10.                break;

  11.            // key < parant 替换

  12.            array[k] = e;

  13.            k = parent;

  14.        }

  15.        array[k] = key;

  16.    }

这段代码所表示的意思:将元素X插入到数组中,然后进行调整以保持二叉堆的特性。

siftUpUsingComparator当比较器不为null时,采用所指定的比较器,调用siftUpUsingComparator方法:

  1.    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,

  2.                                       Comparator<? super T> cmp) {

  3.        while (k > 0) {

  4.            int parent = (k - 1) >>> 1;

  5.            Object e = array[parent];

  6.            if (cmp.compare(x, (T) e) >= 0)

  7.                break;

  8.            array[k] = e;

  9.            k = parent;

  10.        }

  11.        array[k] = x;

  12.    }

扩容:tryGrow

  1.    private void tryGrow(Object[] array, int oldCap) {

  2.        lock.unlock();      // 扩容操作使用自旋,不需要锁主锁,释放

  3.        Object[] newArray = null;

  4.        // CAS 占用

  5.        if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {

  6.            try {

  7.                // 新容量  最小翻倍

  8.                int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) :  (oldCap >> 1));

  9.                // 超过

  10.                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow

  11.                    int minCap = oldCap + 1;

  12.                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)

  13.                        throw new OutOfMemoryError();

  14.                    newCap = MAX_ARRAY_SIZE;        // 最大容量

  15.                }

  16.                if (newCap > oldCap && queue == array)

  17.                    newArray = new Object[newCap];

  18.            } finally {

  19.                allocationSpinLock = 0;     // 扩容后allocationSpinLock = 0 代表释放了自旋锁

  20.            }

  21.        }

  22.        // 到这里如果是本线程扩容newArray肯定是不为null,为null就是其他线程在处理扩容,那就让给别的线程处理

  23.        if (newArray == null)

  24.            Thread.yield();

  25.        // 主锁获取锁

  26.        lock.lock();

  27.        // 数组复制

  28.        if (newArray != null && queue == array) {

  29.            queue = newArray;

  30.            System.arraycopy(array, 0, newArray, 0, oldCap);

  31.        }

  32.    }

整个添加元素的过程和上面二叉堆一模一样:先将元素添加到数组末尾,然后采用“上冒”的方式将该元素尽量往上冒。

出列

PriorityBlockingQueue提供poll()、remove()方法来执行出对操作。出对的永远都是第一个元素:array[0]。

  1.   public E poll() {

  2.        final ReentrantLock lock = this.lock;

  3.        lock.lock();

  4.        try {

  5.            return dequeue();

  6.        } finally {

  7.            lock.unlock();

  8.        }

  9.    }

先获取锁,然后调用dequeue()方法:

  1.    private E dequeue() {

  2.        // 没有元素 返回null

  3.        int n = size - 1;

  4.        if (n < 0)

  5.            return null;

  6.        else {

  7.            Object[] array = queue;

  8.            // 出对元素

  9.            E result = (E) array[0];

  10.            // 最后一个元素(也就是插入到空穴中的元素)

  11.            E x = (E) array[n];

  12.            array[n] = null;

  13.            // 根据比较器释放为null,来执行不同的处理

  14.            Comparator<? super E> cmp = comparator;

  15.            if (cmp == null)

  16.                siftDownComparable(0, x, array, n);

  17.            else

  18.                siftDownUsingComparator(0, x, array, n, cmp);

  19.            size = n;

  20.            return result;

  21.        }

  22.    }

siftDownComparable

如果比较器为null,则调用siftDownComparable来进行自然排序处理:

  1.    private static <T> void siftDownComparable(int k, T x, Object[] array,

  2.                                               int n) {

  3.        if (n > 0) {

  4.            Comparable<? super T> key = (Comparable<? super T>)x;

  5.            // 最后一个叶子节点的父节点位置

  6.            int half = n >>> 1;

  7.            while (k < half) {

  8.                int child = (k << 1) + 1;       // 待调整位置左节点位置

  9.                Object c = array[child];        //左节点

  10.                int right = child + 1;          //右节点

  11.                //左右节点比较,取较小的

  12.                if (right < n &&

  13.                        ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)

  14.                    c = array[child = right];

  15.                //如果待调整key最小,那就退出,直接赋值

  16.                if (key.compareTo((T) c) <= 0)

  17.                    break;

  18.                //如果key不是最小,那就取左右节点小的那个放到调整位置,然后小的那个节点位置开始再继续调整

  19.                array[k] = c;

  20.                k = child;

  21.            }

  22.            array[k] = key;

  23.        }

  24.    }

处理思路和二叉堆删除节点的逻辑一样:就第一个元素定义为空穴,然后把最后一个元素取出来,尝试插入到空穴位置,并与两个子节点值进行比较,如果不符合,则与其中较小的子节点进行替换,然后继续比较调整。

siftDownUsingComparator

如果指定了比较器,则采用比较器来进行调整:

  1.    private static <T> void siftDownUsingComparator(int k, T x, Object[] array,

  2.                                                    int n,

  3.                                                    Comparator<? super T> cmp) {

  4.        if (n > 0) {

  5.            int half = n >>> 1;

  6.            while (k < half) {

  7.                int child = (k << 1) + 1;

  8.                Object c = array[child];

  9.                int right = child + 1;

  10.                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)

  11.                    c = array[child = right];

  12.                if (cmp.compare(x, (T) c) <= 0)

  13.                    break;

  14.                array[k] = c;

  15.                k = child;

  16.            }

  17.            array[k] = x;

  18.        }

  19.    }

PriorityBlockingQueue采用二叉堆来维护,所以整个处理过程不是很复杂,添加操作则是不断“上冒”,而删除操作则是不断“下掉”。掌握二叉堆就掌握了PriorityBlockingQueue,无论怎么变还是。对于PriorityBlockingQueue需要注意的是他是一个无界队列,所以添加操作是不会失败的,除非资源耗尽。

- END -

 近期热文:

关注我

点击“阅读原文”,看本号其他精彩内容

死磕Java并发:J.U.C之阻塞队列:PriorityBlockingQueue相关推荐

  1. 死磕Java并发:J.U.C之并发工具类:CountDownLatch

    作者:chenssy 来源:Java技术驿站 在上篇博客中介绍了Java四大并发工具一直的CyclicBarrier,今天要介绍的CountDownLatch与CyclicBarrier有点儿相似. ...

  2. 死磕Java并发:J.U.C之AQS:CLH同步队列

    本文转载自公号:Java技术驿站 在上篇文章"死磕Java并发:J.U.C之AQS简介"中提到了AQS内部维护着一个FIFO队列,该队列就是CLH同步队列. CLH同步队列是一个F ...

  3. 死磕Java并发:J.U.C之AQS简介

    本文转载自公众号: Java技术驿站 Java的内置锁一直都是备受争议的,在JDK 1.6之前,synchronized这个重量级锁其性能一直都是较为低下,虽然在1.6后,进行大量的锁优化策略(死磕J ...

  4. 【死磕Java并发】-----J.U.C之AQS:CLH同步队列

    原文出处:https://www.cmsblogs.com/category/1391296887813967872 『chenssy』 在上篇博客[死磕Java并发]-----J.U.C之AQS:A ...

  5. 死磕Java并发:Java内存模型之分析volatile

    近期活动:加班的你,需要一束光 本文转载自公众号: Java技术驿站 前篇文章<死磕Java并发:深入分析volatile的实现原理>中已经阐述了volatile的特性了: volatil ...

  6. 死磕Java并发:Java内存模型之happens-before

    本文转载自公众号: Java技术驿站 在上篇<死磕Java并发:深入分析volatile的实现原理>中提到过由于存在线程本地内存和主内存的原因,再加上重排序,会导致多线程环境下存在可见性的 ...

  7. 【死磕Java并发】-----Java内存模型之happens-before

    在上篇博客([死磕Java并发]-–深入分析volatile的实现原理)LZ提到过由于存在线程本地内存和主内存的原因,再加上重排序,会导致多线程环境下存在可见性的问题.那么我们正确使用同步.锁的情况下 ...

  8. 【死磕Java并发】----- 死磕 Java 并发精品合集

    原文出处:https://www.cmsblogs.com/category/1391296887813967872 『chenssy』 [死磕 Java 并发]系列是 LZ 在 2017 年写的第一 ...

  9. 死磕 Java 并发

    作者 :大明哥 博客 :http://cmsblogs.com/?cat=151 目录 : <[死磕 Java 并发]-– 深入分析 synchronized 的实现原理> <[死磕 ...

  10. 死磕Java并发:分析 ArrayBlockingQueue 构造函数加锁问题

    作者: chenssy 来源:Java技术驿站 昨天有位小伙伴问我一个 ArrayBlockingQueue 中的一个构造函数为何需要加锁,其实这个问题我还真没有注意过.主要是在看 ArrayBloc ...

最新文章

  1. 【v2.x OGE教程 18】 Entity相关
  2. INNODB的锁的类型
  3. 云栖专辑 | 阿里开发者们的第11个感悟:拥抱变化,用正确的方法对待工作
  4. springmvc+mybatis多数据源配置,AOP注解动态切换数据源
  5. c语言利用参数方程绘图,CG实验1-利用C语言图形函数绘图概要1.doc
  6. win10系统崩溃怎么修复_新手怎么重装系统win10
  7. automake 安装及使用
  8. 无聊之时用css3自制了好看的button样式和input样式
  9. Flask 框架的网站实现
  10. 《东周列国志》第六回 卫石碏大义灭亲 郑庄公假命伐宋
  11. 1427: 数字转换
  12. Java实现QQ邮件群发功能
  13. 证明:模n加法满足结合律
  14. Android原生蓝牙音乐绑定、Sink端play流程
  15. 软件工程之高质量代码(编码规范)
  16. 短视频剪辑软件分享,短视频剪辑软件这几个很不错。​
  17. win服务器物理内存占用高,win10系统长时间使用物理内存过高的解决方法
  18. 如何开心愉快兴趣满满的学习机器人和人工智能知识并提升思维力
  19. Python图像处理(13):brisk特征检测
  20. 一种灵活可靠的工作方式:组件化设计与开发

热门文章

  1. mysql null 和 空字符串 区别
  2. KeDelayExecutionThread使用注意
  3. WideCharToMultiByte和MultiByteToWideChar函数的用法(ascii转unicode unicode转ascii)
  4. linux shell (()) 双括号运算符使用
  5. 商人过河 java_商人过河问题(二)java实现
  6. java beans 组件_如何利用JavaBeans在应用程序中创建组件?
  7. 使用ELK 搭建core文件展示平台
  8. Linux信号列表(sigint sigtstp
  9. java 根据详细地址提取小区_Java分析/测试工具EJ Technologies JProfiler介绍及安装教程...
  10. CMake命令之execute_process