初始化
SIZECTL变量
- sizeCtl=0表示数组未初始化,且初始化容量为16
- sizeCtl=-1表示数组正在初始化
- sizeCtl>0表示阈值,数组已初始化完成(如果未初始化表示容量)
- sizeCtl<0&&sizeCtl != -1表示正在扩容,
不允许null键null值
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
构造函数
使用默认构造函数时,并没有初始化长度。第一次调用put时才会初始化长度,默认16
public ConcurrentHashMap() {
}
initTable()方法初始化长度
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
初始化(如何保证线程安全)
cas+自旋+dubboCheck保证线程安全
cas保证代码块只能单线程执行
dubboCheck保证只执行一次
自旋?
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
//sizeCtl<0说明其他线程在初始化,yield让出cpu执行权
Thread.yield();
//cas判断当前线程的SIZECTL与主内存的sizeCtl是否相等,如果相等就把主内存的sizeCtl更新为-1(只有一个线程x能够成功,但x执行完毕,有一个线程y在Thread.yield();位置,线程y就能进入,所以有了dubboCheck)
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//dubboCheck保证线程安全
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//扩容阈值,n-1/4n=0.75*n
sc = n - (n >>> 2);
}
} finally {
//扩容阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}
初始容量计算
当指定其容量为32时,实际上是64( jdk1.8)
假设指定容量为x,则实际长度y=1.5*x,然后是大于y的最近的2的幂次方的值
//实际32
new ConcurrentHashMap(21);
//实际64
new ConcurrentHashMap(22);
//实际32
new ConcurrentHashMap(20);
添加安全
put方法
数组+链表:数组的元素称为桶,一个桶可以装一个链表
cas保证数组添加是线程安全的
synchronized 保证链表安全,锁的对象是桶,最小粒度的锁保证效率
当链表长度大于等于8,会走转换树的方法,判断容量小于64会扩容,否则就转换成树
最终维护集合长度,判断是否达到阈值,如果达到就扩容
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果当前桶为null,cas设置值
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//判断是否是扩容,如果是扩容就不put
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//保证链表安全
synchronized (f) {
//如果链表变成了树或者扩容了,当前元素就有可能不是原来的节点
if (tabAt(tab, i) == f) {
//hash值fh大于0就是链表
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//否则就是树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//判断链表是不是>=8,是就走转红黑树的方法
if (binCount >= TREEIFY_THRESHOLD)
//treeifyBin会判断容量是否大于64,如果大于就转换成树,否则就扩容
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//维护集合长度,判断是否达到阈值,如果达到就扩容
addCount(1L, binCount);
return null;
}
容量长度如何计算
addCount方法,分段cas+自旋保证线程安全
//首先对这个变量进行++,当多个线程对其操作时分段cas一个数组(线程数最多=cpu核心树),之后加起来,伪代码如下
private transient volatile long baseCount;
//计数器单元格表。当非空时,大小是 2 的幂。
private transient volatile CounterCell[] counterCells;
//CounterCell对象里面的value值就是分段cas操作的目标值
维护一个数组,伪代码如下
/**
* 分段cas,减少了线程竞争。
* 每个线程分配一个可操作数字,最后加起来
*/
public class Test {
//初始容量
private static transient volatile long baseCount;
public static void main(String[] args) {
//假设cpu是4核
int[] ints = {0,0,0};
//线程1,直接加baseCount(这里是cas),线程2一直操作失败就会创建CounterCell数组长度=cpu核心数,线程3一直操作失败就会创建下一个CounterCell放入CounterCell数组,直到数组满
baseCount++;
//线程2,对数组的第一个元素操作,(假设一直++,加到了4)
ints[0] = 4;
//线程3,对数组的第二个元素操作,(假设一直++,加到了8)
ints[1] = 8;
//线程4,对数组的第三个元素操作,(假设一直++,加到了16)
ints[2] = 16;
baseCount = baseCount+ints[0]+ints[1]+ints[2];
System.out.println(baseCount);
}
}
扩容
- 如果cpu核心数是1,就不需要多线程扩容
- 将旧数组划分,(分治的思想)
- 每个线程分配x个
- 最少分配16个,如果小于16就是16,大于16就是x
- 每个线程从每个数组的尾部开始迁移,如果迁移成功就将桶的设置为ForwardingNode(内部类对象),put时会判断节点为ForwardingNode表示在扩容,并且这个桶已经迁移(moved节点hashCode=-1)
- 每个桶进行迁移时使用synchronized 保证线程安全
版权声明:本文为qq_25271459原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。