ConcurrentHashMap源码底层分析
1.ConcurrentHashMap初始化
jdk8之后,ConcurrentHashMap
采用了HashMap的底层结构(数据,链表,红黑树),在此基础上保障了线程安全问题.
public ConcurrentHashMap() { }
- 无参构造
public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
- 有参构造,初始化容量,第一步判断其参数是否合法化,第二步判断参数的大小,如果大于
MAXIMUM_CAPACITY
的一半,那么直接让初始容量最大化,没那么大的话就去执行tableSizeFor
方法
public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); }
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }
- 有参构造,指定容量和负载因子大小和并发级,第一步还是老惯例判断参数合法化,第二步则判断容量是否小于并发级,为了保障预估的线程数有相同数量的容器.这里不理解,没关系.后面则是根据容量和负载因子,计算size,看看是否需要调用
tableSizeFor
方法,和之前一样最后赋值给容量,然后将我们的容量初始化给成员变量sizeCtl
private transient volatile int sizeCtl;
- 这个字段的具体含义是用来表示控制含义的,后面在做分析.
private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }
- 这个方法在HashMap中是一模一样,就不多赘述了,其本质是调整容量大小为2的幂次.
2.ConcurrentHashMap的put方法
public V put(K key, V value) { return putVal(key, value, false); }
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
老样子,这个方法的实现还是依旧很长,所以需要去仔细阅读,将代码进行拆分去理解
-
先计算key的哈希值,然后通过spread减少哈希冲突
int hash = spread(key.hashCode());
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
spread
源码的意思就是通过哈希值进行异或运算得到的结果与HASH_BITS
进行与运算,那么这个结果有什么独特之处呢?这里的HASH_BITS
常量的值是0x7fffffff
,它是一个十六进制数,对应的二进制形式是0111 1111 1111 1111 1111 1111 1111 1111
。也就是说最后与运算的结果处最高位其余结果位都根据前者计算,也就决定了这个运算结果的范围: [0, 2^31 - 1] 之间,也就可以正确映射到哈希表中,有效分布在桶中. -
紧接着就是一个大大的for循环里面包含着很多逻辑,为了拆分更好的理解里面的if-else语句,于是一个一个看
for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable();
- 第一个if简单清晰,就是对
bucket
进行判断,如果为0或者null说明还没初始化表,也就是数组为空的情况,initTable
方法后面再看.
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
- 这个if判断在HashMap中也有出现,就是判断一下桶的那个位置是否为空,为空表明此时不存在哈希冲突,申请一个新节点放进去,这个放的过程就是CAS的过程,底层通过unsafe类进行的操作,操作成功则插入成功,操作失败说明有人先行一步,则break,下次再来.
else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);
-
这个if是获取节点的哈希值,如果此时节点的哈希值等于MOVED,说明容器正在进行扩容,这个地方不理解可能是正常的
static final int MOVED = -1; // hash for forwarding nodes static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
-
这几个字段其本身具有特殊含义,现在理解没什么意义,要结合后面的代码具体理解,这个地方是与普通的HashMap不一样的地方,不需要过度纠结
-
helpTransfer
方法后面再理解比较合适
-
由于最后一个else代码过长,所以同样进行拆分
else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) {
- 走到这里,说明之前声明的引用指向的索引位置不为空,需要去走相应的逻辑.也就是说遇到了哈希冲突.首先申请了一个旧值,这个旧值的作用后续会知道,
f
则是我们这个桶的引用,也就是指向,也就是第一个节点的位置,进行同步操作,然后获取一下索引对于哈希表的节点,然后判断一下f
是不是首个元素的指向,如果是说明在进入同步代码块之前没有被其他线程改变,那么ok,然后在判断fh
是不是大于0,这个fh
也就是前文的哈希值,如果大于0,则说明是链表,执行后续的操作.
if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } }
bigCount
这个字段则是用来记录链表属性长度的,然后去遍历链表,如果发现了相等的key,则判断一下是否需要值覆盖,同时把旧值保存给我们之前声明好的变量oldVal
中去,如果没找到,说明到了链表末尾,则尾插,说明不存在相同的key值.
else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } }
- 这一部分则是红黑树节点的情况了,这里的设计思路和之前类似,不过在插入时调用的是红黑树的插入,然后保存一下旧值.
if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }
- 这个最后的判断,则是用来判断链表是否过长的原因,然后就保存的旧值返回.
addCount(1L, binCount);
- 最后也就是增加一下元素数量.
- 第一个if简单清晰,就是对
3.initTable方法
这个方法在此之前有被调用过,也就是put方法中第一次判断的时候对数组初始化的时候.
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
代码量不是很长,先循环保证一下数组没有被初始化,因为是多线程的情况下,所以去判断一下sizeCtl
这个控制字段,根据下文解释,也能理解,其意思就是如果这个字段小于0,说明此时有线程正在进行初始化,那么就让此线程让出时间片,为什么这么说,看后面的else-if语句就知道,他会进行CAS操作,保证sizeCtl
这个字段被设置为-1,也就跟拿到了锁一样.然后进行初始化就完事,默认容量跟HashMap一样16,让tab引用指向他,让内部实际的哈希表table
指向他,table
是被volatile
修饰的表示可见的,要对其进行赋值,至于 sc = n - (n >>> 2);
就是n的四分之三,也就是计算阈值,默认情况下就是12,然后赋值给我们这个控制字段,因此 sizeCtl
这个字段其实也本身有阈值的含义.最后结束.
4.treeifyBin方法
链表长度大于8的时候就会调用此方法
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
代码量其实也不是很长,并不难理解,也就是当我们的数组长度如果小于MIN_TREEIFY_CAPACITY
(默认64)时,就执行tryPresize(n << 1);
方法,这个方法过会再看,否则则需要对链表进行一个转变,除此之外的if-else则是判断链表是否有效,哈希值是否大于0,之后用synchronized
去获得我们那个链表节点的锁.接下来就是遍历将链表节点都换成红黑树节点,最后调用setTabAt
方法.也就是将位置 index
处的链表替换为一个红黑树(TreeBin
)的根节点,从而完成了链表到红黑树的转换.
5.tryPresize方法
这个方法在其上一个方法内部被调用用来扩容数组,其底层源码实现如下
private final void tryPresize(int size) { int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { int rs = resizeStamp(n); if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }
扩容机制的实现本身代码量也足够长,拆开分析理解
-
根据当前哈希表的大小来计算合适的
sizeCtl
值,以在初始化或状态转换时进行合适的控制。int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc;
其逻辑在初始化中有类似设计情况,参考仅可,不过要说明的是,size+0.5size+1,向2的幂次取整,也就是2.5size向上取整
- 注:这里的size在传入过来时就已经是原数组两倍的大小了,可以回头去看源码
-
来看while中的第一个if判断
Node<K,V>[] tab = table; int n; if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } }
显而易见就是数组未初始化,根据sc的值赋值给n,然后再用CAS操作对sc进行赋值为-1,表示此时有线程进行初始化,之后那几步其实略有相同,将sc也就是阈值调整为四分之三赋值给
sizeCtl
,这一部分可以参考initTable
方法,基本一样. -
第二个if-else
else if (c <= sc || n >= MAXIMUM_CAPACITY) break;
这个c也就是我们之前重新计算的容量,和我们的阈值进行比较,如果还没到,就返回就是或者数组长度已经是最大值了,表示也不能再进行扩容了,则返回.
-
第三个if-else
else if (tab == table) { int rs = resizeStamp(n); if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); }
其实这个条件判断还是蛮有看头的,tab和table两个引用指向,后者是真正需要用到的引用对象,因此,这个判断主要是为了多线程的情况下,如果table指向其他哈希数组,意味着哈希数组已经发生变化,所以做这一步是为了保障多线程安全,至于下面的
int rs = resizeStamp(n);
则表示扩容标记,用于区分每一个线程的,接下来的if则是表明当前有线程在执行扩容操作.具体if的条件单独拿出来看,因为内部有些复杂if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break;
(sc >>> RESIZE_STAMP_SHIFT) != rs
,RESIZE_STAMP_SHIFT是一个固定的值为16,也就是说一个32位整数右移16位,也就是仅保留高16位结果和rs做判断sc == rs + 1
,这个其实是根据后续CAS操作观察得出的,因为sc这个字段表示容器的一个控制状态,所以如果发生+1的情况,则这种情况就会出现,那么就会break.sc == rs + MAX_RESIZERS
这个则是用来和扩容标记和一个MAX_RESIZERS
做比较nt = nextTable) == null
则是用来判断下一个哈希表是否为空transferIndex <= 0
则表示还未进行扩容或者扩容已经完成,这个字段也是一个属性字段,其意义表示扩容进度的一个表示,大于0,小于0时被赋予的意义也完全不同和sizeCtl
字段同样具备相应的设计意义.
-
最后一个else-if则是将sizeCtl设置为
rs << RESIZE_STAMP_SHIFT) + 2)
,然后执行transfer
方法进行迁移,想必是进行数据迁移else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null);
6.transfer方法
这个就是继数据扩容之后的数据迁移方法,代码量有点长
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
可谓是长的臭人...,因此ConcurrentHashMap底层的难点就在于扩容和迁移两个操作,为了方便理解,进行拆分
-
首先根据数组长度n和CPU核数计算出每个线程需要迁移的元素数量
stride
int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE;
如果
NCPU
大于 1,则将数组长度除以 8 并除以NCPU
,如果结果小于MIN_TRANSFER_STRIDE
则使用MIN_TRANSFER_STRIDE
(固定值16)。 -
如果
nextTab
(新数组)为null
,表示扩容刚开始,会创建一个新的数组nextTab
,并将其赋值给nextTable
,同时将transferIndex
设置为旧数组的长度n
,表示迁移的起始索引。if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; }
说明第一次调用此方法,要迁移的数组还没被初始化,进行初始化之后原来的数组长度n则相当于新数组的中部位置.
-
然后进入一个循环,循环中会根据不同情况进行处理:
为了避免循环代码过长,在进行切分前,先理解其声明的几个变量
int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false;
当无法去理解一个变量的声明的时候,就去英翻汉,外国人都很直接,因此
ForwardingNode
其实表示正在拆迁的节点,观看源码发现,内部仅有一个成员变量就是下一个数组,所以可以把他当搬家公司.-
接下来进入循环,循环第一步是判断如果
advance
为true
,表示需要继续迁移元素。for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } }
for循环依旧还未结束,这里只是表示了while循环内部的过程,有一说一,个人感觉这段代码也不好理解.首先
nextIndex, nextBound
这两个变量表示下一个迁移的起始位置和末尾位置,他会去用i去判断是否超过了末尾位置,finishing这个字段则表示迁移完成为true就说明迁移完成,那么就将advance改为false,如果不行那么去判断迁移字段的值是否小于0,因为扩容之后,迁移字段会更至末尾,也就是新数组的中部位置,正常来表示想要迁移,还没迁移完,小于0则表示迁移完成了,所以advance也就false,和之前transferIndex
讲解的意义是一样的,而最后一个if-else则表示进行CAS竞争,获取到下一次迁移的起始位置,同时修改transferIndex
的值,因为底层使用的是CAS操作,所以他实际上会去找偏移量为TRANSFERINDEX
的位置,而这个位置起始就存储了transferIndex
这个值,他会拿去和nextIndex做比较,在第二次else if中进行了赋值操作,所以值的比较一定是相同的,除非在CAS之前有其他线程进行了CAS操作,因此会改变transferIndex
这个值,同时对nextBound也同样进行操作,然后用bound和i两个临时变量去保存迁移的结束位置和起始位置.这么说可能难理解,用图解的形式看即可.至此,这一块个人认为很难理解的部分就结束了.
- 注:这里迁移的顺序是从大往小,所以会有--i,而不是i++
-
-
判断是否需要结束迁移操作并执行收尾工作
if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } }
- 条件判断的第一个条件if,由于i是下一次起始位置-1,所以可能出现被分完的操作,也有可能迁移完,因此无论哪种都可以表示整个要被分配的数组被分配完了给不同线程.
为什么说不同线程分配完了会有这种结果?
个人是这么理解的,因为上面去划分负责区域的时候,其实每一个if-else语句内部都将advance设置为false,前两种其实也就是没有划分区域的情况,而后者则是为了划分所设置,这意味着每一个线程其实只分配一块区域,然后就退出,所以不同的线程他所负责的区域肯定是不同的,这也就意味着有部分的线程其实他会出现这种情况,即可能本身也没分配到,等待即可.
- 条件判断的第二个判断i>=n这里,个人是这么认为的,根据前文代码给出i其实是nextIndex-1,也就说实际上是数组长度最大n-1的位置,因为nextIndex是根据
transferIndex
这个字段来的,而transferIndex
这个字段一开始是由length决定的(扩容的时候),也就是说,i实际上不可能大于n,这仅我个人的猜测和理解,如有不对,希望给出说明. - 至于第三个i+n>=nextn,这个判断我也觉得有些毛病,nextn本身是新数组的新长度,但是这个新长度在扩容机制里是2倍,在扩容时size传的2倍,所以这个地方感觉有些多余写的.
至于后面的第一个if是这样:如果
finishing
为true
,说明当前线程负责的迁移任务已经完成。在这种情况下,会执行一些清理操作,如将nextTable
设为null
,将table
更新为nextTab
,并根据迁移后的数组大小更新sizeCtl
的值。然后,方法会直接返回,结束线程的迁移任务。第二个if个人感觉就稍微难理解一些:在进行CAS操作时,则对sizeCtl字段进行了更新,如果更新成功说明完成了迁移操作,
(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT
这个地方则是用来表示当前线程是否是最后一个迁移的线程,并且迁移完成,这里有道难理解,后面在理解,后续的代码i=n
,则是为了下一个循环继续检查,因为i刚好等于数组长度n,就会使线程退出迁移循环.else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd);
- 这段代码的意思就是如果要迁移的索引位置节点为空,那么就说明可以迁移,而这里的tab就是我们的原数组,i就是迁移量(会对i计算算出一个这个索引内存位置的偏移量),fwd就是之前说的迁移节点
ForwardingNode
.
else if ((fh = f.hash) == MOVED) advance = true;
- 如果此时迁移的节点位置的哈希值为MOVED,则表示已经移动了,不需要处理,到下一个迁移位置去迁移,就说明了前面几种字段的含义.
-
接下来便是对数组索引节点下的迁移,迁移过程有些长
else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } }
为了方便观感,还是将代码分割开来.
-
先来看第一个if语句判断,他包含了整个迁移过程,如果if没进去,也就没有迁移,所以说这是一个前提条件
tabAt(tab, i) == f
因为是在获取到对象f的锁之后,所以他会去判断一下原表有没有改变这个对象的引用,如果发生改变,就不处理.这也是必要的,保证了并发的问题,因为其他线程可能在获得锁之前将这个节点进行了处理或者放入了迁移节点都是有可能的.
-
之后就是两种节点的处理方法了,一种是链表,一种是红黑树的,先看链表的处理
Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; }
这样看上去舒服多了,为了去判断节点是不是链表,他会先去判断节点的哈希值,如果大于0就说明是我们的链表,这个之前也有了解过.至于
ln
和hn
两个引用后续理解,int runBit = fh & n;
根据英文意思是运行的位或者说经营位,是根据节点哈希值和数组长度算出来的,暂时不清楚先放着,后面的f
也不理解,都放着,看内部的循环for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } }
这个循环里有着一件很重要的事就是每一个节点他都重新计算了运行位,为了理解这个运行位在链表的含义中到底担任什么角色,我们需要回头看一下运行位的计算.
int runBit = fh & n;
这里根据n其实是哈希表数组的长度,其实我们能够知道n=2的幂次,这也就意味着二进制只有一位1,在进行位运算之后,得到的答案只有0和n两种,举个例就明白了:当哈希桶数组的长度n
为8时,它的二进制表示是1000
。假设有两个节点的哈希值分别是18
和25
,它们的二进制表示分别是10010
和11001
。现在我们来看一下运行位的计算:
- 节点哈希值
18
与运行位计算:18 & 7 = 0
,运行位为0
。 - 节点哈希值
25
与运行位计算:25 & 7 = 1
,运行位为n
。
意味着所有的链表节点被划分成两种,这种做法其实和普通的HashMap有些类似,在进行节点的迁移时,也会将链表分成两组,只不过计算索引的方式和划分的方式有些不同,所以根据前者的经验,我们大致理解了其思路,但是坑爹的是他的做法还有些不同,为了理解这个过程用图解的形式:
if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; }
之后就需要根据runbit判断0和n来进行分组了,这里的分组可以发现仅仅分的是lastRun链表后面的,也就是仅仅只分一部分,而这部分一定属于一组而且连续在链表末尾.
for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); }
在之后,就会重新循环遍历一遍链表,终止条件就是遍历到lastRun指向的地方,将前面的链表正常的分到两个链表组里去
setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true;
这个就是将两条链表放到新哈希表对应的位置,然后让旧表用fwd也就是迁移节点声明为迁移位置的对象,就说明迁移完成了.
这个地方这个运算位和lastRun算是迁移设计的核心,正常都会直接遍历直接分,通过这种方式其实避免了最后一段重新创建的开销,也就是lastRun指向的那一组,不管是0还是n都会被直接分到ln或hn中去,这样链表的迁移就完成了!
- 节点哈希值
-
之后就是红黑树节点的迁移了,这种迁移方式比较符合正常思维
else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; }
其实也就是遍历节点,然后计算索引的方法和链表一样,然后分成两个链表去管理,只不过在分组管理的时候还对每一组的节点进行了计数,然后看看是不是要转变成红黑树,下面放到新数组的操作就如出一辙了.
至此,整个迁移的过程就结束了!!!.
-
7.helpTransfer方法
之前在put方法中其实有调用过这个方法,现在回过头看这个方法就简单很多了
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT; while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if (sc == rs + MAX_RESIZERS || sc == rs + 1 || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
-
第一步就是判断表是否为空的情况下,或者说当前这个桶是否为迁移节点并且判断一下是否不为null,不为则说明有新的哈希表
-
第二步则是满足条件,则计算一个扩容戳记
-
第三步则是多线程情况下的一种判断,判断下一个表的引用也就是新的哈希表是否有发生改变,当前哈希表是否指向tab,当前是否是迁移的状态.
-
第四步则是第一个if判断去判断当前线程数是否太多的情况或者说当前哈希表至少有一个线程正在扩容,不需要更多的帮助,就退出
-
第五步则是进行数据迁移,同时让当前线程数加一.
具体不理解的地方在下一个方法中可以得到理解
8.resizeStamp方法
这块代码在此之前出现过好多次,但是一直没有系统的去理解,这里单独拿出来说明一下,这个重置戳记方法是什么含义?
static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }
Returns the stamp bits for resizing a table of size n. Must be negative when shifted left by RESIZE_STAMP_SHIFT.
- 官方给出的解释是:返回用于调整大小为 n 的表大小的戳记位。向左移动 RESIZE_STAMP_SHIFT 时必须为负数。
RESIZE_STAMP_BITS
:默认是16,也就是说(1 << (RESIZE_STAMP_BITS - 1)
的结果是215,也就是15位1;numberOfLeadingZeros
:这个方法也比较鬼使神差,这个传递的参数n是数组长度,而其方法内部具体展开为:
public static int numberOfLeadingZeros(int i) { // HD, Figure 5-6 if (i == 0) return 32; int n = 1; if (i >>> 16 == 0) { n += 16; i <<= 16; } if (i >>> 24 == 0) { n += 8; i <<= 8; } if (i >>> 28 == 0) { n += 4; i <<= 4; } if (i >>> 30 == 0) { n += 2; i <<= 2; } n -= i >>> 31; return n; }
-
这个方法的实现是基于jdk1.5在Interger类下实现的,运用了大量逻辑运算,其实现的核心思想就是通过二分法实现,具体干了一件事根据每一个if来判断
-
第一个if:无符号移位十六位,也就是低16位被移出去,保留了高16位,如果此时为0,说明高16为为0,然后让n+16记录有16个前导0,再移动回去,为了下一次继续移动
-
第二个if:无符号移位二十四位,也就是第8位被依了出去,这次高24位,其目的是和上一次if一样,这次多移动的8位如果同样不存在1,就依旧加8个前导0,然后恢复,继续下一次移动
-
第三个if:无符号移动二十八位,实际上就是检测二十四位后的后四位,同上步一样
-
第四个if:同上步一样
-
第五步:其实也就是检查最高位是否为1的情况,不然前导0就是0了
为什么这么说?
在前面的所有if判断中,没有提及到1如果出现在移位操作的情况,而n之所以初始化为1是因为n进行了所有前导0加和之后依旧最大是31,因为最高位1没有前导0,就是为了预防这种情况的出现,实际上在某次移位出现了最高位1的时候,不会进入复位,而是继续下一次移位操作,他会去检查其中位数的前面一半,依次向下,就能找到前导0的个数了,可以自行举例理解.
-
-
结合之前代码可以得出,例如n=16情况下,按位或进行运算处最高位16位为1的情况下,其余位的取值取决与前导0的个数,也就是二进制
n=16时,返回前导0个数:27(11011),低5位为11011,高27位全部为0,和
RESIZE_STAMP_BITS
进行或运算得到答案是32768
(215) + 前导0(范围0~31),此时为27,因此是:
32795
,单纯看这个数值并没有任何含义,我们要去理解这个值在之前方法中出现的含义.-
从最早的
tryPresize
方法中后半段回看就能明白int rs = resizeStamp(n); if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null);
- 首先判断sc是否小于0,这个变量的原型来自
sizeCtl
,其官网给出的解释是:表初始化和调整大小控制。如果为负数,则表示正在初始化或调整表大小:-1 表示初始化,否则为 -(1 + 活动调整大小的线程数)。否则,当 table 为 null 时,保留创建时使用的初始表大小,或默认值为 0。初始化后,保存要调整表大小的下一个元素计数值。因此小于0说明正在初始化或者有线程正在扩容表,在初始化表中,曾将他初始化为n的0.75倍,所以他有着控制大小,此时他为正数. - 因此先来看else if语句正常线程要进行扩容时,sc发生的变化,通过之前得到的rs扩容戳记有符号左移
RESIZE_STAMP_SHIFT
(16位),然后加2,低16位的第1位一定是1,意味着移动之后,高16位的第1位就会是1,表示负数,而加2是低十六位进行加2,意味着现在处扩容线程数加1,所以目前可以推断出高16位和低16位有着不同的含义.所以他会得到一个扩容戳记但是是为负数的情况,也就是说sizectl
是通过高16位去标记每一个线程,而低16位用来表示线程占有数,比如此次就意味着低16位加2,线程数是2-1的情况也就是1. - 之后如果有其他线程需要调整表时,就会走第一个if判断中,去执行另一个if加1的操作,然后也进行迁移,也就是线程数同样加1.
- 而在进行扩容之前,他会去先判断一下当前的sc的高16位是否等于rs,不等于rs,说明此时数组n发生了改变,已经扩容完毕了,所以不需要进行了(sc >>> RESIZE_STAMP_SHIFT) != rs) ,之后一个判断则说明sc的高16位为0的情况下,低16位等于rs+1,说明此时也有线程正在进行扩容,就break退出,因此在迁移代码过程中,会利用此方法去判断是不是最后一个迁移的线程,就是根据低16位进行的判断.
- 首先判断sc是否小于0,这个变量的原型来自
至此,ConcurrentHashMap的大部分扩容迁移的地方就实现完毕了!
-