ConcurrentHashMap源码详解

  • Post author:
  • Post category:其他


ConcurrentHashMap是Java5中支持高并发、高吞吐量的线程安全HashMap实现。ConcurrentHashMap继承AbstractMap类,实现了ConcurrentMap,Serializable接口。

一、为什么多线程高并发场景要用ConcurrentHashMap

1)为什么不用HashMap:容易形成死循环。HashMap是非线程安全的,在多线程时,多个线程同时put添加的时候,触发了扩容resize(),可能线程1扩容后元素A是指向元素B,然后线程2扩容后元素B指向了元素A,然后就形成了闭合的链表结构,然后再get去获取就会陷入死循环中,cpu一下子就可能达到接近100%。

2)为什么不用Hashtable:Hashtable使用了synchronized实现了线程安全,但是给每个方法都加上了synchronized关键字,也就是说Hashtable锁住了整个table。线程1在put添加时,其他线程都进入了阻塞状态,线程2就不能进行put添加,甚至不能get获取。在多线程高并发场景下,线程之间的竞争资源非常激烈,这样效率就非常低,而且线程越多,竞争越大,效率就越低。

3)为什么要用ConcurrentHashMap:ConcurrentHashMap使用了锁分段技术。Hashtable使用了一把锁锁住了所有的数据,在ConcurrentHashMap内使用了多把锁,每把锁锁住一部分数据,这样线程1在对一段数据执行put添加的时候,并不会影响到其他段的数据,其他线程可以对其他段的数据进行操作,但是如果有其他线程需要对线程1正在操作的这段数据操作,就仍会被阻塞。相对于Hashtable而言,有多少段数据,效率就是Hashtable的多少倍。

二、jdk1.8前后的区别

①、jdk1.8之前:由Segment数组结构,HashEntry数组结构和链表组成,Segment是可重入锁(ReentrantLock),每个Segment的结构采用数组加链表存储结构。ConcurrentHashMap使用了

锁分段

技术,每一段数据就是一个Segment,每一个Segment加一把锁,默认Segment数是16,也就是效率是Hashtable的16倍。

通过三次hash来决定放在哪里,第一次将key.hashcode()得到hash1,第二次将hash1的高位hash得到存放在哪个Segment,第三次将hash1来hash得到存放在哪个HashEntry。

②jdk1.8之后:取消了Segment结构,

采用了CAS和synchronized来保证线程安全,

底层结构与HashMap在jdk1.8的结构一致——数组、链表、红黑树,ConcurrentHashMap锁的是hash桶了。与jdk1.7相比,结构不同,锁的粒度也不同。

(CAS技术:比较交换技术,核心算法CompareAndSwap,它一种无锁策略,就是假设对所有的资源访问都是没有冲突的,因此所有线程可以不用阻塞,持续进行,如果一旦监测到了冲突,它就会不断的重试,直到没有冲突为止!优点:①没有锁竞争带来的cpu开销,②不会产生死锁)

三、源码分析

源码分析主要从核心方法来分析,建议先去看看HashMap源码,有很多相同相似之处

    //在了解put,get等方法前,先了解一下关于table数组的几个重要方法


    //以volatile读的方式读取table数组中的元素
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    //以volatile写的方式,将元素插入table数组
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
    //以CAS的方式,将元素插入table数组
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        //原子的执行如下逻辑:如果tab[i]==c,则设置tab[i]=v,并返回ture.否则返回false
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

