关于ConcurrentHashMap和ConcurrentSkipListMap

  • Post author:
  • Post category:其他




关于ConcurrentHashMap和ConcurrentSkipListMap

  • ConcurrentSkipListMap目标是代替synchronizedSortedMap(TreeMap) 解决并发环境下的线程安全问题,对应的非并发容器是TreeMap

  • ConcurrentHashMap也是位于juc包中,目标是代替Hashtable、synchronizedMap,支持复合操作 ,对应的非并发容器是HashMap

  • TreeMap底层数据结构是一个红黑树,HashMap底层是数组+链表+红黑树,TreeMap集合元素有序(Key排序),HashMap效率更高



一:ConcurrentHashMap



1.为什么HashMap不安全?

在多线程情况下HashMap是不安全的。在jdk1.8之前,在put的时候,插入的元素超过了容量(由负载因子决定)的范围就会触发扩容操作,就是rehash,这个会重新将原数组的内容重新hash到新的扩容数组中,在多线程的环境下,存在同时其他的元素也在进行put操作,如果hash值相同,可能出现同时在同一数组下出现两个用链表表示,造成闭环,导致在get时会出现死循环。

但是在jdk1.8之后,采用的是位桶 + 链表/红黑树的方式,当某个位桶的链表的长度超过 8 的时候,这个链表就将转换成红黑树

HashMap 不会因为多线程 put 导致死循环(JDK 8 用 head 和 tail 来保证链表的顺序和之前一样;JDK 7 rehash 会倒置链表元素)。但是还是会存在数据丢失的弊端。

  • 如果多个线程同时使用 put 方法添加元素,而且假设正好存在两个 put 的 key 发生了碰撞(根据 hash 值计算的 bucket 一样),那么根据 HashMap 的实现,这两个 key 会添加到数组的同一个位置,这样最终就会发生其中一个线程 put 的数据被覆盖
  • 如果多个线程同时检测到元素个数超过数组大小 * loadFactor,这样就会发生多个线程同时对 Node 数组进行扩容,都在重新计算元素位置以及复制数据,但是最终只有一个线程扩容后的数组会赋给 table,也就是说其他线程的都会丢失,并且各自线程 put 的数据也丢失

所以无论是jdk1.7还是1.8,HashMap都是不安全的。



2.HashTable解决了线程安全的问题,为什么不用?

HashTable和HashMap底层的数据结构是一模一样的,它的出现就是为了解决线程安全的问题,它是线程安全的,但是它在所有涉及到多线程操作的都加上了synchronized关键字来锁住整个table,这就意味着所有的线程都在竞争一把锁,在多线程的环境下,它是安全的,但是无疑是效率低下的 。



3.ConcurrentHashMap /jdk 1.7

下面两张图十分可以十分形象看出HashTable和ConcurrenHashMap区别:

在这里插入图片描述
在这里插入图片描述

Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,锁分离技术,而每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的数据存储结构一样。Segment继承了ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap,一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,对于不同Segment的数据进行操作是不用考虑锁竞争的。

在这里插入图片描述

jdk1.7中ConcurrentHashMap构造函数:

构造方法有三个参数,如果用户不指定则会使用默认值,initialCapacity(初始容量)为16,loadFactor(负载因子)为0.75,concurrentLevel(并发级别)为16


   public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            //MAX_SEGMENTS 为1<<16=65536,也就是最大并发数为65536
            if (concurrencyLevel > MAX_SEGMENTS)
                concurrencyLevel = MAX_SEGMENTS;
            //2的sshif次方等于ssize,例:ssize=16,sshift=4;ssize=32,sshif=5
           int sshift = 0;
          //ssize 为segments数组长度,根据concurrentLevel计算得出
          int ssize = 1;
          while (ssize < concurrencyLevel) {
              ++sshift;
              ssize <<= 1;
          }
          //segmentShift和segmentMask这两个变量在定位segment时会用到,后面会详细讲
          this.segmentShift = 32 - sshift;
          this.segmentMask = ssize - 1;
          if (initialCapacity > MAXIMUM_CAPACITY)
              initialCapacity = MAXIMUM_CAPACITY;
          //计算cap的大小,即Segment中HashEntry的数组长度,cap也一定为2的n次方.
          int c = initialCapacity / ssize;
          if (c * ssize < initialCapacity)
              ++c;
          int cap = MIN_SEGMENT_TABLE_CAPACITY;
          while (cap < c)
              cap <<= 1;
          //创建segments数组并初始化第一个Segment,其余的Segment延迟初始化
          Segment<K,V> s0 =
              new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                               (HashEntry<K,V>[])new HashEntry[cap]);
          Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
          UNSAFE.putOrderedObject(ss, SBASE, s0); 
          this.segments = ss;
      }



