Java锁及其应用

  • Post author:
  • Post category:java


一、乐观锁和悲观锁

乐观锁:不加锁,但是依据是否有被修改过来判断失败与否,适用于大量读取的场景。乐观锁的实现:CAS、java.uil。concurrent.atomic包下面的类。

悲观锁:加锁,锁住资源不让其他线程操作,保证只有占有锁的线程去更新资源。适用于大量写入的场景。悲观锁的实现:Synchronzed、ReentrantLock.

关于悲观锁的实现前面我们已经详细学习过了,这里主要介绍一下乐观锁的CAS。

CAS是compare and swap的简称,即先比较后设置,CAS是一个原子操作。使用场景是更新一个值,不依赖于加锁实现,可以接收CAS失败。局限性是只可以更新一个值,如AtomicReference,AtomicInterger需要同时更新时,无法做到原子性。

注:AtomicStampedReference处理ABA问题是通过比较时间来处理的。

二、数据库乐观锁和悲观锁

1、mysal数据库悲观锁(innodb引擎)

数据库的锁都是在事务里面实现的

select…lock in share mode

eg:select * from tableA where id =1 lock in share mode

a)是共享锁,在事务内生效

b)给符合条件的行添加的是共享锁,其他事务会话同样可以继续给这些行添加共享锁,在锁释放前其他事务无法对这些行进行删除和修改。

c)两个事务同时对一行加共享锁,无法更新,直到有一个事务对该行加共享锁。

d)如果两个加了共享锁的事务同时更新一行,会发生DeadLock死锁问题。

e)某行已有排他锁,无法继续添加共享锁。

f)不会阻塞正常读。

结论:享锁,事务都加,都能读。修改是惟一的,必须等待前一个事务 commit,才可

select …for update

a)排他锁,在事务内生效。

b)给符合条件的行添加的是排他锁,其他事务无法再加排他锁,在释放前,其他事务无法对这些行进行删除和修改。

c)某行已有共享锁,无法再加排他锁

d)第一个事务对某行加了排他锁,第二个事务继续加排他锁,第二个事务需要等待。

e)加锁有超时时间

f)不会阻塞正常读


总结


事务之间不允许其它排他锁或共享锁读取,修改更不可能

一次只能有一个排他锁执行 commit 之后,其它事务才可执行

不允许其它事务增加共享或排他锁读取。修改是惟一的,必须等待前一个事务 commit。

三、AQS的数据结构

AQS:AbstractQueuedSynchronizer,它提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板方法,然后将子类作为同步组件的内部类。

AQS队列

AQS维护了一个volatile语义(支持多线程下的可见性)的共享资源变量state和一个FIFO线程等待队列(多线程竞争state被阻塞时会进入此队列)。

具体看下面两张图的介绍

在这里插入图片描述

waitStatus

四、ReentrantLock

简单加锁解锁过程

ReentrantLock简单加锁解锁过程

可重入实现

可重入实现

公平锁和非公平锁

在这里插入图片描述

四、并发容器及原理分析

1、HashMap的实现原理

这部分网上资源很多,讲解的也很详细,这里贴出几个


jdk1.8 HashMap工作原理和扩容机制(源码解析)



Java中HashMap底层实现原理(JDK1.8)源码分析



Hashmap的结构,1.7和1.8有哪些区别,史上最深入的分析


2、HashMap在高并发场景下的死锁分析

并发场景下由于在扩容中的每一个操作都有可能被操作系统挂起,当挂起的是扩容转换中的操作的时候,多线程会造成每个线程的操作数据混合在同一个数据结构中。

所以并发场景下使用高并发场景下的HashMap:ConcurrentHashMap

3、ConcurrentHashMap和HashMap的数据结构类似,但还是有一些差别,看图

ConcurrentHashMap存储结构

Segment继承ReentrantLock,具有加锁解锁的功能,segment有多少个元素就说明有多少把锁,扮演了分段锁,降低并发竞争。