(一)、put(K key,V value)添加方法

 public V put(K key, V value) {
    return putVal(key, value, false);
 }
    /** 实现put 和 putIfAbsent 方法       从这里可以得到putIfAbsent()底层就是调用了putVal() */
    // 参数   onlyIfAbsent  引用HashMap中的解释:若为true则修改值
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();  // 不允许空键值存在,为空报空指针异常
        int hash = spread(key.hashCode());  // 获取hash值
        int binCount = 0;    // 链表长度,如果为树就为2
        for (Node<K,V>[] tab = table;;) {  // for循环作用:使用了CAS技术,需要不断失败重试直到成功
            Node<K,V> f;  // 引用 f 用于锁定table内链表或红黑树的头节点

            int n, i, fh;
            if (tab == null || (n = tab.length) == 0)  // table数组是否初始化,为null或容量为0即未初始化
                tab = initTable();    //  调用initTable()初始化table数组 ; 第一次执行put操作时的第一次for循环到这里结束
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //根据tab容量和hash值取到索引下标,判断table该索引位置是否有值
                if (casTabAt(tab, i, null,    // tab数组该索引位置没有值,使用CAS添加,成功返回true,break循环;失败继续for循环直到成功       第一次put操作时,到这里直到成功就添加成功
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin   当容器为空时,没有锁
            }
            else if ((fh = f.hash) == MOVED) // tab[i]!=null时,监测内部是否在数组扩容
                tab = helpTransfer(tab, f); //在扩容时,帮助它扩容
            else {  // tab[i]!=null且未扩容,就给头结点加锁
                V oldVal = null;
                synchronized (f) { // 锁定链表或红黑树的头节点
                    if (tabAt(tab, i) == f) {  // 判断f是不是tab[i]的头节点
                        if (fh >= 0) { // 头节点的hash值大于等于0就是链表
                            binCount = 1; // 已知的链表长度暂时为1
                            for (Node<K,V> e = f;; ++binCount) { //遍历链表所有节点,每循环一次,链表长度binCount+1
                                K ek;
                                if (e.hash == hash &&   // 节点已存在就新值覆盖旧值,putVal方法的第三个参数onlyIfAbsent=false作用就来了
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {   //  节点不存在,就在链表尾部next添加节点
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {  // 如果不为链表,再判断是不是树结构
                            Node<K,V> p;
                            binCount = 2;   // 为红黑树就不需要再转红黑树
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,   //如果红黑树内节点已存在,返回已存在的节点,然后新值覆盖旧值;如果不存在,直接添加到红黑树内
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {    // 链表长度不为0,tab[i]不为空
                    // 当binCount >= TREEIFY_THRESHOLD=8时,链表转为红黑树。
                    //要注意的是,其实这里链表长度实际为9,这里需要自己跟着上面链表的循环画图走一遍才能看出来
                    //也就是这里的8指的是本次put添加之前的链表长度。。。。
                    if (binCount >= TREEIFY_THRESHOLD) 
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);   // 新添加一次,size就加1,可能会触发扩容
        return null;
    }
//获取hash值的方法

static final int spread(int h) {

    //通过一次无符号右移,一次异或运算,一次位与运算取到hash值; 得到的hash值位与上0x7FFFFFFF是避免为负数
    return (h ^ (h >>> 16)) & HASH_BITS; // ( key.hashcode() ^ (key.hashcode()>>>16) ) & 0x7FFFFFFF
}

对put方法流程做一个简述:

1)put方法底层调用了putVal方法,传入了key,value,onlyIfAbsent=fasle三个参数

2)对键值判空,为空报空指针异常

3)调用spread方法通过三次扰动计算key的hash值

4)判断数组table有没有被初始化,没有被初始化就调用initTable初始化table

5)根据hash值和table数组容量得到索引,如果数组该索引位置为空,使用CAS添加节点,失败就重试,直到成功

6)监测数组是否在扩容,如果在就帮助它扩容

7)如果table数组该索引位置不为空且没有扩容,就给头节点加synchornized锁

8)对链表或者红黑树遍历,如果已存在,新值覆盖旧值;不存在,为链表就在尾部添加节点,为红黑树根据自平衡添加树节点

9)监测binCount达到链表转红黑树的阈值,达到就链表转红黑树,此时链表长度实际为9(链表转红黑树方法内实际做了一次判断,如果table数组容量小于最低值64,会对数组扩容,这里与HashMap一样)

10)统计table数组元素个数的size变量+1

总结一下,put方法是如何通过CAS保证线程安全的:使用CAS会有一个预期值,如果取到的值与预期值一致,就会将值更新添加。当多个线程同时执行put操作,线程1获取到tab[i]=null,与预期值一致,就执行casTabAt(tab,i,null,添加的节点)添加节点进去;这时线程2执行casTabAt(tab,i,null,添加的节点)的时候,获取到tab[i]!=null,就会下一次循环,再来判断添加,直到成功。

(二)initTable()初始化数组方法

