关于ConcurrentHashMap和ConcurrentSkipListMap
文章目录
-
ConcurrentSkipListMap目标是代替synchronizedSortedMap(TreeMap) 解决并发环境下的线程安全问题,对应的非并发容器是TreeMap
-
ConcurrentHashMap也是位于juc包中,目标是代替Hashtable、synchronizedMap,支持复合操作 ,对应的非并发容器是HashMap
-
TreeMap底层数据结构是一个红黑树,HashMap底层是数组+链表+红黑树,TreeMap集合元素有序(Key排序),HashMap效率更高
一:ConcurrentHashMap
1.为什么HashMap不安全?
在多线程情况下HashMap是不安全的。在jdk1.8之前,在put的时候,插入的元素超过了容量(由负载因子决定)的范围就会触发扩容操作,就是rehash,这个会重新将原数组的内容重新hash到新的扩容数组中,在多线程的环境下,存在同时其他的元素也在进行put操作,如果hash值相同,可能出现同时在同一数组下出现两个用链表表示,造成闭环,导致在get时会出现死循环。
但是在jdk1.8之后,采用的是位桶 + 链表/红黑树的方式,当某个位桶的链表的长度超过 8 的时候,这个链表就将转换成红黑树
HashMap 不会因为多线程 put 导致死循环(JDK 8 用 head 和 tail 来保证链表的顺序和之前一样;JDK 7 rehash 会倒置链表元素)。但是还是会存在数据丢失的弊端。
- 如果多个线程同时使用 put 方法添加元素,而且假设正好存在两个 put 的 key 发生了碰撞(根据 hash 值计算的 bucket 一样),那么根据 HashMap 的实现,这两个 key 会添加到数组的同一个位置,这样最终就会发生其中一个线程 put 的数据被覆盖
- 如果多个线程同时检测到元素个数超过数组大小 * loadFactor,这样就会发生多个线程同时对 Node 数组进行扩容,都在重新计算元素位置以及复制数据,但是最终只有一个线程扩容后的数组会赋给 table,也就是说其他线程的都会丢失,并且各自线程 put 的数据也丢失
所以无论是jdk1.7还是1.8,HashMap都是不安全的。
2.HashTable解决了线程安全的问题,为什么不用?
HashTable和HashMap底层的数据结构是一模一样的,它的出现就是为了解决线程安全的问题,它是线程安全的,但是它在所有涉及到多线程操作的都加上了synchronized关键字来锁住整个table,这就意味着所有的线程都在竞争一把锁,在多线程的环境下,它是安全的,但是无疑是效率低下的 。
3.ConcurrentHashMap /jdk 1.7
下面两张图十分可以十分形象看出HashTable和ConcurrenHashMap区别:
Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,锁分离技术,而每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的数据存储结构一样。Segment继承了ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap,一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,对于不同Segment的数据进行操作是不用考虑锁竞争的。
jdk1.7中ConcurrentHashMap构造函数:
构造方法有三个参数,如果用户不指定则会使用默认值,initialCapacity(初始容量)为16,loadFactor(负载因子)为0.75,concurrentLevel(并发级别)为16
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//MAX_SEGMENTS 为1<<16=65536,也就是最大并发数为65536
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
//2的sshif次方等于ssize,例:ssize=16,sshift=4;ssize=32,sshif=5
int sshift = 0;
//ssize 为segments数组长度,根据concurrentLevel计算得出
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//segmentShift和segmentMask这两个变量在定位segment时会用到,后面会详细讲
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//计算cap的大小,即Segment中HashEntry的数组长度,cap也一定为2的n次方.
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
//创建segments数组并初始化第一个Segment,其余的Segment延迟初始化
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}
4.ConcurrentHashMap /jdk 1.8
JDK1.8的实现已经摒弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现Node +CAS +Synchronized来保证并发安全进行实现, 虽然在JDK1.8中还能看到Segment的数据结构,但是已经简化了属性,只是为了兼容旧版本。继承了AbstractMap<K,V> 实现了ConcurrentMap<K,V>接口和Serializable接口。jdk8用内置锁内置锁synchronized来代替重入锁ReentrantLock .
1.Node数组初始化
只有在执行第一次
put
方法时才会调用
initTable()
初始化
Node
数组,实现如下:
if (tab == null || (n = tab.length) == 0)
tab = initTable();
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
2.put实现
当执行
put
方法插入数据时,根据key的hash值,在
Node
数组中找到相应的位置,实现如下:
1、如果相应位置的
Node
还未初始化,则通过CAS插入相应的数据;
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
2、如果相应位置的
Node
不为空,且当前该节点不处于移动状态,则对该节点加
synchronized
锁,如果该节点的
hash
不小于0,则遍历链表更新节点或插入新节点;
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
3、如果该节点是
TreeBin
类型的节点,说明是红黑树结构,则通过
putTreeVal
方法往红黑树中插入节点;
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
4、如果
binCount
不为0,说明
put
操作对数据产生了影响,如果当前链表的个数达到8个,则通过
treeifyBin
方法转化为红黑树,如果
oldVal
不为空,说明是一次更新操作,没有对元素个数产生影响,则直接返回旧值;
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
5、如果插入的是一个新节点,则执行
addCount()
方法尝试更新元素个数
baseCount
;
关于ConcurrenHashMap源码的解析请看https://www.jianshu.com/p/865c813f2726
二:ConcurrentSkipListMap
1.继承关系
ConcurrentSkipListMap extends AbstractMap<K,V>
implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable
- 继承了AbstractMap类,具有Map的所以功能属性
- 实现了ConcurrentNavigableMap:(ConcurrentNavigableMap继承了ConcurrentMap以及NavigableMap,而NavigableMap又继承了SortedMap,所以ConcurrentSkipListMap支持排序以及倒序排序,找到比当前元素大或小的元素这些功能), 实现了Cloneable(支持深拷贝),
- 实现了Serializable:(序列化反序列化)
ConcurrentSkipListMap是线程安全的有序的哈希表,适用于高并发的场景。 ConcurrentSkipListMap和TreeMap,它们虽然都是有序的哈希表。但是,第一,它们的线程安全机制不同,TreeMap是非线程安全的,而ConcurrentSkipListMap是线程安全的。第二,ConcurrentSkipListMap是通过跳表实现的,而TreeMap是通过红黑树实现的。
2.数据结构
ConcurrentSkipListMap是基于跳表的数据结构实现的。跳表(skip list) 是一种随机的数据结构, 基于上下左右都是链表的数据结构, 其效率可以比拟二叉树; 基本上 skip list 是在链表的上层增加多层索引链表(增加是随机的)实现的
- ConcurrentSkipListMap 的节点主要由 Node, Index, HeadIndex 构成;
- ConcurrentSkipListMap 的数据结构横向纵向都是链表
- 最下面那层链表是Node层(数据节点层), 上面几层都是Index层(索引)
- 从纵向链表来看, 最左边的是 HeadIndex 层, 右边的都是Index 层, 且每层的最底端都是对应Node, 纵向上的索引都是指向最底端的Node
3.部分源码简单解析
ConcurrentSkipMap中有三个十分重要的对象Node数据节点,HeadIndex索引层,Index索引
3.1 Node数据结点
static final class Node<K, V>{
final K key; // key 是 final 的, 说明节点一旦定下来, 除了删除, 不然不会改动 key 了
volatile Object value; // 对应的 value
volatile Node<K, V> next; // 下一个节点的引用
// 构造函数
public Node(K key, Object value, Node<K, V> next) {
this.key = key;
this.value = value;
this.next = next;
}
/**
* 创建一个标记节点(通过 this.value = this 来标记)
* 这个标记节点非常重要: 有了它, 就能对链表中间节点进行同时删除了插入
* ps: ConcurrentLinkedQueue 只能在头上, 尾端进行插入, 中间进行删除
*/
public Node(Node<K, V> next) {
this.key = null;
this.value = this;
this.next = next;
}
/**
* CAS 操作设置 Value
*/
boolean casValue(Object cmp, Object val){
return unsafe.compareAndSwapObject(this, valueOffset, cmp, val);
}
/**
* CAS 操作设置 next
*/
boolean casNext(Node<K, V> cmp, Node<K, V> val){
return unsafe.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* 检测是否为标记节点
*/
boolean isMarker(){
return value == this;
}
/**
* 检测是否为 链表最左下角的 BASE_HEADER 节点
*/
boolean isBaseHeader(){
return value == BASE_HEADER;
}
/**
* 对节点追加一个标记节点, 为最终的删除做准备
*/
boolean appendMarker(Node<K, V> f){
return casNext(f, new Node<K, V>(f));
}
/**
* helpDelete 方法, 这个方法要么追加一个标记节点, 要么进行删除操作
*/
void helpDelete(Node<K, V> b, Node<K, V> f){
if(f == next && this == b.next){
if(f == null || f.value != f){ // 还没有对删除的节点进行节点 marker
casNext(f, new Node<K, V>(f));
}else{
b.casNext(this, f.next); // 删除 节点 b 与 f.next 之间的节点
}
}
}
/**
* 校验数据
*/
V getValidValue(){
Object v = value;
if(v == this || v == BASE_HEADER){
return null;
}
V vv = (V)v;
return vv;
}
AbstractMap.SimpleImmutableEntry<K, V> createSnapshot(){
Object v = value;
if(v == null || v == this || v == BASE_HEADER){
return null;
}
V vv = (V) v;
return new AbstractMap.SimpleImmutableEntry<K, V>(key, vv);
}
// UNSAFE mechanics
private static final Unsafe unsafe;
private static final long valueOffset;
private static final long nextOffset;
static {
try {
unsafe = UnSafeClass.getInstance();
Class<?> k = Node.class;
valueOffset = unsafe.objectFieldOffset(k.getDeclaredField("value"));
nextOffset = unsafe.objectFieldOffset(k.getDeclaredField("next"));
}catch (Exception e){
throw new Error(e);
}
}
..............
}
3.2 Index索引
static class Index<K, V>{
final Node<K, V> node; // 索引指向的节点, 纵向上所有索引指向链表最下面的节点
final Index<K, V> down; // 下边level层的 Index
volatile Index<K, V> right; // 右边的 Index
/**
* Creates index node with given values
* @param node
* @param down
* @param right
*/
public Index(Node<K, V> node, Index<K, V> down, Index<K, V> right) {
this.node = node;
this.down = down;
this.right = right;
}
final boolean casRight(Index<K, V> cmp, Index<K, V> val){
return unsafe.compareAndSwapObject(this, rightOffset, cmp, val);
}
final boolean indexesDeletedNode(){
return node.value == null;
}
/**
* 在 index 本身 和 succ 之间插入一个新的节点 newSucc
* @param succ
* @param newSucc
* @return
*/
final boolean link(Index<K, V> succ, Index<K, V> newSucc){
Node<K, V> n = node;
newSucc.right = succ;
return n.value != null && casRight(succ, newSucc);
}
/**
* 将当前的节点 index 设置其的 right 为 succ.right 等于删除 succ 节点
* @param succ
* @return
*/
final boolean unlink(Index<K, V> succ){
return node.value != null && casRight(succ, succ.right);
}
// Unsafe mechanics
private static final Unsafe unsafe;
private static final long rightOffset;
static {
try{
unsafe = UnSafeClass.getInstance();
Class<?> k = Index.class;
rightOffset = unsafe.objectFieldOffset(k.getDeclaredField("right"));
}catch (Exception e){
throw new Error(e);
}
3.3 HeadIndex头部索引
static final class HeadIndex<K,V> extends Index<K,V> {//继承了Index,只是多了个level表示索引层级
final int level;//索引层级
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}