ConcurrentHashMap源码详解

  • Post author:
  • Post category:其他

ConcurrentHashMap简介

ConcurrentHashMap是HashMap的同步容器,是线程安全的。

ConcurrentHashMap通过CAS和Synchronized来实现线程安全!

ConcurrentHashMap从Segment+ReentrantLock改为Node+Synchronized的原因

因为ConcurrentHashMap已经将锁细化到了一个槽中了,冲突不会太大。

Synchronized在重量锁的时候有自旋锁的行为,不会一下就挂起。

而ReentrantLock的话,只要当前线程只要不是排在第二个等待节点就会被挂起,消耗资源。

线程的挂起和唤醒会导致线程之间的切换,会频繁的导致内核态和用户态之间的切换,十分耗费资源。

所以综合而言,Synchronized的性能会比较好,并且占用内存小。

ConcurrentHashMap源码详解

Put()

public V put(K key, V value) {
    return putVal(key, value, false);
}

static final int spread(int h) {
    //HASH_BITS = 0x7fffffff  0111 1111 1111 1111 1111 1111 1111 1111
    return (h ^ (h >>> 16)) & HASH_BITS;
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
     if (key == null || value == null) throw new NullPointerException();
     //获取hash值
     int hash = spread(key.hashCode());
     int binCount = 0;
     for (Node<K,V>[] tab = table;;) {
         Node<K,V> f; int n, i, fh;
         //如果table为空
         if (tab == null || (n = tab.length) == 0)
             //进行初始化
             tab = initTable();
         //如果table已经初始化,下标为(n - 1) & hash,相当于hash % n,但是对应的槽位还未初始化
         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
             //初始化Node,并将Node设置为tab[i],设置成功就退出 
             if (casTabAt(tab, i, null,
                          new Node<K,V>(hash, key, value, null)))
                 break;                 
         }
         //如果f.hash = MOVED,说明当前槽位正在扩容,先帮助扩容,扩容完毕后再插入
         else if ((fh = f.hash) == MOVED)
             //帮助扩容
             tab = helpTransfer(tab, f);
         else {
             V oldVal = null;
             //用槽的第一个Node作为锁对象
             synchronized (f) {
                 if (tabAt(tab, i) == f) {
                     if (fh >= 0) {//红黑树有一个哨兵节点,hash值为-2,如果fh>0说明是链表
                         binCount = 1;//binCount代表着链表中的个数
                         for (Node<K,V> e = f;; ++binCount) {
                             K ek;
                             //如果存在对应的key
                             if (e.hash == hash &&
                                 ((ek = e.key) == key ||
                                  (ek != null && key.equals(ek)))) {
                                 oldVal = e.val;
                                 if (!onlyIfAbsent)
                                     e.val = value;
                                 break;
                             }
                             Node<K,V> pred = e;
                             //到了链表末尾
                             if ((e = e.next) == null) {
                                 pred.next = new Node<K,V>(hash, key,
                                                           value, null);
                                 break;
                             }
                         }
                     }
                     //如果是红黑树
                     else if (f instanceof TreeBin) {
                         Node<K,V> p;
                         binCount = 2;
                         //调用红黑树的put方法
                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                        value)) != null) {
                             oldVal = p.val;
                             if (!onlyIfAbsent)
                                 p.val = value;
                         }
                     }
                 }
             }
             
             if (binCount != 0) {
                 //如果超过树化阈值,升级为红黑树
                 if (binCount >= TREEIFY_THRESHOLD)
                     treeifyBin(tab, i);
                 if (oldVal != null)
                     return oldVal;
                 break;
             }
         }
     }
     //将size++,如果超过阈值,会触发扩容
     addCount(1L, binCount);
     return null;
 }


总结以下put()方法,和HashMap的put()方法没有什么大差别,就是多了CAS的操作以及对槽中的第一个Node加锁。

还有的话就是,如果插入的时候,Map正在扩容,会先帮助扩容,然后再插入。

我们先说下初始化initTable(),然后再说下扩容

initTable()