4.ConcurrentHashMap /jdk 1.8

JDK1.8的实现已经摒弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现Node +CAS +Synchronized来保证并发安全进行实现, 虽然在JDK1.8中还能看到Segment的数据结构,但是已经简化了属性,只是为了兼容旧版本。继承了AbstractMap<K,V> 实现了ConcurrentMap<K,V>接口和Serializable接口。jdk8用内置锁内置锁synchronized来代替重入锁ReentrantLock .

在这里插入图片描述



1.Node数组初始化

只有在执行第一次

put

方法时才会调用

initTable()

初始化

Node

数组,实现如下:

if (tab == null || (n = tab.length) == 0)
                tab = initTable();
private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }



2.put实现

当执行

put

方法插入数据时,根据key的hash值,在

Node

数组中找到相应的位置,实现如下:

1、如果相应位置的

Node

还未初始化,则通过CAS插入相应的数据;

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
        break;                   // no lock when adding to empty bin
}

2、如果相应位置的

Node

不为空,且当前该节点不处于移动状态,则对该节点加

synchronized

锁,如果该节点的

hash

不小于0,则遍历链表更新节点或插入新节点;

if (fh >= 0) {
    binCount = 1;
    for (Node<K,V> e = f;; ++binCount) {
        K ek;
        if (e.hash == hash &&
            ((ek = e.key) == key ||
             (ek != null && key.equals(ek)))) {
            oldVal = e.val;
            if (!onlyIfAbsent)
                e.val = value;
            break;
        }
        Node<K,V> pred = e;
        if ((e = e.next) == null) {
            pred.next = new Node<K,V>(hash, key, value, null);
            break;
        }
    }
}

3、如果该节点是

TreeBin

类型的节点,说明是红黑树结构,则通过

putTreeVal

方法往红黑树中插入节点;

else if (f instanceof TreeBin) {
    Node<K,V> p;
    binCount = 2;
    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
        oldVal = p.val;
        if (!onlyIfAbsent)
            p.val = value;
    }
}

4、如果

binCount

不为0,说明

put

操作对数据产生了影响,如果当前链表的个数达到8个,则通过

treeifyBin

方法转化为红黑树,如果

oldVal

不为空,说明是一次更新操作,没有对元素个数产生影响,则直接返回旧值;

if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}   

5、如果插入的是一个新节点,则执行

addCount()

方法尝试更新元素个数

baseCount

关于ConcurrenHashMap源码的解析请看https://www.jianshu.com/p/865c813f2726



二:ConcurrentSkipListMap



1.继承关系

ConcurrentSkipListMap extends AbstractMap<K,V>
    implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable
  • 继承了AbstractMap类,具有Map的所以功能属性
  • 实现了ConcurrentNavigableMap:(ConcurrentNavigableMap继承了ConcurrentMap以及NavigableMap,而NavigableMap又继承了SortedMap,所以ConcurrentSkipListMap支持排序以及倒序排序,找到比当前元素大或小的元素这些功能), 实现了Cloneable(支持深拷贝),
  • 实现了Serializable:(序列化反序列化)

ConcurrentSkipListMap是线程安全的有序的哈希表,适用于高并发的场景。 ConcurrentSkipListMap和TreeMap,它们虽然都是有序的哈希表。但是,第一,它们的线程安全机制不同,TreeMap是非线程安全的,而ConcurrentSkipListMap是线程安全的。第二,ConcurrentSkipListMap是通过跳表实现的,而TreeMap是通过红黑树实现的。



2.数据结构

在这里插入图片描述

ConcurrentSkipListMap是基于跳表的数据结构实现的。跳表(skip list) 是一种随机的数据结构, 基于上下左右都是链表的数据结构, 其效率可以比拟二叉树; 基本上 skip list 是在链表的上层增加多层索引链表(增加是随机的)实现的

  • ConcurrentSkipListMap 的节点主要由 Node, Index, HeadIndex 构成;
  • ConcurrentSkipListMap 的数据结构横向纵向都是链表
  • 最下面那层链表是Node层(数据节点层), 上面几层都是Index层(索引)
  • 从纵向链表来看, 最左边的是 HeadIndex 层, 右边的都是Index 层, 且每层的最底端都是对应Node, 纵向上的索引都是指向最底端的Node



3.部分源码简单解析

ConcurrentSkipMap中有三个十分重要的对象Node数据节点,HeadIndex索引层,Index索引



3.1 Node数据结点

static final class Node<K, V>{
    final K key;  // key 是 final 的, 说明节点一旦定下来, 除了删除, 不然不会改动 key 了
    volatile Object value; // 对应的 value
    volatile Node<K, V> next; // 下一个节点的引用
    
