ConcurrentHashMap

  • Post author:
  • Post category:其他

ConcurrentHashMap

初始化

SIZECTL变量

  • sizeCtl=0表示数组未初始化,且初始化容量为16
  • sizeCtl=-1表示数组正在初始化
  • sizeCtl>0表示阈值,数组已初始化完成(如果未初始化表示容量)
  • sizeCtl<0&&sizeCtl != -1表示正在扩容,

不允许null键null值

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();

构造函数

使用默认构造函数时,并没有初始化长度。第一次调用put时才会初始化长度,默认16

public ConcurrentHashMap() {
    }

initTable()方法初始化长度

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();

初始化(如何保证线程安全)

cas+自旋+dubboCheck保证线程安全
cas保证代码块只能单线程执行
dubboCheck保证只执行一次
自旋?

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
            	//sizeCtl<0说明其他线程在初始化,yield让出cpu执行权
                Thread.yield(); 
                //cas判断当前线程的SIZECTL与主内存的sizeCtl是否相等,如果相等就把主内存的sizeCtl更新为-1(只有一个线程x能够成功,但x执行完毕,有一个线程y在Thread.yield();位置,线程y就能进入,所以有了dubboCheck)
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                	//dubboCheck保证线程安全
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //扩容阈值,n-1/4n=0.75*n
                        sc = n - (n >>> 2);
                    }
                } finally {
                	//扩容阈值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

初始容量计算

当指定其容量为32时,实际上是64( jdk1.8
假设指定容量为x,则实际长度y=1.5*x,然后是大于y的最近的2的幂次方的值

		//实际32
        new ConcurrentHashMap(21);
        //实际64
        new ConcurrentHashMap(22);
        //实际32
        new ConcurrentHashMap(20);

添加安全

put方法

数组+链表:数组的元素称为桶,一个桶可以装一个链表
cas保证数组添加是线程安全的
synchronized 保证链表安全,锁的对象是桶,最小粒度的锁保证效率
当链表长度大于等于8,会走转换树的方法,判断容量小于64会扩容,否则就转换成树
最终维护集合长度,判断是否达到阈值,如果达到就扩容

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();     
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //如果当前桶为null,cas设置值
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //判断是否是扩容,如果是扩容就不put
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //保证链表安全
                synchronized (f) {
                	//如果链表变成了树或者扩容了,当前元素就有可能不是原来的节点
                	if (tabAt(tab, i) == f) {
                		//hash值fh大于0就是链表
                		if (fh >= 0) {
	                		binCount = 1;
	                            for (Node<K,V> e = f;; ++binCount) {
	                                K ek;
	                                if (e.hash == hash &&
	                                    ((ek = e.key) == key ||
	                                     (ek != null && key.equals(ek)))) {
	                                    oldVal = e.val;
	                                    if (!onlyIfAbsent)
	                                        e.val = value;
	                                    break;
	                                }
	                                Node<K,V> pred = e;
	                                if ((e = e.next) == null) {
	                                    pred.next = new Node<K,V>(hash, key,
	                                                              value, null);
	                                    break;
	                                }
	                            }
	                        }
	                        //否则就是树
	                        else if (f instanceof TreeBin) {
	                            Node<K,V> p;
	                            binCount = 2;
	                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
	                                                           value)) != null) {
	                                oldVal = p.val;
	                                if (!onlyIfAbsent)
	                                    p.val = value;
	                            }
	                        }
	                    }
	                }
	                if (binCount != 0) {
	                //判断链表是不是>=8,是就走转红黑树的方法
                    if (binCount >= TREEIFY_THRESHOLD)  
	                    //treeifyBin会判断容量是否大于64,如果大于就转换成树,否则就扩容               
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //维护集合长度,判断是否达到阈值,如果达到就扩容
        addCount(1L, binCount);
        return null;
    }

容量长度如何计算

addCount方法,分段cas+自旋保证线程安全

//首先对这个变量进行++,当多个线程对其操作时分段cas一个数组(线程数最多=cpu核心树),之后加起来,伪代码如下
private transient volatile long baseCount;
//计数器单元格表。当非空时,大小是 2 的幂。
private transient volatile CounterCell[] counterCells;
//CounterCell对象里面的value值就是分段cas操作的目标值

维护一个数组,伪代码如下

/**
 * 分段cas,减少了线程竞争。
 * 每个线程分配一个可操作数字,最后加起来
 */
public class Test {
    //初始容量
    private static transient volatile long baseCount;
    public static void main(String[] args) {
        //假设cpu是4核
        int[] ints = {0,0,0};
        //线程1,直接加baseCount(这里是cas),线程2一直操作失败就会创建CounterCell数组长度=cpu核心数,线程3一直操作失败就会创建下一个CounterCell放入CounterCell数组,直到数组满
        baseCount++;
        //线程2,对数组的第一个元素操作,(假设一直++,加到了4)
        ints[0] = 4;
        //线程3,对数组的第二个元素操作,(假设一直++,加到了8)
        ints[1] = 8;
        //线程4,对数组的第三个元素操作,(假设一直++,加到了16)
        ints[2] = 16;
        baseCount = baseCount+ints[0]+ints[1]+ints[2];
        System.out.println(baseCount);
    }
}

扩容

  • 如果cpu核心数是1,就不需要多线程扩容
  • 将旧数组划分,(分治的思想)
  • 每个线程分配x个
  • 最少分配16个,如果小于16就是16,大于16就是x
  • 每个线程从每个数组的尾部开始迁移,如果迁移成功就将桶的设置为ForwardingNode(内部类对象),put时会判断节点为ForwardingNode表示在扩容,并且这个桶已经迁移(moved节点hashCode=-1)
  • 每个桶进行迁移时使用synchronized 保证线程安全

版权声明:本文为qq_25271459原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。