只有写才会对对应的segment加锁,读操作不加锁。

其实和 1.8 HashMap 结构类似,当链表节点数超过指定阈值的话,也是会转换成红黑树的,大体结构也是一样的。

那么它到底是如何实现线程安全的?

答案:其中抛弃了原有的Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。至于如何实现,那我继续看一下put方法逻辑

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    //1. 计算key的hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null))) //casTabAt()利用CAS操作设置table数组中索引为i的元素
                break;                   // no lock when adding to empty bin
        }
        //4. 当前正在扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //5. 当前为链表,在链表中插入新的键值对
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 6.当前为红黑树,将新的键值对插入到红黑树中
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 7.插入完键值对后再根据实际大小看是否需要转换成红黑树
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容 
    addCount(1L, binCount);
    return null;
}

整个过程为:

1、判断Node[]数组是否初始化,没有则进行初始化操作

2、通过hash定位数组的索引坐标,是否有Node节点,如果没有则使用CAS进行添加(链表的头节点),添加失败则进入下次循环。

3、检查到内部正在扩容,就帮助它一块扩容。

4、如果f!=null,则使用synchronized锁住f元素(链表/红黑树的头元素)。如果是Node(链表结构)则执行链表的添加操作;如果是TreeNode(树型结构)则执行树添加操作。

5、判断链表长度已经达到临界值8(默认值),当节点超过这个值就需要把链表转换为树结构。

6、如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容

spread(key,hashCode())

这方法作用重哈希,以减小Hash冲突

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

该方法主要是将key的hashCode的低16位于高16位进行异或运算,这样不仅能够使得hash值能够分散能够均匀减小hash冲突的概率,另外只用到了异或运算,在性能开销上也能兼顾。

initTable() 主要作用将tab进行初始化

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // 1. 保证只有一个线程正在进行初始化操作
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // 2. 得出数组的大小
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    // 3. 这里才真正的初始化数组
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 4. 计算数组中可用的大小:实际大小n*0.75(加载因子)
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

为了保证能够正确初始化,在第1步中会先通过if进行判断,若当前已经有一个线程正在初始化即sizeCtl值变为-1,这个时候其他线程在If判断为true从而调用Thread.yield()让出CPU时间片。正在进行初始化的线程会调用U.compareAndSwapInt方法将sizeCtl改为-1即正在初始化的状态。另外还需要注意的事情是,在第四步中会进一步计算数组中可用的大小即为数组实际大小n乘以加载因子0.75.可以看看这里乘以0.75是怎么算的,0.75为四分之三,这里n – (n >>> 2)是不是刚好是n-(1/4)n=(3/4)n,挺有意思的吧:)。如果选择是无参的构造器的话,这里在new Node数组的时候会使用默认大小为DEFAULT_CAPACITY(16),然后乘以加载因子0.75为12,也就是说数组的可用大小为12。


HashMap、Hashtable、ConccurentHashMap三者的区别


HashMap线程不安全,数组+链表+红黑树

Hashtable线程安全,锁住整个对象,数组+链表

ConccurentHashMap线程安全,CAS+同步锁,数组+链表+红黑树

HashMap的key,value均可为null,其他两个不行。


在JDK1.7和JDK1.8中的区别


在JDK1.8主要设计上的改进有以下几点:

1、不采用segment而采用node,锁住node来实现减小锁粒度。

2、设计了MOVED状态 当resize的中过程中 线程2还在put数据,线程2会帮助resize。

3、使用3个CAS操作来确保node的一些操作的原子性,这种方式代替了锁。

4、sizeCtl的不同值来代表不同含义,起到了控制的作用。

采用synchronized而不是ReentrantLock

参考文章:


ConcurrentHashMap17.和1.8



并发容器之ConcurrentHashMap(JDK 1.8版本)



Java的ConcurrentHashMap



版权声明:本文为weixin_44726656原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。