条件:sizeCtl=0 ,table数组为空或长度为0

    /**
     * Initializes table, using the size recorded in sizeCtl. 使用sizeCtl来记录初始化table
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) { // 使用CAS技术初始化table数组,失败会不断重试,直到初始化成功
            if ((sc = sizeCtl) < 0)    //  sizeCtl默认为0
                Thread.yield(); // lost initialization race; just spin 释放自己的时间片,重新与其他线程竞争资源
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  //使用CAS将sizeCtl修改为-1,执行CAS成功返回true
                try {
                    if ((tab = table) == null || tab.length == 0) {  // 监测table有没有初始化
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 上面sc=sizeCtl=0 —>n=16
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt; // 初始化数组结束,table容量为16
                        sc = n - (n >>> 2); // sc = 16-4=12
                    }
                } finally {
                    sizeCtl = sc; // sizeCtl=12,用于下次扩容
                }
                break;
            }
        }
        return tab;
    }
/**               相关属性
* hash表初始化或扩容时的一个控制位标识量。
* sizeCtl=0   数组未初始化
* sizeCtl=-1  正在初始化数组
*/

private transient volatile int sizeCtl;  // 默认为0

//默认的初始化table容量

private static final int DEFAULT_CAPACITY = 16;

初始化table数组要注意一个问题,在多线程模式下,每个线程都在进行put操作,要怎么保证只初始化table数组一次?

解决:sizeCtl默认为0,当有线程在初始化table数组,并且执行CAS初始化成功时,会将sizeCtl使用compareAndSwapInt()修改为-1,这时其他线程获取到sizeCtl<0,会执行Thread.yield()释放自己的时间片,从而达到只初始化table数组一次的目的。

(三)、get(K key)根据key获取value方法

    /**
     * 返回指定键映射到的值,
     */
    public V get(Object key) {
        Node<K,V>[] tab;
        Node<K,V> e, p; 
        int n, eh; K ek;
        int h = spread(key.hashCode());  //  取key的hash值
        if ((tab = table) != null && (n = tab.length) > 0 &&   // 数组必须初始化过且不为空
            (e = tabAt(tab, (n - 1) & h)) != null) {  //定位数组且不为空
            //数组该索引位置的头节点的hash和key一致,切不为null,返回它的值
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek))) 
                    return e.val;
            }
            else if (eh < 0)   // ForwardingNode节点、红黑树等
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) { // 遍历链表
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

流程简述:1)定位到数组该索引位置,获取到头节点e

2)判断头节点e是否为需要找的节点

3)是,直接返回值;否,遍历链表或树找到节点并返回值,未找到返回null

(四)、treeifyBin(Node<K,V>[] tab, int index)链表转红黑树方法

条件:当前put添加节点之前链表结构中元素数超过了阈值TREEIFY_THRESHOLD(默认为8),在当前节点添加后执行该方法,此时链表实际长度为9

    /**
     * 在这里链表转成红黑树,会替代掉所有相关节点     
     *  除非table容量太小(小于64),会在这里进行扩容
     */
    private final void treeifyBin(Node<K,V>[] tab, int index) {  // 参数数组tab和索引 i
        Node<K,V> b;  // 用于锁住头节点

        int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)  // 数组容量小于最小值64,就会进行扩容为原来的两倍,来缓解单个链表长度过大的问题
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 数组容量大于64,就将链表转为树结构,然后调用setTabAt方法转为红黑树           b.hash >= 0 即为普通的链表节点
                synchronized (b) {  // 锁住头节点
                    if (tabAt(tab, index) == b) {  // 确定锁住的是头节点
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {  // 遍历链表,将原本的Node链表转成TreeNode链表
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));  // 将TreeNode链表转成红黑树
                    }
                }
            }
        }
    }

从源码可以看出,红黑树的构造过程是在TreeBin的构造方法中完成的。

在TreeBin的构造方法中怎么转成红黑树的更详细分析看这里:

https://www.cnblogs.com/Joy-Hu/p/10876508.html

     /**           相关内部类
     * Nodes for use in TreeBins           TreeNode是一个内部类,继承了Node类,下面显示暂时只需              要知道代码
     */
    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }

        ...  ....

    }

    /**        
     * 可以对容器进行treeified处理的最小表容量。
     * (如果一个容器中有太多节点,就会重新调整表的大小。)
     * 该值应至少为4 * TREEIFY_THRESHOLD     4*8 以避免
     * 调整大小和treeification阈值之间的冲突。
     */
    static final int MIN_TREEIFY_CAPACITY = 64; 