说完了插入方法,我们说下初始化

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //可能同时有多个线程同时初始化,自旋CAS来初始化,判断依据是sc
    //如果sc = -1,说明有其他线程在初始化
    //如果sc > 0 说明初始化完成了
    while ((tab = table) == null || tab.length == 0) {
         
        if ((sc = sizeCtl) < 0)
            Thread.yield(); //有其他线程在初始化了,当前线程可以停止了
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//如果CAS成功将sizeCtl更新成了-1
            try {//如果table为空
                if ((tab = table) == null || tab.length == 0) {
                    //如果初始化容量了,则设置为sc,否则设置为默认容量16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    //初始化table
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    //将sc设置为0.75*n,下一次扩容的阈值
                    sc = n - (n >>> 2);
                }
            } finally {
                //扩容的阈值
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

addCount()

HashMap中是用一个long 变量来表示Map中元素个数。而ConcurrentHashMap中则是用一个CounterCell数组+一个baseCount来表示的。

为什么会这样费劲的做呢?
直接定义一个long变量不就行了?假设我们只定义一个long变量 size,ConcurrentHashMap是支持并发的,很有可能会有很多线程同时CAS更新size,因为有很多线程,更新一次很可能会失败,所以需要自旋来一直CAS更新,直至成功。自旋会浪费很大的资源。

将size分为n个CounterCell,这样就大大的降低了线程更新的冲突,性能更好!


@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}
private transient volatile long baseCount;

ConcurrentHashMap中统计个数是调用的sumCount()方法

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

从sumCount()方法我们可以看出,sum = baseCount+CountCell中的元素和。

将size分到了n个CounterCell和一个baseCount中。

CountCells数组的长度也是2的幂次,里面存的有一个long值。

所以sum = baseCount+CountCell中的元素和。

当一个节点入队的时候,会首先利用CAS将baseCount++,如果CAS修改baseCount失败会随机挑选一个CounterCell,然后CAS修改的CountCell的变量。

下面的代码是addCount()的前半段,负责将元素个数+1,后半段是用来扩容的,我们先看前半段。

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    //如果counterCells == null,就将CAS将baseCount++
    //如果CAS成功,就代表更新成功
    //如果CAS不成功,就执行方法里面的代码
    //如果counterCells != null,直接执行方法里面的代码
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        //是否 不存在其他线程竞争
        boolean uncontended = true;
        //如果counterCells不为null,并且随机在counterCells中挑选一个槽位的CounterCell不为null,并且CAS 来增加size成功,就跳过下面的代码段
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            //如果counterCells == null 或者 随机挑选的CounterCell 为null,或者CAS更新CounterCell失败,这时uncontented = false,说明存在其他线程竞争,执行fullAddCount方法
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        //s为当前节点个数
        s = sumCount();
    }

如果CAS更新baseCount失败并且CAS更新CounterCell失败的话,就调用fullAddCount,具体代码如下:

//执行到这个方法说明有以下三个情况
//1、CounterCells == null 
//2、CounterCell == null
//3、CAS更新CounterCell失败,这时unContented = false
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    //h是要更新的CounterCells下标,是一个随机数
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();   
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        //CounterCells不为null
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {//如果对应的CounterCell为null
                if (cellsBusy == 0) {//当没有人扩容的时候       
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&//将cellBusyCAS设置为1
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {//将CounterCells[j]设置为r               
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            //代表更新成功
                            cellsBusy = 0;
                        }
                        if (created)//更新成功返回
                            break;
                        continue;           // 被别人抢先了
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)  //之前CAS更新失败了
                wasUncontended = true;   
            //将对应的CounterCell更新
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            //说明有其他线程将CounterCells扩容了
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            //将CounterCells扩容,当连续两次循环都没有更新成功的话,说明这时并发程度较高,会尝试扩容
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        //这时cellsBusy = 0,并且counterCells=as =null,说明CounterCells还未被初始化
        //当满足上面两个要求,就尝试CAS将cellBusy改为1
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {//初始化CounterCells,长度为2
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    //并将rs[0]设置为x
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                //将cellBusy = 0,这也就是为什么额外设置counterCells == as这个条件来控制初始化了
                cellsBusy = 0;
            }
            if (init)//初始化成功,就退出死循环,代表addCount成功
                break;
        }
        //尝试将baseCount++
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                         
    }
}

扩容

addCount()方法的后半段是用来扩容的。

    //如果是新加入节点的话,也就是put的时候,check=1,会检测是否需要扩容
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        
        //如果节点个数大于阈值sizeCtl,sizeCtl=length*0.75
        //并且length 小于MAXIMUM_CAPACITY 最大容量
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);//获取扩容的时间戳,扩容前的长度不一样,时间戳也不一样。
            