    // 构造函数
    public Node(K key, Object value, Node<K, V> next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }
    
    /**
     * 创建一个标记节点(通过 this.value = this 来标记)
     * 这个标记节点非常重要: 有了它, 就能对链表中间节点进行同时删除了插入
     * ps: ConcurrentLinkedQueue 只能在头上, 尾端进行插入, 中间进行删除 
     */
    public Node(Node<K, V> next) {
        this.key = null;
        this.value = this;
        this.next = next;
    }

    /**
     * CAS 操作设置 Value
     */
    boolean casValue(Object cmp, Object val){
        return unsafe.compareAndSwapObject(this, valueOffset, cmp, val);
    }

    /**
     * CAS 操作设置 next
     */
    boolean casNext(Node<K, V> cmp, Node<K, V> val){
        return unsafe.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    /**
     * 检测是否为标记节点
     */
    boolean isMarker(){
        return value == this;
    }

    /**
     * 检测是否为 链表最左下角的 BASE_HEADER 节点
     */
    boolean isBaseHeader(){
        return value == BASE_HEADER;
    }

    /**
     * 对节点追加一个标记节点, 为最终的删除做准备
     */
    boolean appendMarker(Node<K, V> f){
        return casNext(f, new Node<K, V>(f));
    }

    /**
     * helpDelete 方法, 这个方法要么追加一个标记节点, 要么进行删除操作
     */
    void helpDelete(Node<K, V> b, Node<K, V> f){
        if(f == next && this == b.next){
            if(f == null || f.value != f){ // 还没有对删除的节点进行节点 marker
                casNext(f, new Node<K, V>(f));
            }else{
                b.casNext(this, f.next); // 删除 节点 b 与 f.next 之间的节点
            }
        }
    }

    /**
     * 校验数据
     */
    V getValidValue(){
        Object v = value;
        if(v == this || v == BASE_HEADER){
            return null;
        }
        V vv = (V)v;
        return vv;
    }

    AbstractMap.SimpleImmutableEntry<K, V> createSnapshot(){
        Object v = value;
        if(v == null || v == this || v == BASE_HEADER){
            return null;
        }
        V vv = (V) v;
        return new AbstractMap.SimpleImmutableEntry<K, V>(key, vv);
    }

    // UNSAFE mechanics
    private static final Unsafe unsafe;
    private static final long valueOffset;
    private static final long nextOffset;

    static {
        try {
            unsafe = UnSafeClass.getInstance();
            Class<?> k = Node.class;
            valueOffset = unsafe.objectFieldOffset(k.getDeclaredField("value"));
            nextOffset = unsafe.objectFieldOffset(k.getDeclaredField("next"));
        }catch (Exception e){
            throw new Error(e);
        }
    }
  ..............
}



3.2 Index索引

static class Index<K, V>{

    final Node<K, V> node; // 索引指向的节点, 纵向上所有索引指向链表最下面的节点
    final Index<K, V> down; // 下边level层的 Index
    volatile Index<K, V> right; // 右边的  Index

    /**
     * Creates index node with given values
     * @param node
     * @param down
     * @param right
     */
    public Index(Node<K, V> node, Index<K, V> down, Index<K, V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }

    final boolean casRight(Index<K, V> cmp, Index<K, V> val){
        return unsafe.compareAndSwapObject(this, rightOffset, cmp, val);
    }

    final boolean indexesDeletedNode(){
        return node.value == null;
    }

    /**
     * 在 index 本身 和 succ 之间插入一个新的节点 newSucc
     * @param succ
     * @param newSucc
     * @return
     */
    final boolean link(Index<K, V> succ, Index<K, V> newSucc){
        Node<K, V> n = node;
        newSucc.right = succ;
        return n.value != null  && casRight(succ, newSucc);
    }

    /**
     * 将当前的节点 index 设置其的 right 为 succ.right 等于删除 succ 节点
     * @param succ
     * @return
     */
    final boolean unlink(Index<K, V> succ){
        return node.value != null && casRight(succ, succ.right);
    }

    // Unsafe mechanics
    private static final Unsafe unsafe;
    private static final long rightOffset;

    static {
        try{
            unsafe = UnSafeClass.getInstance();
            Class<?> k = Index.class;
            rightOffset = unsafe.objectFieldOffset(k.getDeclaredField("right"));
        }catch (Exception e){
            throw new Error(e);
        }
    



3.3 HeadIndex头部索引

static final class HeadIndex<K,V> extends Index<K,V> {//继承了Index,只是多了个level表示索引层级
        final int level;//索引层级
        HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
            super(node, down, right);
            this.level = level;
        }
    }



版权声明:本文为ITgagaga原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。