ConcurrentHashMap简介
ConcurrentHashMap是HashMap的同步容器,是线程安全的。
ConcurrentHashMap通过CAS和Synchronized来实现线程安全!
ConcurrentHashMap从Segment+ReentrantLock改为Node+Synchronized的原因
因为ConcurrentHashMap已经将锁细化到了一个槽中了,冲突不会太大。
Synchronized在重量锁的时候有自旋锁的行为,不会一下就挂起。
而ReentrantLock的话,只要当前线程只要不是排在第二个等待节点就会被挂起,消耗资源。
线程的挂起和唤醒会导致线程之间的切换,会频繁的导致内核态和用户态之间的切换,十分耗费资源。
所以综合而言,Synchronized的性能会比较好,并且占用内存小。
ConcurrentHashMap源码详解
Put()
public V put(K key, V value) {
return putVal(key, value, false);
}
static final int spread(int h) {
//HASH_BITS = 0x7fffffff 0111 1111 1111 1111 1111 1111 1111 1111
return (h ^ (h >>> 16)) & HASH_BITS;
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//获取hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果table为空
if (tab == null || (n = tab.length) == 0)
//进行初始化
tab = initTable();
//如果table已经初始化,下标为(n - 1) & hash,相当于hash % n,但是对应的槽位还未初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//初始化Node,并将Node设置为tab[i],设置成功就退出
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
//如果f.hash = MOVED,说明当前槽位正在扩容,先帮助扩容,扩容完毕后再插入
else if ((fh = f.hash) == MOVED)
//帮助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//用槽的第一个Node作为锁对象
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {//红黑树有一个哨兵节点,hash值为-2,如果fh>0说明是链表
binCount = 1;//binCount代表着链表中的个数
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果存在对应的key
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//到了链表末尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//调用红黑树的put方法
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果超过树化阈值,升级为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//将size++,如果超过阈值,会触发扩容
addCount(1L, binCount);
return null;
}
总结以下put()方法,和HashMap的put()方法没有什么大差别,就是多了CAS的操作以及对槽中的第一个Node加锁。
还有的话就是,如果插入的时候,Map正在扩容,会先帮助扩容,然后再插入。
我们先说下初始化initTable(),然后再说下扩容
initTable()
说完了插入方法,我们说下初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//可能同时有多个线程同时初始化,自旋CAS来初始化,判断依据是sc
//如果sc = -1,说明有其他线程在初始化
//如果sc > 0 说明初始化完成了
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); //有其他线程在初始化了,当前线程可以停止了
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//如果CAS成功将sizeCtl更新成了-1
try {//如果table为空
if ((tab = table) == null || tab.length == 0) {
//如果初始化容量了,则设置为sc,否则设置为默认容量16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//初始化table
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//将sc设置为0.75*n,下一次扩容的阈值
sc = n - (n >>> 2);
}
} finally {
//扩容的阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}
addCount()
HashMap中是用一个long 变量来表示Map中元素个数。而ConcurrentHashMap中则是用一个CounterCell数组+一个baseCount来表示的。
为什么会这样费劲的做呢?
直接定义一个long变量不就行了?假设我们只定义一个long变量 size,ConcurrentHashMap是支持并发的,很有可能会有很多线程同时CAS更新size,因为有很多线程,更新一次很可能会失败,所以需要自旋来一直CAS更新,直至成功。自旋会浪费很大的资源。
将size分为n个CounterCell,这样就大大的降低了线程更新的冲突,性能更好!
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
private transient volatile long baseCount;
ConcurrentHashMap中统计个数是调用的sumCount()方法
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
从sumCount()方法我们可以看出,sum = baseCount+CountCell中的元素和。
将size分到了n个CounterCell和一个baseCount中。
CountCells数组的长度也是2的幂次,里面存的有一个long值。
所以sum = baseCount+CountCell中的元素和。
当一个节点入队的时候,会首先利用CAS将baseCount++,如果CAS修改baseCount失败会随机挑选一个CounterCell,然后CAS修改的CountCell的变量。
下面的代码是addCount()的前半段,负责将元素个数+1,后半段是用来扩容的,我们先看前半段。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//如果counterCells == null,就将CAS将baseCount++
//如果CAS成功,就代表更新成功
//如果CAS不成功,就执行方法里面的代码
//如果counterCells != null,直接执行方法里面的代码
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
//是否 不存在其他线程竞争
boolean uncontended = true;
//如果counterCells不为null,并且随机在counterCells中挑选一个槽位的CounterCell不为null,并且CAS 来增加size成功,就跳过下面的代码段
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//如果counterCells == null 或者 随机挑选的CounterCell 为null,或者CAS更新CounterCell失败,这时uncontented = false,说明存在其他线程竞争,执行fullAddCount方法
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
//s为当前节点个数
s = sumCount();
}
如果CAS更新baseCount失败并且CAS更新CounterCell失败的话,就调用fullAddCount,具体代码如下:
//执行到这个方法说明有以下三个情况
//1、CounterCells == null
//2、CounterCell == null
//3、CAS更新CounterCell失败,这时unContented = false
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//h是要更新的CounterCells下标,是一个随机数
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
//CounterCells不为null
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {//如果对应的CounterCell为null
if (cellsBusy == 0) {//当没有人扩容的时候
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&//将cellBusyCAS设置为1
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {//将CounterCells[j]设置为r
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
//代表更新成功
cellsBusy = 0;
}
if (created)//更新成功返回
break;
continue; // 被别人抢先了
}
}
collide = false;
}
else if (!wasUncontended) //之前CAS更新失败了
wasUncontended = true;
//将对应的CounterCell更新
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
//说明有其他线程将CounterCells扩容了
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//将CounterCells扩容,当连续两次循环都没有更新成功的话,说明这时并发程度较高,会尝试扩容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
//这时cellsBusy = 0,并且counterCells=as =null,说明CounterCells还未被初始化
//当满足上面两个要求,就尝试CAS将cellBusy改为1
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {//初始化CounterCells,长度为2
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
//并将rs[0]设置为x
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
//将cellBusy = 0,这也就是为什么额外设置counterCells == as这个条件来控制初始化了
cellsBusy = 0;
}
if (init)//初始化成功,就退出死循环,代表addCount成功
break;
}
//尝试将baseCount++
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
}
}
扩容
addCount()方法的后半段是用来扩容的。
//如果是新加入节点的话,也就是put的时候,check=1,会检测是否需要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//如果节点个数大于阈值sizeCtl,sizeCtl=length*0.75
//并且length 小于MAXIMUM_CAPACITY 最大容量
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);//获取扩容的时间戳,扩容前的长度不一样,时间戳也不一样。
// static final int resizeStamp(int n) {
// Integer.numberOfLeadingZeros(n)是获取n的高位连续的0的个数count,因为n是2的幂次,所以n不一样,对应的高位连续的0的个数count也不一样
// RESIZE_STAMP_BITS=16, 将count | 1<<15,也就是说后16位代表扩容的时间戳,也就是版本号
// return Integer.numberOfLeadingZeros(n) | (1 << //(RESIZE_STAMP_BITS - 1));
// }
//sc = sizeCtl,正常情况下是代表当前扩容的阈值,当<0说明正在扩容
if (sc < 0) {//别的线程已经抢先扩容了
//如果扩容完毕了,就break退出
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//如果还未扩容完,就帮助扩容,将sizeCtl+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//还未扩容,当前线程是第一个扩容的线程
//将sizeCtl设置为rs<<16,并且+2,我们之前说了rs的后16位代表扩容时间戳版本,将其右移16,现在就说明sizeCtl的高16位代表扩容时间戳版本,低16位代表扩容的线程数+1,
//rs<<16+2肯定小于0,因为rs的倒数第十六位为1,然后左移16位,rs的第一位肯定为1,所以肯定小于0
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
Transfer()
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//tab是旧table,nextTab是扩容后的table
int n = tab.length, stride;
//stride是一次性负责搬运槽位的个数。
//如果是单核就设置stride为tab的length
//如果是多核,就设置为 n / 8/ cpu的个数,如果stride<MIN_TRANSFER_STRIDE(16)的话,就设置stride为16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
//如果nextTab还未初始化,就初始化nextTab,只有第一个线程才会初始化
if (nextTab == null) {
try {
//扩容为原来的2倍,n<<1
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//transferIndex = 旧table的长度
transferIndex = n;
}
//首先我先概略说一下,ConcurrentHashMap的扩容原理,
//需要扩容的槽位个数有transferIndex,一个线程开始搬运的时候,会设置一个上下界,上界i为transferIndex-1,下界bound为transferIndex - stride-1,然后将transferIndex -= stride,接着开始搬运[bound - i]区间中的槽位
//扩容后的长度
int nextn = nextTab.length;
//扩容的设置标志
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//是否分配了搬运区间
boolean advance = true;
//扩容是否结束
boolean finishing = false;
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//还未分配搬运区间
while (advance) {
int nextIndex, nextBound;
//如果上届i >= bound,说明还未搬运完,或者扩容结束了,这两种情况都不需要再申请搬运区间了。
if (--i >= bound || finishing)
advance = false;
//如果transferIndex <= 0,说明搬运区间已经被分配光了,将i设置为-1
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//分配搬运区间,将transferIndex-=strid
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//设置下界
bound = nextBound;
//设置上界
i = nextIndex - 1;
advance = false;
}
}
//如果i<0,说明搬运区间分配完了。搬运区间分配完了,不代表搬运完了,有可能其他线程还在搬运呢
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//如果搬运结束了
if (finishing) {
nextTable = null;
//将table设置为nextTab
table = nextTab;
//将sizeCtl设置为0.75*nextTab.length
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//如果其他线程还未搬运完,就将sizeCtl-1,因为我们之前说过,除了第一个扩容的线程将sizeCtl设置为 resizeStamp(n) << 16 + 2,其他帮助扩容的线程再进来之前会将sizeCtl+1的,所以再搬运完毕后,需要将sizeCtl-1。只有当所有线程都-1之后,sizeCtl = resizeStamp(n) << 16 + 2的时候,才说明所有线程都搬运结束了。
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//如果 sizeCtl != resizeStamp(n) << 16 + 2,说明其他线程还未搬运完,自己就返回吧。插不上脚了
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//搬运完毕
finishing = advance = true;
i = n;
}
}
//执行到这个分支的时候,就说明开始搬运了,
//如果老table中第i个槽位的抵押给节点为空,就将其第一个节点设置为fwd,fwd的hash = MOVED,然后就继续执行循环,会i--的。
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果f.hash等于MOVED,说明当前槽位为空,就继续执行循环,会i--的。
else if ((fh = f.hash) == MOVED)
advance = true;
else {
//当前槽位不为空,开始搬运了。
//也是将槽位的第一个node 锁定
//槽位中的node分为两类,一种是呆在原来下标i的,还有一种是挪到i+n的。因为我们的槽位下标是通过hash%n确定的,并且n都是2的幂次,32位中只有一个1.
//我们可以通过hash&n来确定是否需要搬运到i+n槽位中,如果hash&n==0,说明hash对应的n的那一位是为0的。如果hash&n==1,说明hash对应的n的那一位是为1的,hash%(2*n) = n + hash%n的.
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
//lastRun是从后往前连续的搬运到同一个槽位的最后一个节点,这样的话当再搬运的时候,遍历到lastRun时,就不用再往后遍历了。
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//如果runBit == 0,说明最后一段节点属于在老位置的
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {//最后一段节点在新位置
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
//老位置节点,ln是老位置槽位的头节点
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
//新位置节点,hn是新位置槽位的头节点
hn = new Node<K,V>(ph, pk, pv, hn);
}
//将新table中新老槽位的节点分别设置
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//将老table中老位置节点设置成fwd
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
最后总结一下,第一个扩容的线程会将新table初始化,然后申请搬运区间,开始搬运连续的stride个槽位,搬运完之后,会继续申请搬运区间,搬运stride个节点,直到搬运区间申请不到了,就退出。
其他普通线程和抵押给扩容的线程类似,除了新table初始化。
接着说一下helpTransfer()
helpTransfer()
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//如果tab已经初始化了,并且对应的槽位为ForwardingNode,并且新table不为null
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//扩容对应的时间戳
int rs = resizeStamp(tab.length);
//如果sc<0说明还在搬运
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//如果进来之后搬运玩了,就退出
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//开始尝试搬运,将sizeCtl+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
通过联系transfer()和put()两个方法我们可以看出,如果一个槽位还未被搬运到,这个时候有线程往这个槽位中放元素,会尝试将占领槽位第一个node的锁,然后往里面放入元素。
之后放入的元素会在之后的搬运过程中,搬运到新table中。
remove()
public V remove(Object key) {
return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
else if ((fh = f.hash) == MOVED)
//如果正在搬运就帮助搬运,等待搬运后再删除
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
//锁定槽位对应的第一个node
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
从源码看出,如果正在扩容,就会先帮助扩容。否则就锁定槽位对应的第一个node,开始删除节点。
总结
通过源码可以看出,ConcurrentHashMap和HashMap的区别就是
1、引入了sizeCtl,sizeCtl有三种状态
1.1、0:还未初始化
1.2、-1:正在初始化
1.3、<0但是不等于-1,此时表示正在扩容。第一个扩容的线程会将sizeCtl设置为sizeStamp(n)<<16+2
1.4、大于0.这时为扩容的阈值,为n*0.75
2、通过Synchronized将对应的槽位的第一个节点锁住,只会锁住一个槽,而不是一整个table
3、扩容:
是将n个槽分为若干个连续的组,一个组的长度为固定的stride,stride=n/8/cpu个数,如果小于16会将其赋值为16。
并允许多个线程同时扩容,每个线程都有其搬运的上下界区间。上界由一个transferIndex变量控制,transferIndex初始值为n,每个线程再申请搬运区间的时候,会将transfer -= stride,其对应的区间为[transferIndex -stride,transferIndex],然后开始搬运区间中对应的槽位。搬运完这stride个槽位后,如果发现transferIndex>0,还会继续搬运,直到transderIndex < 0 说明搬运区间已经被申请完了,但是这时未必全部都搬运完成了,申请搬运区间到搬运完成是需要一个过程的。
sizeCtl与扩容状态也有关,sizeCtl的高16位代表着扩容的时间戳,后16位对应着搬运的线程数。第一个扩容的线程会将sizeCtl设置为sizeStamp(n)<<16+2,当线程插入或者删除的时候,发现正在扩容,首先会帮助扩容,当扩容完毕后才会执行插入或者删除操作。具体做法是将sizeCtl+1,然后申请搬运区间并搬运,当线程申请不到搬运区间的时候,将sizeCtl-1,当sizeCtl == sizeStamp(n)<<16+2的时候,说明搬运完成。