ConcurrentHashMap源码解析第二部分,着重分析ConcurrentHashMap添加元素、删除元素、扩容、查找元素的具体流程
1、添加元素
添加元素用到的是putVal方法
public V put(K key, V value) {
return putVal(key, value, false);
}
//onlyIfAbsent为true时找到相同节点不替换 为false就替换
final V putVal(K key, V value, boolean onlyIfAbsent) {
//k和v不能为null
if (key == null || value == null) throw new NullPointerException();
//计算key的hash值
int hash = spread(key.hashCode());
//binCount表示当前k-v 封装成node插入到指定桶位后,在桶位中的所属链表的下标位置 =0时当前桶位为null可以直接插入
int binCount = 0;
//binCount>0 表示当前桶位节点数大于等于1,遍历链表,将封装k-v的新node放入链表末尾,并记录链表末尾的下标,如果binCount =2 表示当前桶位可能已经树化为红黑树了
for (Node<K,V>[] tab = table;;) {
//f是头结点 n是table长度 i是桶位下标 fh是头结点hash值
Node<K,V> f; int n, i, fh;
//table还没初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //初始化table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //table已经初始化 根据寻址算法确定一个桶位并且桶位没元素, f是在这里被赋值的
if (casTabAt(tab, i, null, //封装k v使用cas插入桶位 插入成功为true 插入失败为false
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin 插入成功就退出循环
}
//table已经初始化 头结点的hash值为-1 表示当前是FWD节点,表示头结点已经被迁移了,此时正在处于扩容状态
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); //当前线程加入到扩容过程中
else { //当前桶位存放的节点可能是链表或者红黑树代理节点 TreeBin
V oldVal = null; //替换成功会把替换前的值赋给这个引用
//给头结点加锁
synchronized (f) {
//再对比一次防止头结点加锁之前被其他线程修改 修改之后加锁也没有意义 因为不是原来的头结点
if (tabAt(tab, i) == f) {
//头结点hash值大于0是链表节点 当前插入key与链表当中所有元素的key都不一致时,当前的插入操作是追加到链表的末尾,binCount此时表示链表长度
if (fh >= 0) {
binCount = 1; // 当前插入key与链表当中的某个元素的key一致时,当前插入操作可能就是替换了。binCount表示冲突位置(binCount - 1)
for (Node<K,V> e = f;; ++binCount) { // 遍历当前桶位的链表,e是每次循环处理节点。
K ek;
//找到目标节点
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
//onlyIfAbsent为false就进行更新操作
if (!onlyIfAbsent)
e.val = value; //更新操作
break; //找到就退出循环
}
Node<K,V> pred = e; //更新尾结点
if ((e = e.next) == null) { //遍历下一个节点,如果当前节点的下一节点为null 表示遍历到链表尾部没找到元素就用尾插法插入元素
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { //头结点是红黑树代理节点TreeBin
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, //更新操作p不为null,插入操作p为null
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent) //更新操作
p.val = value;
}
}
}
} //说明当前桶位不为null,可能是红黑树也可能是链表
if (binCount != 0) { //如果binCount>=8 表示处理的桶位一定是链表
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); // 调用转化链表为红黑树的方法进行树化
if (oldVal != null) // 说明当前线程插入的数据key,与原有k-v发生冲突,需要将原数据v返回给调用者
return oldVal;
break;
}
}
}
//只有新增元素才会走到这里 因为更新元素的话oldVal有值会直接返回不走这里
addCount(1L, binCount);//这个方法的作用是统计当前table一共有多少数据,并判断table是否达到扩容阈值标准,触发扩容。
return null;
}
整体流程:
(1)桶数组如果还没初始化,就先初始化
(2)根据元素的key计算hash值,然后用hash值去定义桶位
(3)桶位如果为空,就直接插入元素
(4)桶位的元素如果被迁移了,说明当前桶处于扩容过程,当前线程应该要先帮助完成扩容,扩容之后再添加元素
(5)桶位有元素,且没有被迁移,就锁住这个桶位
(6)如果桶位元素是链表节点,就遍历链表添加或更新元素,
(7)如果桶位元素是红黑树节点,就往红黑树添加或更新元素
(8)如果元素存在,返回旧值,不走第9步
(9)如果元素不存在,桶的元素个数+1,,然后判断添加元素后的桶是否要扩容
2、删除元素
public V remove(Object key) {
return replaceNode(key, null, null);
}
//cv是原值 cv表示找到的节点的值要和这个一样才进行更新或删除操作
//cv如果为null就不用比较cv和找到的节点的值
//value表示找到节点后要替换成这个值,如果为空就是删除操作
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode()); //算出key的hash值
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; //f是桶位头结点 n是table长度 i是桶位下标 fh是桶位头结点hash值
if (tab == null || (n = tab.length) == 0 || //table还没初始化
(f = tabAt(tab, i = (n - 1) & hash)) == null) //对应桶位没元素
break;
//对应桶位头结点被迁移了表明处于扩容状态即写状态就帮助其他线程扩容 扩容完之后进行下一循环就可以删除元素
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null; //存旧值的引用
boolean validated = false; //标记是否处理过
synchronized (f) { //锁住头结点
if (tabAt(tab, i) == f) { //防止加锁前头结点被修改
if (fh >= 0) { //大于0表示目前没有进行扩容和当前节点不是树节点 即是链表节点或单个节点 遍历链表
validated = true;
for (Node<K,V> e = f, pred = null;;) { //pred是前驱节点 e是当前节点
K ek; //头结点是要删除的目标节点的key
//key和hash都一样表示找到目标节点
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
//原值为null即删除操作 cv等于ev是替换操作
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
//更新操作
if (value != null) //value不为空是替换操作
e.val = value;
//删除操作
else if (pred != null) //当前节点不是头结点
pred.next = e.next; //前驱节点指向当前节点的下一个节点 1 -> 2 -> 3变成 1 -> 3
else
setTabAt(tab, i, e.next); //当前节点是头结点 直接把桶位元素设置为下一个节点
}
break;
}
pred = e; //把当前节点变成pred给下一个循环使用
if ((e = e.next) == null) // 遍历到链表尾部还没找到元素,跳出循环
break;
}
}
else if (f instanceof TreeBin) { //头结点是树节点
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f; //转换类型
TreeNode<K,V> r, p; //r是树的根节点 p目标节点的引用
//根节点不为空
if ((r = t.root) != null &&
//找到目标节点
(p = r.findTreeNode(hash, key, null)) != null) { //以当前节点为入口,向下查找key(包括本身节点)
V pv = p.val; //找到就取出对应节点的val
//原值cv为空就不用比较直接做更新或删除操作 否则就比较cv和找到的节点的值是否一样
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv; //保留旧值
//更新操作
if (value != null) //替换的值不为空就替换
p.val = value;
//删除操作
else if (t.removeTreeNode(p)) //否则就删除 t.removeTreeNode(p)这个方法返回true表示删除节点后树的元素个数较少 较少的话就退化成链表
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
} //如果处理过,不管有没有找到元素都返回
if (validated) { //当其他线程加锁前修改过桶位头结点时,当前线程 sync 头结点 锁错对象时,validated 为false,会进入下次for自旋:
if (oldVal != null) { //找到就返回旧值
if (value == null) //替换后的值value为null表示是删除操作
addCount(-1L, -1); //元素减1,第二个参数-1表示不用判断是否需要扩容
return oldVal; //返回旧值
}
break;
}
}
}
return null; //找不到返回null
}
整体流程:
(1)如果table还没初始化,或根据hash值找到的桶位没元素,就返回null表示没有删除元素
(2)如果对应桶位的节点是FWD节点,则表示节点被迁移了,当前处于扩容状态,当前线程应该要协助完成扩容,扩容完之后进行下一循环就可以删除或更新元素
(3)锁住桶位节点,判断节点hash值是否大于0,大于0表示没有进行扩容和当前节点不是树节点,当前节点是链表节点,遍历链表删除或更新元素,cv为null就不用比较cv和找到的节点的值,value不为null是更新操作,为null是删除操作
(4)如果头结点是树节点,就根据红黑树的方式进行删除或更新元素,如果是删除则要判断删除后是否要退化成链表
(5)如果是更新操作就返回旧值,删除操作就table数量减1
3、扩容
和扩容有关的几个方法是transfer,helpTransfer和addCount
transfer是直接扩容
helpTransfer是发现了当前节点是FWD节点,处于扩容状态,参与到扩容的过程
addCount是添加元素后判断是否需要扩容
3.1、transfer方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;// n 表示扩容之前旧table数组的长度 stride表示分配给线程任务的步长,一个线程最多处理多少个桶位
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range 控制线程迁移数据的最小步长(桶位的跨度~)
// nextTab为空表示当前线程为触发本次扩容的线程,即第一个参与扩容的线程,需要做一些扩容准备工作
if (nextTab == null) {
try { //不为空表示当前线程是协助扩容的线程
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 创建一个比扩容之前容量n大一倍的新table
nextTab = nt; //赋值给nextTab
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
//nextTable是成员变量(全局变量) nextTab是方法参数(局部变量)
nextTable = nextTab;// 赋值nextTab对象给 nextTable ,方便协助扩容线程 拿到新表
transferIndex = n; // 记录迁移数据整体位置的一个标记,初始值是原table数组的长度。可以理解为:全局范围内散列表的数据任务分配到哪个桶的位置了
}
//新线程加入之后,是根据transferIndex给新线程分配任务的 例如32扩容64时,transferIndex初始值是32,第一个线程进到这个方法会被分配处理第17-第32个桶位,然后transferIndex变为16,下一个线程参与扩容时就被分配处理第1-第16个桶位
int nextn = nextTab.length; //新数组长度
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//当某个桶位数据迁移处理完毕后,将此桶位设置为fwd节点,其它写线程或读线程看到后,会有不同逻辑。fwd结点有指向新表nextTab的引用
boolean advance = true; //推进标记 桶位未迁移该标记为false,桶位迁移了该标记为true
boolean finishing = false; // 完成标记
//i 表示分配给当前线程的数据迁移任务,任务执行到的桶位下标(任务进度~表示当前线程的任务已经干到哪里了~) bound 表示分配给当前线程任务的下界限制。(线程的数据迁移任务先从高位开始迁移,迁移到下界位置结束)
//这里的i和bound都是局部变量,是线程私有的,所有每个线程进来时i和bound一开始都是1,transferIndex是成员变量,全局的,第一个线程会把transferIndex从32更新成16,nextIndex, nextBound分别是31和16,第二个线程会把transferIndex从16更新成0,nextIndex, nextBound分别是15和0
for (int i = 0, bound = 0;;) {
//f 桶位头结点 fh 桶位头结点的hash值
Node<K,V> f; int fh;
//给当前线程分配任务区间 维护当前线程任务进度(i表示当前处理的桶位)维护map对象全局范围内的扩容进度
while (advance) {
int nextIndex, nextBound; //i的值会从n-1依次递减 其中n是旧桶数组的大小,也就是说i从15开始一直减到1这样去迁移元素
/**
下面三个if分别表示:1、当前线程分配了任务还没完成
2、所有桶位都分配完了
3、分配任务
**/
//分配了任务还没完成
if (--i >= bound || finishing) //表示当前线程的任务尚未完成,还有相应的区间的桶位要处理,--i 就让当前线程处理下一个桶位,第一次循环这个if不会成立,因为i一开始是0,0-1后是小于bound的
advance = false; //设为false目的是退出当前循环
//把transferIndex赋值给nextIndex,如果小于等于0表示所有桶处理完了 transferIndex的变化32 -> 16 -> 0
//桶位已经全部分配了
else if ((nextIndex = transferIndex) <= 0) {
i = -1; //上个if不成立会来到这里表示当前线程任务已完成 或者未分配
advance = false; //true表示对象全局范围内的桶位都分配完毕了,没有区间可分配了,设置当前线程的i变量为-1 跳出循环,然后执行退出迁移任务相关的程序
} //false表示对象全局范围内的桶位尚未分配完毕,还有区间可分配
//i = 0时,即第一次循环时会走到这里 走到这个if表示 1、当前线程需要分配任务区间 2.全局范围内还有桶位尚未迁移
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex, //cas给当前线程分配任务 这里transferIndex会被设置成任务的下界 下个线程进到这个方法时transferIndex,即nextIndex就是上个线程任务的下界 例如TRANSFERINDEX本来是32 然后更新成16 下个线程来到这里时就把TRANSFERINDEX从16更新成0,如果nextIndex为16没有大于stride nextBound就为0
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound; //分配任务成功设置任务下界
i = nextIndex - 1; //设置任务初始进度 第一个线程的任务进度是31表示从第32个桶位开始处理 然后处理到第17个桶位 当处理完第17个桶位 --i >= bound 即15 >= 16这个条件不成立就说明当前线程的任务做完了
advance = false;
}
} //i < 0表示所有桶位都被分配完了,然后所有线程会进入这个if把sizeCtl减1表示参与扩容的线程完成了分配的任务,把参与扩容的线程减1然后退出这个transfer方法
// i >= n 当前处理的范围大于旧链表最大长度,已经不需要拷贝越界数据
// i + n >= nextn 。nextn表示新哈希表长度,如果当前长度超过新哈希表长度,是不合法的
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { //这里为true表示扩容结束
nextTable = null;
table = nextTab;// 如果全部桶中数据迁移完成了,则用新数组替换旧桶数组
sizeCtl = (n << 1) - (n >>> 1);// 并设置下一次扩容门槛为新桶数组容量的0.75倍
return;
} //当前线程扩容完成,把并发扩容的线程数-1 cas更新
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) //这个条件成立说明当前线程不是最后一个退出transfer任务的线程
return;
finishing = advance = true;// 完成标记finishing置为true表示扩容完成,finishing为true才会走到上面的if扩容结束条件判断
i = n; // recheck before commit i重新赋值为n 这样会再重新遍历一次桶数组,看看是不是都迁移完成了 也就是第二次遍历都会走到下面的(fh = f.hash) == MOVED这个条件
}
} //来到这里说明当前线程任务尚未处理完 正在进行中 进行元素的迁移
else if ((f = tabAt(tab, i)) == null) //这个条件成立说明当前桶位未存放数据,只需要将此处设置为fwd节点即可
advance = casTabAt(tab, i, null, fwd); //cas把i位置的元素更新成fwd节点 设置成功表示当前桶位已经处理
else if ((fh = f.hash) == MOVED) //如果桶中第一个元素的hash值为MOVED,说明头节f它是ForwardingNode节点,
advance = true; // already processed 表示当前桶位已经被迁移了,当前线程不用再处理了,直接再次更新当前线程任务索引,再次处理下一个桶位 或者 其它操作
else { //来到这里说明当前桶位有数据,而且node节点不是fwd节点,说明这些数据需要迁移
synchronized (f) { //头结点加锁
if (tabAt(tab, i) == f) { //再判断是防止加锁前f节点被其他线程修改
Node<K,V> ln, hn; //把链表拆成两个 低位链表不变 高位链表是原来的位置+容量
if (fh >= 0) { //当前桶位是链表节点
int runBit = fh & n;
Node<K,V> lastRun = f; //与HashMap不同的是多了一步寻找lastRun 例如 0 0 4 4 0 0 0最后三个不用处理 因为都是在一个桶里
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) { //看看最后几个元素在高位还是低位
ln = lastRun; //在低位
hn = null;
}
else {
hn = lastRun; //在高位
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln); //新节点指向旧节点 头插法
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);//低位链表的位置不变
setTabAt(nextTab, i + n, hn); //高位链表的位置是原位置+旧容量
setTabAt(tab, i, fwd);//标记当前桶已被迁移
advance = true; // advance为true,返回上面进行--i操作 第2369行
} //表示当前桶位是 红黑树 代理结点TreeBin
else if (f instanceof TreeBin) { //也是会分化成两棵树
TreeBin<K,V> t = (TreeBin<K,V>)f; //转换头结点为 treeBin引用 t
TreeNode<K,V> lo = null, loTail = null; // 低位双向链表 lo是头 tail是尾
TreeNode<K,V> hi = null, hiTail = null; // 高位双向链表 hi是头 tail是尾
int lc = 0, hc = 0; //低位高位元素数量
for (Node<K,V> e = t.first; e != null; e = e.next) { // 遍历整颗树(TreeBin中的双向链表,从头结点至尾节点),根据hash&n是否为0分化成两颗树:
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V> // 使用当前节点 构建出来的新的TreeNode
(h, e.key, e.val, null, null);
if ((h & n) == 0) { //低位
if ((p.prev = loTail) == null) //低位链表还没节点
lo = p;
else
loTail.next = p; //尾插法
loTail = p; //更新尾指针为当前节点
++lc;
}
else { //高位
if ((p.prev = hiTail) == null) //高位链表还没节点
hi = p;
else
hiTail.next = p; //尾插法
hiTail = p; //更新尾指针为当前节点
++hc;
}
} // 如果分化的树中元素个数小于等于6,则退化成链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln); // 低位树的位置不变
setTabAt(nextTab, i + n, hn); // 高位树的位置为原来位置 + 容量
setTabAt(tab, i, fwd); // 标记该桶已迁移
advance = true; //继续处理下一个桶位
}
}
}
}
}
}
整体流程:
(1)如果新table为空则表示当前线程是触发扩容的线程,即第一个进行扩容的线程,需要进行扩容前的一些准备工作:新建一个table,大小是旧table的两倍,然后把这个table赋值给成员变量nextTable,作用是给其他协助扩容的线程使用,其他协助扩容的线程会把旧table的数据迁移到这个nextTable,初始化transferIndex,初始值为旧table的长度,表示从旧table的最后一个桶位开始迁移
(2)实例化一个FWD节点,作用是旧table的桶位被迁移之后就把桶位设置成FWD节点表示这个桶位已经被迁移了,FWD节点有一个nextTable的引用表示这个FWD是哪个nextTable的
(3)初始化推进标记为true,扩容完成标记为false,先判断当前线程是否完成了分配的迁移任务,用–i >= bound或finishing扩容完成标记判断,bound是任务的最后一个桶位,最后一个线程完成了扩容并再次检查后会把finishing扩容完成标记设为true
(4)把transferIndex赋值给nextIndex作为任务的起点,如果nextIndex小于等于0表明所有桶位都被分配了
(5)CAS设置当前线程的任务,把transferIndex cas更新成 当前任务的下界,给下一个参与扩容的线程使用,例如32cas更新成16,然后下一个线程就用16来分配任务,当前任务起点为nextIndex – 1,因为数组索引从0开始,下界为更新后的transferIndex
(6)i小于0表明已经处理完了所有桶位,然后所有参与扩容的线程会cas把sizeCtl的值-1,表明参与扩容的线程少了一个,然后判断当前线程是否是最后一个参与扩容的线程,如果是的话就把finishing扩容完成标记置为true,然后把i恢复为数组长度大小,目的是再次检查是否还有桶位没有被迁移
(7)给当前线程分配完任务之后就可以开始迁移桶位了,首先判断下标为i的桶位是否为空,空的话就把FWD节点放入这个桶位
(8)如果当前桶位的节点的hash值是MOVED即-1则表明当前桶位已经被迁移了,进行下一个循环
(9)如果当前桶位的节点不为空并且还没被迁移就把头结点锁住,然后开始迁移节点,判断节点是否是链表节点,如果链表节点就把链表分成两段分别在原来的位置和原来的位置加旧table容量,然后把两条链表设置进新table,设置完之后就把FWD节点放入桶位表明当前桶位已经被迁移
(10)如果当前桶位的节点是红黑树节点,就遍历整颗树,是用TreeBin中的双向链表遍历的,然后按照节点hash值和旧table大小与运算是否为0分成低位高位两棵树,并判断分成的两棵树是否需要退化成链表,然后把两棵树设置进新table,设置完之后就把FWD节点放入桶位表明当前桶位已经被迁移
再简单概况就是:
先判断是否是参与扩容的第一个线程,是的话就新建一个新table大小是旧table的两倍并初始化任务下边,然后判断当前线程是否分配任务了,分配任务之后就开始迁移桶位,桶位为空就cas把FWD节点放入桶位表示已经被迁移了,桶位不为空就锁头结点然后把链表或树分成两段迁移到新table,然后把FWD节点放入桶位表示已经被迁移了,完成分配任务继续判断桶位是否被分配完了,如果还没分配完就继续分配,如果分配完所有参与扩容的线程都把sizeCtl减1表明当前线程已经完成了任务退出transfer方法,如果是最后一个退出transfer方法的线程就把finishing扩容完成标记置为true表示完成扩容,然后检查是否还有桶位没有被迁移
用图表示流程:
3.2、helpTransfer方法
//协助扩容(迁移元素),当线程添加元素时发现table正在扩容,且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。当前桶元素迁移完成了才去协助迁移其它桶元素
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) && //旧table不为空并且当前桶位已经迁移完了并且新table不为空 说明当前桶已经迁移完毕了,才去帮忙迁移其它桶的元素
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//根据旧table计算扩容标识戳 相当于一个凭证
int rs = resizeStamp(tab.length);
//第一个条件nextTab == nextTable成立说明当前扩容正在进行中 不成立时的情况有扩容完毕nextTable置为null或再次扩容了nextTable更新了
//第二个条件table == tab成立说明扩容正在进行中,还未完成 不成立:说明扩容已经结束了,扩容结束之后,最后退出的线程把table更新为扩容后的table
//第三个条件(sc = sizeCtl) < 0成立说明正在进行扩容 不成立:说明sizeCtl当前是一个大于0的数,此时代表下次扩容的阈值,当前扩容已经结束
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//第一个条件表示不是本批次扩容
//第二个条件表示扩容完毕
//第三个条件表示扩容线程数已达到最大值
//第四个条件表示任务已经分配完毕 即所有桶已经分配好了给其他线程
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break; //四个条件其中一个成立都不会协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //扩容线程加1
transfer(tab, nextTab); //帮助扩容
break;
}
}
return nextTab;
}
return table;
}
整体流程:
(1)如果旧table不为空且当前桶位已经被迁移了
(2)根据旧table计算本次扩容标识戳
(3)判断当前FWD节点的nextTable是否等于成员变量nextTable,不等的话表示扩容完成 成员变量nextTable被置为null
(4)判断旧table tab是否等于成员变量table,不等的话表示扩容完成 成员变量table被更新成扩容后的table
(5)判断sizeCtl是否小于0,小于0才是扩容状态,如果大于等于0就不是扩容状态
(6)根据sizeCtl的高16位是否等于本次扩容标识戳判断是否是本次扩容
(7)如果扩容标识戳+1等于sizeCtl表示扩容完毕
(8)如果扩容线程数已达到最大值就不能参与扩容,如果所有桶已经分配好了给其他线程就不能参与扩容
(9)上面6-8都不成立就表明可以参与扩容,扩容线程加1,然后返回nextTab
3.3、addCount方法
//check大于0才检查扩容 小于0不检查扩容
private final void addCount(long x, int check) {
CounterCell[] as; long b, s; //as 表示 LongAdder.cells b 表示LongAdder.base s 表示当前map.table中元素的数量
if ((as = counterCells) != null || //true表示cells已经初始化了,当前线程应该去使用hash寻址找到合适的cell 去累加数据 false->表示当前线程应该直接将数据累加到base(没有线程竞争)
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { //false->表示写base成功,数据累加到base中了,当前竞争不激烈,不需要创建cells
//a为当前线程hash寻址命中的cell v 表示当前线程写cell时的期望值 m 表示当前cells数组的长度
CounterCell a; long v; int m; //cells初始化了或者写base失败会进入这个代码块 //true->表示写base失败,与其他线程在base上发生了竞争,当前线程应该去尝试创建cells
boolean uncontended = true; // uncontended为true表示没发生竞争
//第一个条件表示cells还没初始化
//第二个条件表示cells初始化了 根据线程取得对应的cell为null
//第三个条件表示cells初始化了 根据线程取得对应的cell不为null cas设置cell的值为v+x 设置成功为false 设置失败就进入fullAddCount进行重试或者扩容cells
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return; //考虑到fullAddCount里面的事情比较累,就让当前线程不参与到扩容相关的逻辑了,直接返回到调用点
}
if (check <= 1)
return;
s = sumCount();// sumCount统计当前散列表元素个数sum = base + cells[0] + ... + cells[n],这是一个期望值
}
//判断是否达到扩容阈值标准,触发扩容
if (check >= 0) { //check >= 0表示一定是一个put操作调用的addCount,check < 0是remove操作调用的addCount方法
Node<K,V>[] tab, nt; int n, sc; //tab 表示 map.table nt 表示 map.nextTable n 表示map.table数组的长度 sc 表示sizeCtl的临时值
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && //tab不为空才进行扩容
(n = tab.length) < MAXIMUM_CAPACITY) { //当前table长度小于最大值限制,则可以进行扩容
int rs = resizeStamp(n); //扩容标识戳 所有将map容量由16扩容到32的线程,其拿到的扩容唯一标识戳都是1000 0000 0001 1011
//sizeCtl小于0表示table正在扩容 当前线程理论上应该协助table完成扩容
if (sc < 0) {
//第一个条件true说明当前线程获取到的扩容唯一标识戳 不是 本批次扩容 false说明当前线程获取到的扩容唯一标识戳 是 本批次扩容
//第二个条件true表示扩容完毕,当前线程不需要再参与进来 false表示扩容还在进行中,当前线程可以参与
//第三个条件true表示表示当前参与并发扩容的线程达到了最大值 65535 - 1 false表示当前线程可以参与进来
//第四个条件true表示本次扩容结束 false表示当前线程可以参与进来
//第五个条件true表示所有桶位被分配完了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break; //四个条件有一个成立就不参与扩容
//cas设置sizeCtl为sizeCtl+1表示参与扩容的线程多了一个
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);// 扩容(迁移数据):协助扩容线程,持有nextTable参数,进入该方法协助扩容~
} //cas设置sizeCtl 设置成功说明当前线程是触发扩容的第一个线程,在transfer方法需要做一些扩容准备工作 将扩容唯一标识戳左移16位 然后+2
//来到这里表明是第一次扩容 sizeCtl的值更新为2
else if (U.compareAndSwapInt(this, SIZECTL, sc, //sizeCtl高16表示当前的扩容标识戳 低16位表示目前有多少个线程正在参与扩容 (1 + nThread)当前n为1所以就是1 + 1
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null); // 扩容(迁移数据):触发扩容条件的线程 不持有nextTable 因为是第一个参与扩容的线程
s = sumCount();
}
}
}
整体流程:
(1)如果counterCells还没初始化,就cas把增量更新到basecount里,此时没有发生线程竞争
(2)如果cas更新basecount失败,表明发生了线程竞争要初始化counterCells,根据线程获取对应的cell,如果对应的cell不为空,就cas把增量加到这个cell里,counterCells没初始化或counterCells长度为0或对应的cell为空或cas失败就调用fullAddCount方法进行重试或者扩容cells
(3)如果check小于等于1就不检查扩容,重新统计元素数量
(4)如果check大于等于0就检查扩容,tab不为空并且当前table长度小于最大值限制就可以进行扩容
(5)如果sizeCtl小于0当前处于扩容状态,如果当前线程获取到的扩容唯一标识戳 不是 本批次扩容标识戳或扩容完毕或当前参与并发扩容的线程达到了最大值或所有桶位被分配完毕了当前线程就不参与扩容
(6)否则cas把sizeCtl+1,然后调用transfer进行扩容
(7)如果sizeCtl等于0表明此次扩容是参与扩容的第一个线程,调用transfer进行扩容,重新统计元素数量
4、查找元素
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //e是当前遍历桶位 p是目标桶位 n是table长度 eh是当前遍历桶位hash值 ek是当前遍历桶位key
int h = spread(key.hashCode()); //算出key的hash值
if ((tab = table) != null && (n = tab.length) > 0 && //table不为空(如果为空即还没初始化)并且已经初始化并且key对应桶位有元素
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) { //桶位头结点hash和目标节点hash值一样
if ((ek = e.key) == key || (ek != null && key.equals(ek))) //再比较key
return e.val; //找到元素
}
else if (eh < 0) //eh小于0 -1表示正在进行扩容并且当前节点已经迁移到newTable 使用fwd的find方法找 -2表示头结点是树节点使用树find方法找
return (p = e.find(h, key)) != null ? p.val : null; //e.find(h, key))有可能调用的是fwd的find方法或TreeBin的find方法
while ((e = e.next) != null) { //当前节点是链表 遍历链表查找
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
整体流程:
(1)根据key的hash值定位桶位,桶位元素不为空就判断桶位元素是否是要找的元素,如果是就返回
(2)如果桶位元素的hash小于0则表示桶位元素可能是FWD节点或树节点,则调用FWD节点或树节点的find方法进行查找
(3)如果桶位元素的hash大于等于0则表示桶位元素是链表节点,遍历链表查找
FWD节点find方法
Node<K,V> find(int h, Object k) { //第一个自旋是外循环判断对应桶位是否有元素 有元素就进入第二个自旋,即内循环判读元素是fwd节点,TreeBin节点还是链表节点
// loop to avoid arbitrarily deep recursion on forwarding nodes 自旋1
outer: for (Node<K,V>[] tab = nextTable;;) { //tab一定不为空因为只在transfer方法实例化了fwd节点 实例化fwd节点时肯定有nextTable的 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab)
Node<K,V> e; int n; //e是节点从旧tab迁移到newTab对应的位置
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null) //key对应的桶位迁移前就没元素
return null;
for (;;) { //自旋2 能来到这里的前置条件是key对应的桶位不为null
int eh; K ek;
if ((eh = e.hash) == h && //newTab对应桶位的头结点就是要找的目标节点
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable; //执行了这步tab变成newTab之后一般下次内循环不会走到这里
continue outer; //是fwd节点就跳转到自旋1 相当于退出内循环
}
else
return e.find(h, k); //TreeBin节点就调用TreeBin的find方法
} //当前桶位是链表 如果遍历到链表尾部还是找不到就返回null
if ((e = e.next) == null)
return null;
}
}
}
红黑树节点find方法
final Node<K,V> find(int h, Object k) {
if (k != null) {
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
if (((s = lockState) & (WAITER|WRITER)) != 0) { //当lockState为WAITER或WRITER状态时就用链表方式查找
if (e.hash == h && //写状态时 读则采取遍历链表的方式 虽然查找复杂度提高,但读写不阻塞 红黑树写时会阻塞读
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next;
} //cas设置LOCKSTATE为读锁
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try { //用红黑树方式查找
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
Thread w; //如果是最后一个读线程,并且有写线程因为读锁而阻塞 那么要通知它,告诉它可以尝试获取写锁了
if (U.getAndAddInt(this, LOCKSTATE, -READER) == //U.getAndAddInt(this, LOCKSTATE, -READER)释放读锁 LOCKSTATE-4
(READER|WAITER) && (w = waiter) != null) // 减完之后如果等于0110表示还有最后一个线程在读 并且还有一个线程在等(因为0110是6 4 + 2 4是读线程 2是等待线程)
LockSupport.unpark(w); // (w = waiter) != null 说明有一个写线程在等待读操作全部结束。
} // 使用unpark 让 等待线程 即写线程恢复运行状态。
return p;
}
}
}
return null;
}