Analysis of the HashMap infinite loop in JDK 1.7

The put() method

public V put(K key, V value) {
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    if (key == null)
        return putForNullKey(value);
    int hash = hash(key);
    int i = indexFor(hash, table.length);
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
    modCount++;
    addEntry(hash, key, value, i);
    return null;
}
void addEntry(int hash, K key, V value, int bucketIndex) {
    if ((size >= threshold) && (null != table[bucketIndex])) {
        resize(2 * table.length); // resize to double the capacity
        hash = (null != key) ? hash(key) : 0;
        bucketIndex = indexFor(hash, table.length);
    }

    createEntry(hash, key, value, bucketIndex);
}
void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return;
    }

    Entry[] newTable = new Entry[newCapacity];
    transfer(newTable, initHashSeedAsNeeded(newCapacity));
    table = newTable;
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}
/**
 * Transfers all entries from current table to newTable.
 */
void transfer(Entry[] newTable, boolean rehash) {
    int newCapacity = newTable.length;
    for (Entry<K,V> e : table) {
        while (null != e) {
            Entry<K,V> next = e.next;
            if (rehash) {
                e.hash = null == e.key ? 0 : hash(e.key);
            }
            int i = indexFor(e.hash, newCapacity);
            e.next = newTable[i]; // head insertion, as used in 1.7
            newTable[i] = e;
            e = next;
        }
    }
}
/**
 * Returns index for hash code h.
 */
static int indexFor(int h, int length) {
    // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";
    return h & (length - 1);
}

In summary, one HashMap resize proceeds in four steps:

1. Compute the new table size as twice the current table's size.

2. Allocate a new Entry array of that size, named newTable.

3. Walk every slot of the old table; for each Entry chained at a slot, compute its position in the new table and link it there as a list.

4. Once every Entry of the old table has been processed, all entries have been moved, and the HashMap's table field is pointed at newTable.

Resizing with a single thread

Resizing under concurrency

HashMap's concurrent resize can create an infinite loop for the following reason: when several threads resize at once, one thread finishes first and rehashes the old chains into its new table, and head insertion leaves each chain reversed. A second thread, still mid-resize, then re-traverses that chain through next pointers it read before the reversal and reverses it back again; the interleaving can leave two nodes pointing at each other, forming a circular linked list. Any later traversal of that bucket — for example a get() for a key that is not in the table — then loops forever.

JDK 1.8's HashMap switched to tail insertion during resizing, which preserves chain order and avoids creating such cycles.
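To make the hazard concrete, here is a small stress sketch (illustrative, not from the original post; the class name and constants are arbitrary). Run on JDK 1.7 or earlier it may, non-deterministically, hang with both threads spinning inside transfer(); on 1.8+ it will not form a cycle, although the map is still unsafe and may lose entries.

import java.util.HashMap;
import java.util.Map;

public class HashMapResizeRace {
    public static void main(String[] args) throws InterruptedException {
        // tiny initial table so resizes happen often
        final Map<Integer, Integer> map = new HashMap<Integer, Integer>(2);
        Runnable writer = new Runnable() {
            public void run() {
                for (int i = 0; i < 100000; i++) {
                    // racing puts can trigger concurrent transfer() calls
                    map.put((int) (Math.random() * 100000), i);
                }
            }
        };
        Thread t1 = new Thread(writer);
        Thread t2 = new Thread(writer);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("done, size=" + map.size()); // on JDK 1.7 this may never print
    }
}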

ConcurrentHashMap implementation analysis

The JDK 1.7 implementation

static final class Segment<K,V> extends ReentrantLock implements Serializable { // a Segment is itself a lock
    transient volatile HashEntry<K,V>[] table;

/**
 * ConcurrentHashMap list entry. Note that this is never exported
 * out as a user-visible Map.Entry.
 */
static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;

/**
 * Creates a new, empty map with a default initial capacity (16),
 * load factor (0.75) and concurrencyLevel (16).
 */
public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

/**
 * Creates a new, empty map with the specified initial
 * capacity, load factor and concurrency level.
 *
 * @param initialCapacity the initial capacity. The implementation
 * performs internal sizing to accommodate this many elements.
 * @param loadFactor the load factor threshold, used to control resizing.
 * Resizing may be performed when the average number of elements per
 * bin exceeds this threshold.
 * @param concurrencyLevel the estimated number of concurrently
 * updating threads. The implementation performs internal sizing
 * to try to accommodate this many threads.
 * @throws IllegalArgumentException if the initial capacity is
 * negative or the load factor or concurrencyLevel are
 * nonpositive.
 */
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments: the smallest power of
    // two >= concurrencyLevel becomes the segment array size
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

ConcurrentHashMap is initialized from initialCapacity, loadFactor, and concurrencyLevel. concurrencyLevel is the caller's estimate of the concurrency level — roughly, how many threads will update the map at the same time — and it determines the size of the Segment array (its default is DEFAULT_CONCURRENCY_LEVEL = 16). From these parameters the constructor derives the segments array, the segment shift segmentShift, the segment mask segmentMask, and the HashEntry array inside each segment.
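The sizing logic is easy to check in isolation. A minimal sketch (not JDK code; the class name and example hash are arbitrary) of the constructor's loop, showing the defaults it produces and how a hash is routed to a segment:

import java.util.Locale;

public class SegmentSizing {
    public static void main(String[] args) {
        int concurrencyLevel = 16; // DEFAULT_CONCURRENCY_LEVEL
        // find the smallest power of two >= concurrencyLevel
        int sshift = 0, ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        int segmentShift = 32 - sshift; // 28 for the default
        int segmentMask = ssize - 1;    // 15 for the default
        int hash = 0xCAFEBABE;          // arbitrary example hash
        // the high bits of the hash select the segment
        int j = (hash >>> segmentShift) & segmentMask;
        System.out.printf(Locale.ROOT, "ssize=%d shift=%d mask=%d -> segment %d%n",
                ssize, segmentShift, segmentMask, j);
    }
}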

get()

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key); // the high bits of the hash pick the segment; the full hash picks the table slot
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) { // walk the chain to locate the key
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
private int hash(Object k) {
    int h = hashSeed;

    if ((0 != h) && (k instanceof String)) {
        return sun.misc.Hashing.stringHash32((String) k);
    }

    h ^= k.hashCode();

    // Spread bits to regularize both segment and index locations,
    // using variant of single-word Wang/Jenkins hash.
    h += (h <<  15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h <<   3);
    h ^= (h >>>  6);
    h += (h <<   2) + (h << 14);
    return h ^ (h >>> 16);
}

Because HashEntry declares its fields final and volatile, get() needs no lock yet still observes up-to-date values.

put()

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}
private Segment<K,V> ensureSegment(int k) { // lazily initialize the segment slot
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) // CAS loop: only one thread wins the initialization
                    break;
            }
        }
    }
    return seg;
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // Segment method
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first); // head insertion: the current head becomes the new node's successor
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
/**
 * Sets next field with volatile write semantics.  (See above
 * about use of putOrderedObject.)
 */
final void setNext(HashEntry<K,V> n) {
    UNSAFE.putOrderedObject(this, nextOffset, n);
}
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { // Segment method: spin a bounded number of times trying to take the lock
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock(); // spinning failed; fall back to a blocking lock acquisition
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

rehash() — resizing a segment

/**
 * Doubles size of table and repacks entries, also adding the
 * given node to new table.
 */
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
    /*
     * Reclassify nodes in each list to new table.  Because we
     * are using power-of-two expansion, the elements from
     * each bin must either stay at same index, or move with a
     * power of two offset. We eliminate unnecessary node
     * creation by catching cases where old nodes can be
     * reused because their next fields won't change.
     * Statistically, at the default threshold, only about
     * one-sixth of them need cloning when a table
     * doubles. The nodes they replace will be garbage
     * collectable as soon as they are no longer referenced by
     * any reader thread that may be in the midst of
     * concurrently traversing table. Entry accesses use plain
     * array indexing because they are followed by volatile
     * table write.
     */
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1; // double the capacity
    threshold = (int)(newCapacity * loadFactor);
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null)   //  Single node on list
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

remove()

public V remove(Object key) {
    int hash = hash(key);
    Segment<K,V> s = segmentForHash(hash);
    return s == null ? null : s.remove(key, hash, null);
}
/**
 * Remove; match on key only if value null, else match both.
 */
final V remove(Object key, int hash, Object value) {
    if (!tryLock())
        scanAndLock(key, hash);
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> e = entryAt(tab, index);
        HashEntry<K,V> pred = null;
        while (e != null) {
            K k;
            HashEntry<K,V> next = e.next;
            if ((k = e.key) == key ||
                (e.hash == hash && key.equals(k))) {
                V v = e.value;
                if (value == null || value == v || value.equals(v)) {
                    if (pred == null)
                        setEntryAt(tab, index, next);
                    else
                        pred.setNext(next);
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }
            pred = e;
            e = next;
        }
    } finally {
        unlock();
    }
    return oldValue;
}

size() counts the elements in the ConcurrentHashMap. Its use is generally discouraged, as is the analogous containsValue(Object value) method: both may end up locking every segment.

/**
 * Returns the number of key-value mappings in this map.  If the
 * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
 * <tt>Integer.MAX_VALUE</tt>.
 *
 * @return the number of key-value mappings in this map
 */
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation; every segment gets locked
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

ConcurrentHashMap in JDK 1.8

Segments are gone; the structure is an array plus linked lists (O(n) lookup within a bin) plus red-black trees (O(log n)).

Data structures

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

TreeNode — the node type used inside a red-black tree

static final int TREEIFY_THRESHOLD = 8;   // a bin's list becomes a red-black tree once it reaches 8 nodes
static final int UNTREEIFY_THRESHOLD = 6; // a tree shrinks back to a list once it drops to 6 nodes

static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;

TreeBin — holds the root of a red-black tree

static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // values for lockState (a small read-write lock)
    static final int WRITER = 1; // set while holding write lock
    static final int WAITER = 2; // set when waiting for write lock
    static final int READER = 4; // increment value for setting read lock

Difference between the node classes of HashMap and ConcurrentHashMap:

In HashMap, TreeNode extends LinkedHashMap.Entry (whereas, as shown above, ConcurrentHashMap's TreeNode extends its own Node):

static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;

ForwardingNode (placed in the table as a placeholder during resizing, marking a bin that is null or has already been moved)

/**
 * A node inserted at head of bins during transfer operations.
 */
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }

private transient volatile int sizeCtl — controls table initialization and resizing in 1.8

/**
 * Table initialization and resizing control.  When negative, the
 * table is being initialized or resized: -1 for initialization,
 * else -(1 + the number of active resizing threads).  Otherwise,
 * when table is null, holds the initial table size to use upon
 * creation, or 0 for default. After initialization, holds the
 * next element count value upon which to resize the table.
 */
private transient volatile int sizeCtl;

In short: -1 means a thread is currently initializing the table; other negative values encode how many threads are currently resizing; 0 (the default) means not yet initialized; a positive value is the next resize threshold.

Core methods

/*
 * Reads the Node at position i with a hardware-level atomic operation.
 * Unsafe.getObjectVolatile reads the given memory location directly,
 * guaranteeing the value seen is always the latest one.
 */
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

/* Sets the Node at position i with a CAS operation. */
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

/*
 * Writes the Node at position i with a hardware-level atomic operation.
 * Unsafe.putObjectVolatile writes the given memory location directly,
 * guaranteeing that other threads reading this node see the latest value.
 */
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

The get() method

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode()); // compute the hash
    // locate the bin for this hash
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // the node in the array itself is the one we want
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // eh < 0 means the bin is a tree (or forwarding) node; use its find method
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // otherwise it is a list: walk it and return the matching value
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

put()

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode()); // compute the hash
    int binCount = 0;
    // infinite loop: exits only once the insertion has succeeded
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); // table is empty: initialize it
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // the bin is empty: install the new node with a CAS
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // a resize is in progress; this thread helps with it
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // lock the bin's head element: either the head of a collision
            // chain or the root node of a red-black tree
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // fh >= 0 means this is a list node, not a tree node
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // put and putIfAbsent semantics
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // reached the last node: tail insertion at the end of the list
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // insert following the tree protocol
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // past the threshold of 8: convert the list into a tree
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // increment the element count and check whether a resize is needed
    addCount(1L, binCount);
    return null;
}
/**
 * Initializes table, using the size recorded in sizeCtl.
 */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // A negative value means another thread is already initializing;
        // yield the CPU, since only one thread may perform the initialization.
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS sizeCtl to -1 to claim the initialization for this thread
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // n minus a quarter of n, i.e. sc = 0.75 * n
                    sc = n - (n >>> 2);
                }
            } finally {
                // store the resize threshold back into sizeCtl
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

remove()

public V remove(Object key) {
    return replaceNode(key, null, null);
}

/**
 * Implementation for the four public remove/replace methods:
 * Replaces node value with v, conditional upon match of cv if
 * non-null.  If resulting value is null, delete.
 */
final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

transfer() — resizing

/**
 * Moves and/or copies the nodes in each bin to new table. See
 * above for explanation.
 */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range: each thread migrates a slice of bins (concurrent resize)
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

treeifyBin()

/* ---------------- Conversion from/to TreeBins -------------- */

/**
 * Replaces all linked nodes in bin at given index unless table is
 * too small, in which case resizes instead.
 */
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

size()

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value; // the total is baseCount plus every cell
        }
    }
    return sum;
}
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // First try to CAS the delta into baseCount; if that fails under contention,
    // record it in a CounterCell instead -- which is why sumCount() adds both parts.
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
@sun.misc.Contended // padding to avoid false sharing between cells
static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}
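This baseCount-plus-CounterCell scheme is the same striped-counter idea that java.util.concurrent.atomic.LongAdder exposes as a public API. A minimal sketch using LongAdder to keep a contended count (the class name and iteration count are illustrative):

import java.util.concurrent.atomic.LongAdder;

public class StripedCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        final LongAdder size = new LongAdder(); // base value + cells, like addCount()
        Runnable incr = new Runnable() {
            public void run() {
                for (int i = 0; i < 1_000_000; i++)
                    size.increment(); // contended updates spread across cells
            }
        };
        Thread a = new Thread(incr), b = new Thread(incr);
        a.start(); b.start();
        a.join(); b.join();
        System.out.println(size.sum()); // 2000000: sum() adds the base plus all cells
    }
}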

ConcurrentSkipListMap and ConcurrentSkipListSet — sorted concurrent Map and Set

TreeMap and TreeSet (TreeSet is a wrapper around TreeMap) are not safe for concurrent use.

SkipList (a probabilistic data structure)

A skip list is another algorithm that trades space for time: each node records not only its next node but also, per level, the node that lies that many steps ahead. Searches follow the idea of "take big strides to bracket the range first, then close in step by step." In algorithmic efficiency a skip list comes close to a red-black tree.
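For ordered maps under concurrency the JDK therefore offers ConcurrentSkipListMap. A small usage sketch (keys and values are arbitrary) showing the sorted, thread-safe views it supports:

import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class SkipListMapDemo {
    public static void main(String[] args) {
        ConcurrentNavigableMap<Integer, String> m =
            new ConcurrentSkipListMap<Integer, String>();
        m.put(30, "c");
        m.put(10, "a");
        m.put(20, "b");
        System.out.println(m.firstKey());       // 10: keys stay sorted
        System.out.println(m.headMap(25));      // {10=a, 20=b}: range views are supported
        System.out.println(m.ceilingEntry(15)); // 20=b: navigation methods, all thread-safe
    }
}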

ConcurrentLinkedQueue (a concurrent counterpart of LinkedList)

An unbounded, non-blocking, thread-safe queue based on a linked list. Its elements follow first-in, first-out order: the head is the oldest element and the tail the most recently added. Elements are inserted at the tail and taken from the head.

ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<String>();

concurrentLinkedQueue.add("c");
concurrentLinkedQueue.offer("d"); // inserts the specified element at the tail of this queue
concurrentLinkedQueue.peek();     // retrieves, but does not remove, the head; returns null if the queue is empty
concurrentLinkedQueue.poll();     // retrieves and removes the head; returns null if the queue is empty

CopyOnWriteArrayList and CopyOnWriteArraySet (copy-on-write containers)

Use case: read-mostly workloads with occasional writes (whitelists, blacklists).

A copy-on-write container works as the name suggests: when we add an element, we do not add it to the current backing array. Instead, we first copy the current array, add the element to the copy, and then point the container's reference at the new array.

The benefit is that copy-on-write containers can be read concurrently without locking, because the array being read is never mutated. Copy-on-write is thus a form of read/write separation: reads and writes operate on different arrays. If other threads are adding to a CopyOnWriteArrayList while one thread reads, the reader may still see the old data, since writes do not lock out readers of the old array.

When using a CopyOnWriteMap (or any copy-on-write container), keep two things in mind:

1. Reduce growth overhead. Size the container appropriately at initialization, so writes do not also pay for growing it.

2. Use batch additions. Every add copies the whole container, so batching writes reduces the number of copies.

Problems with copy-on-write containers

Performance

Every modification allocates a new array and copies all the contents over. If the array is large and modifications are frequent, performance will clearly be poor and memory overhead high.

Data consistency

A copy-on-write container guarantees only eventual consistency of the data, not real-time consistency. If you need a write to be visible to readers immediately, do not use a copy-on-write container.
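A short demo of that snapshot semantics (illustrative, not from the original post): an iterator obtained before a write keeps reading the old array.

import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowSnapshotDemo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>();
        list.add("a");
        list.add("b");
        Iterator<String> it = list.iterator(); // the iterator pins the current array (a snapshot)
        list.add("c");                         // the write copies to a new array
        while (it.hasNext())
            System.out.print(it.next() + " "); // prints "a b ": the snapshot is stale
        System.out.println();
        System.out.println(list);              // prints [a, b, c]: new reads see the new array
    }
}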

Blocking queues (BlockingQueue)

A queue is a special linear list: deletions are allowed only at the front and insertions only at the rear. Like a stack, it is an access-restricted linear list; the insertion end is called the tail and the deletion end the head.

Inserting an element is called enqueueing; deleting one is called dequeueing. Because insertion happens only at one end and deletion at the other, the earliest element in is the earliest out, which is why a queue is called a first-in, first-out (FIFO) list.

A blocking queue:

1) Supports blocking insertion: when the queue is full, the thread inserting an element blocks until the queue is no longer full.

2) Supports blocking removal: when the queue is empty, the thread retrieving an element waits until the queue becomes non-empty.

Throws exception: inserting into a full queue throws IllegalStateException("Queue full"); retrieving from an empty queue throws NoSuchElementException.

Returns a special value: an insert reports whether it succeeded (true on success); a removal returns the element taken from the queue, or null if there is none.

Blocks forever: when the queue is full, a producer calling put blocks until space frees up or the thread is interrupted; when the queue is empty, a consumer calling take blocks until the queue becomes non-empty.

Times out: when the queue is full, a timed insert blocks the producer for at most the given period, then gives up and returns. The method matrix below summarizes the four modes.
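In terms of the standard BlockingQueue API (as documented in the JDK javadoc), the four handling modes correspond to these methods:

Behavior   Throws exception   Special value   Blocks   Times out
Insert     add(e)             offer(e)        put(e)   offer(e, time, unit)
Remove     remove()           poll()          take()   poll(time, unit)
Examine    element()          peek()          (none)   (none)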

Blocking queues are used mostly in the producer-consumer pattern; a minimal sketch follows.
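A minimal producer-consumer sketch over a bounded ArrayBlockingQueue (the capacity and item counts are arbitrary):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(4);
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 10; i++)
                        queue.put(i); // blocks while the queue is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 10; i++)
                        System.out.println("took " + queue.take()); // blocks while empty
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        consumer.start();
    }
}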

Common blocking queues:

·ArrayBlockingQueue: a bounded blocking queue backed by an array. (Bounded means the queue has a fixed capacity and blocks once full; unbounded means no fixed capacity, though in practice even an unbounded queue is still limited by system memory.)

·LinkedBlockingQueue: a bounded blocking queue backed by a linked list.

·PriorityBlockingQueue: an unbounded blocking queue that supports priority ordering.

·DelayQueue: an unbounded blocking queue implemented with a priority queue.

·SynchronousQueue: a blocking queue that stores no elements.

·LinkedTransferQueue: an unbounded blocking queue backed by a linked list.

·LinkedBlockingDeque: a doubly-ended blocking queue backed by a linked list.

ArrayBlockingQueue

A bounded blocking queue implemented with an array, ordering elements first-in, first-out. By default it does not guarantee fair access for threads. Fair access means blocked threads acquire the queue in the order in which they blocked, i.e. the longest-waiting thread goes first. The non-fair policy is unfair to the earliest waiters: when the queue becomes available, all blocked threads compete for access, and a thread that blocked first may get in last. Fairness can be selected via a constructor parameter.

LinkedBlockingQueue

A bounded blocking queue implemented with a linked list. Its default and maximum capacity is Integer.MAX_VALUE, and it orders elements first-in, first-out.

Differences between the array-backed and linked implementations

1. The locks differ.

ArrayBlockingQueue uses a single, unsplit lock: producers and consumers contend on the same lock.

LinkedBlockingQueue splits the lock: producers use putLock and consumers use takeLock.

2. The per-operation work differs.

ArrayBlockingQueue inserts or removes the element object directly.

LinkedBlockingQueue must wrap each element in a Node<E> on insertion and unwrap it on removal, which costs some performance.

3. Capacity initialization differs.

ArrayBlockingQueue requires the queue capacity to be specified.

LinkedBlockingQueue may be created without a capacity, which then defaults to Integer.MAX_VALUE.

PriorityBlockingQueue

PriorityBlockingQueue is an unbounded blocking queue that supports priorities. By default elements are sorted in natural ascending order; you can also define the ordering by implementing compareTo() in the element class, or pass a Comparator when constructing the queue. Note that the ordering of elements with equal priority is not guaranteed.

DelayQueue

An unbounded blocking queue that supports delayed element retrieval, implemented on top of PriorityQueue. Elements must implement the Delayed interface and specify, at creation time, how long until they may be taken; an element can be taken only once its delay has expired.

DelayQueue is very useful in scenarios such as the following.

Cache expiry: keep each cache entry's time-to-live in a DelayQueue and let one thread poll the queue in a loop; whenever an element can be taken, that entry's lifetime has ended. Other uses include order expiration and time-limited payment. A sketch of such an expiry element is shown below.
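A minimal sketch of such a cache-expiry element, assuming a TTL given in milliseconds; the class name and key are illustrative:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class ExpiringKey implements Delayed {
    private final String key;
    private final long expireAtNanos; // absolute deadline

    public ExpiringKey(String key, long ttlMillis) {
        this.key = key;
        this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ttlMillis);
    }

    public long getDelay(TimeUnit unit) { // remaining delay; <= 0 means expired
        return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    public int compareTo(Delayed other) { // order by deadline: soonest expiry first
        return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                            other.getDelay(TimeUnit.NANOSECONDS));
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<ExpiringKey> q = new DelayQueue<ExpiringKey>();
        q.add(new ExpiringKey("session-42", 500)); // expires in 500 ms
        System.out.println("evict " + q.take().key); // blocks about 500 ms, then returns
    }
}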

SynchronousQueue

A blocking queue that stores no elements: every put must wait for a corresponding take before another element can be added. SynchronousQueue can be seen as a relay runner, handing data from a producer thread directly to a consumer thread. Since the queue itself holds nothing, it is well suited to pure hand-off scenarios, and its throughput is higher than that of LinkedBlockingQueue and ArrayBlockingQueue.

LinkedTransferQueue

Compared with other blocking queues, it adds the tryTransfer and transfer methods.

(1) The transfer method

If a consumer is already waiting to receive an element (blocked in take() or a timed poll()), transfer hands the producer's element to that consumer immediately. If no consumer is waiting, transfer places the element at the queue's tail node and does not return until a consumer has consumed it.

(2) The tryTransfer method

tryTransfer probes whether the producer's element can be handed directly to a consumer; if no consumer is waiting, it returns false. The difference from transfer is that tryTransfer returns immediately whether or not a consumer takes the element, whereas transfer does not return until the element has been consumed. A small sketch of both follows.
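An illustrative sketch of both methods (not from the original post; the timed tryTransfer overload is used so the demo cannot hang if the consumer is slow to start):

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        final LinkedTransferQueue<String> q = new LinkedTransferQueue<String>();
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    System.out.println("got " + q.take()); // waits for a producer
                    System.out.println("got " + q.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();
        Thread.sleep(100); // make it likely the consumer is already blocked in take()
        // timed tryTransfer: waits up to 1 s for a consumer, then gives up
        boolean handed = q.tryTransfer("ping", 1, TimeUnit.SECONDS);
        System.out.println("direct hand-off: " + handed);
        q.transfer("pong"); // blocks here until the consumer has taken "pong"
        System.out.println("transfer returned after consumption");
    }
}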

LinkedBlockingDeque

LinkedBlockingDeque is a doubly-ended blocking queue backed by a linked list. Doubly-ended means elements can be inserted and removed at both ends; having a second entry point halves the contention when many threads enqueue at the same time.

It adds methods such as addFirst, addLast, offerFirst, offerLast, peekFirst, and peekLast. Methods ending in First insert, examine (peek), or remove the first element of the deque; methods ending in Last operate on the last element. In addition, add is equivalent to addLast and remove to removeFirst — but take equals takeFirst, so using the explicitly First/Last-suffixed methods is clearer. A capacity can be set when constructing a LinkedBlockingDeque to keep it from growing without bound. Doubly-ended blocking queues also fit the "work stealing" pattern.

How blocking queues are implemented:

They use the wait/notify pattern: when a producer adds to a full queue it blocks, and when a consumer removes an element it notifies the producer that the queue has room again. Inspecting the JDK source shows that ArrayBlockingQueue implements this with Condition objects. The other queues follow similar, not especially complex, schemes — see the sketch below.
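A stripped-down sketch of that mechanism, in the spirit of ArrayBlockingQueue but not the JDK code itself: one ReentrantLock with notFull and notEmpty Conditions guarding a circular array.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TinyBlockingQueue<E> {
    private final Object[] items;
    private int putIndex, takeIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public TinyBlockingQueue(int capacity) { items = new Object[capacity]; }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();            // producer waits while full
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();              // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();           // consumer waits while empty
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();               // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }
}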

Reference: http://enjoy.ke.qq.com/

Reposted from: https://www.cnblogs.com/cangshublogs/p/10838901.html
