ConcurrentHashMap采用了非常精妙的”分段锁”策略,ConcurrentHashMap的主干是个Segment数组。
static class Segment<K,V> extends ReentrantLock implements Serializable
final Segment<K,V>[] segments;
Segment继承了ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap,一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,对于不同Segment的数据进行操作是不用考虑锁竞争的。
所以,对于同一个Segment的操作才需考虑线程同步,不同的Segment则无需考虑。
Segment类似于HashMap,一个Segment维护着一个HashEntry数组
transient volatile HashEntry<K,V>[] table;
HashEntry是目前我们提到的最小的逻辑处理单元了。一个ConcurrentHashMap维护一个Segment
数组
,一个Segment维护一个HashEntry
数组
。
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
//其他省略
}
Segment类似哈希表,那么一些属性就跟我们之前提到的HashMap差不离,比如负载因子loadFactor,比如阈值threshold等等,看下Segment的构造方法。
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;//负载因子
this.threshold = threshold;//阈值
this.table = tab;//主干数组即HashEntry数组
}
ConcurrentHashMap的构造方法
主要的字段如下:
//node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始值,必须是2的幂数
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
//2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//forwarding nodes的hash值
static final int MOVED = -1;
//树根节点的hash值
static final int TREEBIN = -2;
//ReservationNode的hash值
static final int RESERVED = -3;
//可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的数组
transient volatile Node<K,V>[] table;
/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
*当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容
*当为0时:代表当时的table还没有被初始化
*当为正数时:表示初始化或者下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
构造函数
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//MAX_SEGMENTS 为1<<16=65536,也就是最大并发数为65536
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
//2的sshif次方等于ssize,例:ssize=16,sshift=4;ssize=32,sshif=5
int sshift = 0;
//ssize 为segments数组长度,根据concurrentLevel计算得出
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//segmentShift和segmentMask这两个变量在定位segment时会用到,后面会详细讲
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//计算cap的大小,即Segment中HashEntry的数组长度,cap也一定为2的n次方.
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
//创建segments数组并初始化第一个Segment,其余的Segment延迟初始化
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}
put
方法
插入过程会进行第一次 key 的 hash 来定位 Segment 的位置,如果该 Segment 还没有初始化,即通过 CAS 操作进行赋值,然后进行第二次 hash 操作,找到相应的 HashEntry 的位置,这里会利用继承过来的锁的特性,在将数据插入指定的 HashEntry 位置时(尾插法),会通过继承 ReentrantLock 的
tryLock()
方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用
tryLock()
方法去获取锁,超过指定次数就挂起,等待唤醒。
putVal函数的流程大体如下。
① 判断存储的key、value是否为空,若为空,则抛出异常,否则,进入步骤②
② 计算key的hash值,随后进入无限循环,该无限循环可以确保成功插入数据,若table表为空或者长度为0,则初始化table表,否则,进入步骤③
③ 根据key的hash值取出table表中的结点元素,若取出的结点为空(该桶为空),则使用CAS将key、value、hash值生成的结点放入桶中。否则,进入步骤④
④ 若该结点的的hash值为MOVED,则对该桶中的结点进行转移,否则,进入步骤⑤
⑤ 对桶中的第一个结点(即table表中的结点)进行加锁,对该桶进行遍历,桶中的结点的hash值与key值与给定的hash值和key值相等,则根据标识选择是否进行更新操作(用给定的value值
替换该结点的value值),若遍历完桶仍没有找到hash值与key值和指定的hash值与key值相等的结点,则直接新生一个结点并赋值为之前最后一个结点的下一个结点。进入步骤⑥
⑥ 若binCount值达到红黑树转化的阈值,则将桶中的结构转化为红黑树存储,最后,增加binCount的值。
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 键或值为空,抛出异常
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
// 根据 key 计算出 hashcode。
for (Node<K,V>[] tab = table;;) { // 无限循环
Node<K,V> f; int n, i, fh;
// 判断是否需要初始化
if (tab == null || (n = tab.length) == 0) // 表为空或者表的长度为0
// 初始化表
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// f 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null))) // 比较并且交换值,如tab的第i项为空则用新生成的node替换
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // 该结点的hash值为MOVED
// 进行结点的转移(在扩容的过程中)
// 如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {// 找到table表下标为i的节点
// 如果都不满足,则利用 synchronized 锁写入数据。
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 该table表中该结点的hash值大于0
// binCount赋值为1
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { // 无限循环
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 结点的hash值相等并且key也相等
// 保存该结点的val值
oldVal = e.val;
if (!onlyIfAbsent)// 进行判断
// 将指定的value保存至结点,即进行了结点值的更新
e.val = value;
break;
}
// 保存当前结点
Node<K,V> pred = e;
if ((e = e.next) == null) { // 当前结点的下一个结点为空,即为最后一个结点
// 新生一个结点并且赋值给next域
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 结点为红黑树结点类型
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) { // 将hash、key、value放入红黑树
// 保存结点的val
oldVal = p.val;
if (!onlyIfAbsent) // 判断
// 赋值结点value值
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
put的主要逻辑也就两步:
1.定位segment并确保定位的Segment已初始化 2.调用Segment的put方法。
关于segmentShift和segmentMask
segmentShift和segmentMask这两个全局变量的主要作用是用来定位Segment,int j =(hash >>> segmentShift) & segmentMask。
segmentMask
:段掩码,假如segments数组长度为16,则段掩码为16-1=15;segments长度为32,段掩码为32-1=31。这样得到的所有bit位都为1,可以更好地保证散列的均匀性
segmentShift
:2的sshift次方等于ssize,segmentShift=32-sshift。若segments长度为16,segmentShift=32-4=28;若segments长度为32,segmentShift=32-5=27。而计算得出的hash值最大为32位,无符号右移segmentShift,则意味着只保留高几位(其余位是没用的),然后与段掩码segmentMask位运算来定位Segment。
initTable的逻辑如下
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { // 无限循环
//如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
if ((sc = sizeCtl) < 0) // sizeCtl小于0,则进行线程让步等待
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 比较sizeCtl的值与sc是否相等,相等则用-1替换
try {
if ((tab = table) == null || tab.length == 0) { // table表为空或者大小为0
// sc的值是否大于0,若是,则n为sc,否则,n为默认初始容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 新生结点数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 赋值给table
table = tab = nt;
// sc为n * 3/4
sc = n - (n >>> 2);
}
} finally {
// 设置sizeCtl的值
sizeCtl = sc;
}
break;
}
}
// 返回table表
return tab;
}
helpTransfer函数、
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { // table表不为空并且结点类型使ForwardingNode类型,并且结点的nextTable不为空
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) { // 条件判断
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0) //
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 比较并交换
// 将table的结点转移到nextTab中
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
get方法
ConcurrentHashMap的get操作的流程很简单,也很清晰,可以分为三个步骤来描述
- 计算hash值,定位到该table索引位置,如果是首节点符合就返回
- 如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回
- 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//先定位Segment,再定位HashEntry
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
get方法无需加锁,由于其中涉及到的共享变量都使用volatile修饰,volatile可以保证内存可见性,所以不会读取到过期数据。
来看下concurrentHashMap代理到Segment上的put方法,Segment中的put方法是要加锁的。只不过是锁粒度细了而已。
replaceNode函数
final V replaceNode(Object key, V value, Object cv) {
// 计算key的hash值
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) { // 无限循环
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null) // table表为空或者表长度为0或者key所对应的桶为空
// 跳出循环
break;
else if ((fh = f.hash) == MOVED) // 桶中第一个结点的hash值为MOVED
// 转移
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) { // 加锁同步
if (tabAt(tab, i) == f) { // 桶中的第一个结点没有发生变化
if (fh >= 0) { // 结点hash值大于0
validated = true;
for (Node<K,V> e = f, pred = null;;) { // 无限循环
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 结点的hash值与指定的hash值相等,并且key也相等
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) { // cv为空或者与结点value相等或者不为空并且相等
// 保存该结点的val值
oldVal = ev;
if (value != null) // value为null
// 设置结点value值
e.val = value;
else if (pred != null) // 前驱不为空
// 前驱的后继为e的后继,即删除了e结点
pred.next = e.next;
else
// 设置table表中下标为index的值为e.next
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) { // 为红黑树结点类型
validated = true;
// 类型转化
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) { // 根节点不为空并且存在与指定hash和key相等的结点
// 保存p结点的value
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) { // cv为空或者与结点value相等或者不为空并且相等
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p)) // 移除p结点
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
// baseCount值减一
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
扩容
当 table 容量不足的时候,即 table 的元素数量达到容量阈值
sizeCtl
,需要对 table 进行扩容。
整个扩容分为两部分:
构建一个 nextTable,大小为 table 的两倍。 把 table 的数据复制到 nextTable 中。
/*
* 扩容后桶的大小总是2的幂次方
* 初始化容量:n=16 sizeCtl=12
* 第一次扩容:n=32 sizeCtl=24
* 第三次扩容:n=64 sizeCtl=48 n<=sizeCtl 退出扩容
*/
private final void tryPresize(int size) {
// 扩大桶的大小,必须是2的幂次方
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
/*
* sizeCtl=0表示容器没有被初始化
* sizeCtl>0表示容器已初始化,准备扩容
*/
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 容器没有被初始化,准备初始化
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
// 创建桶数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
// 设置扩容阈值
sc = n - (n >>> 2);
}
} finally {
// 设置扩容阈值0.75*n
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
/*
* 当阈值大于需要扩容的大小时(初始化的 t = n << 1)、
* 容器大于等于最大允许大小时成功,才退出。
* 注意:这里的阈值不是下次需要扩容的大小,是这次扩容的阈值,它是跟初始扩大
* 容量(n<<1)比较,小于则继续扩大容量(n<<1 sizeCtl=0.75*n)
*/
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
// 正在扩容中
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
// 扩容已经完成
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
/*
* 当前线程在扩容时发现已存在其他的线程正在执行扩容,
* 则参与进去扩容任务中,不同的线程分配不同的桶的迁移任务,
* 并使用内置锁来处理并发执行的情况
*/
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
/*
* 这里设置SIZECTL的值(rs << RESIZE_STAMP_SHIFT) + 2
* 数组槽的容量最大值是(1 << 30)=1073741824
* 当数组槽长度大于(1 << 15)=32767,就会出现负数
*/
transfer(tab, null);
}
}
}
/*
* 扩容过程中,会把旧数组的数据迁移到扩容后新数组上
* 从右到左每次从旧数组迁移stride个桶数据到新数组
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算每次迁移结点个数,不小于16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
// 容器未初始化则初始化容器
try {
@SuppressWarnings("unchecked")
// 初始化的时候扩容为原来两倍大小
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 临时交换数组
nextTable = nextTab;
// 设置迁移索引
transferIndex = n;
}
int nextn = nextTab.length;
/*
* 这是一个空的标志节点,当数组结点为null或被转移之后就会把数组槽引用指向ForwardingNode结点
*/
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
// 是否全部转移标志
boolean finishing = false; // to ensure sweep before committing nextTab
/*
* 会对旧数组槽进项两次全局检查:
* 1、为所有的数组元素指向ForwardingNode对象引用
* 2、提交前再次检查是否都符合第一条规则
*/
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
/*
* while循环主要设置transferIndex标志位,为了迁移旧数组结点
*/
while (advance) {
int nextIndex, nextBound;
/*
* 退出while循环:
* 1:完成扩容
* 2:当前线程完成自己负责的那部分区域
*/
if (--i >= bound || finishing)
advance = false;
// 完成扩容
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
/*
* 设置transferIndex-=stride,当transferIndex小于0则设为0
* 从右到左迁移标志直到0索引位置
*/
// 记录当前线程迁移哈希桶的最左边界
bound = nextBound;
// 记录当前线程迁移哈希桶的最大索引
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 完成扩容
nextTable = null;
// 设置新的数组槽
table = nextTab;
// 设置扩容阈值0.75*n
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 再次设置i,再次检查迁移是否完成,同时设置完成标志finishing为true
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
/*
* 把数组槽中为null的元素设置为ForwardingNode结点
* ForwardingNode结点的哈希码是MOVED(-1)
*/
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
// 当槽的引用的结点的哈希码是MOVED表明已经设置过了
advance = true; // already processed
else {
// 加锁操作
synchronized (f) {
if (tabAt(tab, i) == f) {
// 原位置、新位置结点引用
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
// 与上一个结点的(hash & n)值不同的最近结点
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
// 记录不同的结点位置和(hash & n)值
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
// 原结点标签结束的链表
ln = lastRun;
hn = null;
}
else {
// 新索引结点结束的链表
hn = lastRun;
ln = null;
}
/*
* 拆分的链表顺序与原链表结点顺序相反
*/
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 原数组索引i指向ForwardingNode对象
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
// 与HashMap实现原理相同
}
}
}
}
}
}