//  static final int resizeStamp(int n) {

//   Integer.numberOfLeadingZeros(n)是获取n的高位连续的0的个数count,因为n是2的幂次,所以n不一样,对应的高位连续的0的个数count也不一样
// RESIZE_STAMP_BITS=16, 将count | 1<<15,也就是说后16位代表扩容的时间戳,也就是版本号  
  
//      return Integer.numberOfLeadingZeros(n) | (1 << //(RESIZE_STAMP_BITS - 1));

//   }

            //sc = sizeCtl,正常情况下是代表当前扩容的阈值,当<0说明正在扩容
            if (sc < 0) {//别的线程已经抢先扩容了
                 
                 //如果扩容完毕了,就break退出
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                //如果还未扩容完,就帮助扩容,将sizeCtl+1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }

            //还未扩容,当前线程是第一个扩容的线程
            //将sizeCtl设置为rs<<16,并且+2,我们之前说了rs的后16位代表扩容时间戳版本,将其右移16,现在就说明sizeCtl的高16位代表扩容时间戳版本,低16位代表扩容的线程数+1,
            //rs<<16+2肯定小于0,因为rs的倒数第十六位为1,然后左移16位,rs的第一位肯定为1,所以肯定小于0
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

Transfer()

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    //tab是旧table,nextTab是扩容后的table
    
    int n = tab.length, stride;
    //stride是一次性负责搬运槽位的个数。
    //如果是单核就设置stride为tab的length
    //如果是多核,就设置为 n / 8/ cpu的个数,如果stride<MIN_TRANSFER_STRIDE(16)的话,就设置stride为16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; 

    //如果nextTab还未初始化,就初始化nextTab,只有第一个线程才会初始化
    if (nextTab == null) { 
        try {
            //扩容为原来的2倍,n<<1
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        //transferIndex = 旧table的长度
        transferIndex = n;
    }

    //首先我先概略说一下,ConcurrentHashMap的扩容原理,
    //需要扩容的槽位个数有transferIndex,一个线程开始搬运的时候,会设置一个上下界,上界i为transferIndex-1,下界bound为transferIndex - stride-1,然后将transferIndex -= stride,接着开始搬运[bound - i]区间中的槽位
    //扩容后的长度
    int nextn = nextTab.length;
    //扩容的设置标志
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    
    //是否分配了搬运区间
    boolean advance = true;
    //扩容是否结束
    boolean finishing = false;
    
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        //还未分配搬运区间
        while (advance) {
            int nextIndex, nextBound;
            //如果上届i >= bound,说明还未搬运完,或者扩容结束了,这两种情况都不需要再申请搬运区间了。
            if (--i >= bound || finishing)
                advance = false;
            //如果transferIndex <= 0,说明搬运区间已经被分配光了,将i设置为-1
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            //分配搬运区间,将transferIndex-=strid
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                //设置下界
                bound = nextBound;
                //设置上界
                i = nextIndex - 1;
                advance = false;
            }
        }
        //如果i<0,说明搬运区间分配完了。搬运区间分配完了,不代表搬运完了,有可能其他线程还在搬运呢
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            //如果搬运结束了
            if (finishing) {
                nextTable = null;
                //将table设置为nextTab
                table = nextTab;
                //将sizeCtl设置为0.75*nextTab.length
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
              
            //如果其他线程还未搬运完,就将sizeCtl-1,因为我们之前说过,除了第一个扩容的线程将sizeCtl设置为 resizeStamp(n) << 16 + 2,其他帮助扩容的线程再进来之前会将sizeCtl+1的,所以再搬运完毕后,需要将sizeCtl-1。只有当所有线程都-1之后,sizeCtl = resizeStamp(n) << 16 + 2的时候,才说明所有线程都搬运结束了。
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //如果 sizeCtl != resizeStamp(n) << 16 + 2,说明其他线程还未搬运完,自己就返回吧。插不上脚了
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                //搬运完毕
                finishing = advance = true;
                i = n; 
            }
        }
        //执行到这个分支的时候,就说明开始搬运了,
        //如果老table中第i个槽位的抵押给节点为空,就将其第一个节点设置为fwd,fwd的hash = MOVED,然后就继续执行循环,会i--的。
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        //如果f.hash等于MOVED,说明当前槽位为空,就继续执行循环,会i--的。
        else if ((fh = f.hash) == MOVED)
            advance = true; 
        else {
            //当前槽位不为空,开始搬运了。
            //也是将槽位的第一个node 锁定
            //槽位中的node分为两类,一种是呆在原来下标i的,还有一种是挪到i+n的。因为我们的槽位下标是通过hash%n确定的,并且n都是2的幂次,32位中只有一个1.
            //我们可以通过hash&n来确定是否需要搬运到i+n槽位中,如果hash&n==0,说明hash对应的n的那一位是为0的。如果hash&n==1,说明hash对应的n的那一位是为1的,hash%(2*n) = n + hash%n的.
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        //lastRun是从后往前连续的搬运到同一个槽位的最后一个节点,这样的话当再搬运的时候,遍历到lastRun时,就不用再往后遍历了。
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        //如果runBit == 0,说明最后一段节点属于在老位置的
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {//最后一段节点在新位置
                            hn = lastRun;
                            ln = null;
                        }
                        
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            //老位置节点,ln是老位置槽位的头节点
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                            //新位置节点,hn是新位置槽位的头节点
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        //将新table中新老槽位的节点分别设置
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        //将老table中老位置节点设置成fwd
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

最后总结一下,第一个扩容的线程会将新table初始化,然后申请搬运区间,开始搬运连续的stride个槽位,搬运完之后,会继续申请搬运区间,搬运stride个节点,直到搬运区间申请不到了,就退出。

其他普通线程和抵押给扩容的线程类似,除了新table初始化。

接着说一下helpTransfer()

helpTransfer()

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    //如果tab已经初始化了,并且对应的槽位为ForwardingNode,并且新table不为null
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        //扩容对应的时间戳
        int rs = resizeStamp(tab.length);
        //如果sc<0说明还在搬运
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            //如果进来之后搬运玩了,就退出
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            //开始尝试搬运,将sizeCtl+1
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

通过联系transfer()和put()两个方法我们可以看出,如果一个槽位还未被搬运到,这个时候有线程往这个槽位中放元素,会尝试将占领槽位第一个node的锁,然后往里面放入元素。

之后放入的元素会在之后的搬运过程中,搬运到新table中。

remove()

public V remove(Object key) {
    return replaceNode(key, null, null);
}

final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        else if ((fh = f.hash) == MOVED)
        	//如果正在搬运就帮助搬运,等待搬运后再删除
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            //锁定槽位对应的第一个node
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

从源码看出,如果正在扩容,就会先帮助扩容。否则就锁定槽位对应的第一个node,开始删除节点。

总结

通过源码可以看出,ConcurrentHashMap和HashMap的区别就是
1、引入了sizeCtl,sizeCtl有三种状态
1.1、0:还未初始化
1.2、-1:正在初始化
1.3、<0但是不等于-1,此时表示正在扩容。第一个扩容的线程会将sizeCtl设置为sizeStamp(n)<<16+2
1.4、大于0.这时为扩容的阈值,为n*0.75
2、通过Synchronized将对应的槽位的第一个节点锁住,只会锁住一个槽,而不是一整个table

3、扩容:
是将n个槽分为若干个连续的组,一个组的长度为固定的stride,stride=n/8/cpu个数,如果小于16会将其赋值为16。

并允许多个线程同时扩容,每个线程都有其搬运的上下界区间。上界由一个transferIndex变量控制,transferIndex初始值为n,每个线程再申请搬运区间的时候,会将transfer -= stride,其对应的区间为[transferIndex -stride,transferIndex],然后开始搬运区间中对应的槽位。搬运完这stride个槽位后,如果发现transferIndex>0,还会继续搬运,直到transderIndex < 0 说明搬运区间已经被申请完了,但是这时未必全部都搬运完成了,申请搬运区间到搬运完成是需要一个过程的。

sizeCtl与扩容状态也有关,sizeCtl的高16位代表着扩容的时间戳,后16位对应着搬运的线程数。第一个扩容的线程会将sizeCtl设置为sizeStamp(n)<<16+2,当线程插入或者删除的时候,发现正在扩容,首先会帮助扩容,当扩容完毕后才会执行插入或者删除操作。具体做法是将sizeCtl+1,然后申请搬运区间并搬运,当线程申请不到搬运区间的时候,将sizeCtl-1,当sizeCtl == sizeStamp(n)<<16+2的时候,说明搬运完成。


版权声明:本文为qq_40276626原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。