(五)扩容机制


协助扩容

:在讲扩容机制之前,先讲一个优化的方法。因为在ConcurrentHashMap中最耗时的操作就是  扩容  了,所以对扩容进行优化能很大程度上提升性能。在jdk1.8之前,ConcurrentHashMap在进行put操作遇见扩容时,会阻塞等待,直到扩容完成后,才能继续执行。在jdk1.8之后,ConcurrentHashMap在进行put操作遇见扩容时(监测tab数组该索引位置的头节点的hash=-1时即遇见扩容),会调用helpTransfer()方法帮助它扩容,从而实现优化效果。


扩容场景

: 1)数组未被初始化,第一次调用put()时,调用initTable()初始化数组实现扩容

2)链表转红黑树的时候,数组容量小于最小值MIN_TREEIFY_CAPACITY =64时,调用tryPresize()扩容

3)put()添加完成后会调用addCount()记录元素个数,addCount()会监测数组元素个数在达到阈值时,触发

transfer

()方法扩容


transferIndex:扩容索引,表示已经分配给扩容线程的table数组索引位置。主要用来协调多个线程,并发安全地
获取迁移任务(hash桶)。
    /**
     *  将每个bin中的节点移动或者复制到新表中
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 
            stride = MIN_TRANSFER_STRIDE; // subdivide range 细分范围   // 每次至少迁移16个hash桶
        if (nextTab == null) {            // initiating    启动     // 根据当前数组容量,新建一个当前两倍容量的数组nextTable
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;  // 去当前数组的容量

        //标记当前节点已经迁移完成,它的hash值是MOVED=-1
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 初始化ForwardingNode节点,ForwardingNode保存了nextTable数组的引用
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab   确保在提交nextTab之前进行清理

        //1、逆序遍历迁移已经获取到的hash桶集合,如果迁移完毕,则更新transferIndex,获取下一批待迁移的hash桶

        //2、如果transferIndex=0,表示所以hash桶均被分配,将i置为-1,准备退出transfer方法
        for (int i = 0, bound = 0;;) {  // 移动处理数组中每个索引位置的链表元素,默认advance = true;
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt        // 使用for循环和CAS算法设置transferIndex =transferIndex -stride ,保证了扩容的并发安全
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;   //  这里 i 表示当前处理的数组索引下标
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }

               /**
                * 第一个扩容的线程,执行transfer方法之前,会设置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
                *后续帮其扩容的线程,执行transfer方法之前,会设置 sizeCtl = sizeCtl+1
                *每一个退出transfer的方法的线程,退出之前,会设置 sizeCtl = sizeCtl-1
                *那么最后一个线程退出时:
                * 必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
                */
                //不相等,说明不到最后一个线程,直接退出transfer方法
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { 
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)   // 如果该索引位置没有节点,通过CAS插入ForwardingNode节点,表示迁移处理过
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)  //   已迁移完成的节点,hash为MOVED=-1
                advance = true; // already processed  已结处理过的
            else {   // 开始迁移
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {         //  迁移链表,将node链表分成两个新的链表
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {  // 遍历这个桶
                                int b = p.hash & n;  // 取到桶中每个元素的hash值
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);   // 将链表放进新数组中
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {  // 迁移红黑树
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
/**    
* hash表初始化或扩容时的一个控制位标识量。
* sizeCtl=0   数组未初始化
* sizeCtl=-1  正在初始化数组
* sizeCtl>0   初始化的容量
* sizeCtl=0.75*数组的容量  扩容的阈值  (正常状态)
*sizeCtl<0  正在扩容
*/

private transient volatile int sizeCtl;  // 默认为0

/**可用处理器的jvm数量 */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/** 扩容的线程每次至少要迁移16个hash桶*/
private static final int MIN_TRANSFER_STRIDE = 16;

总结一下,每次扩容table数组容量为之前的两倍,sizeCtl阈值大小为之前的两倍



版权声明:本文为qq_42679299原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。