一、乐观锁和悲观锁
乐观锁:不加锁,但是依据是否有被修改过来判断失败与否,适用于大量读取的场景。乐观锁的实现:CAS、java.uil。concurrent.atomic包下面的类。
悲观锁:加锁,锁住资源不让其他线程操作,保证只有占有锁的线程去更新资源。适用于大量写入的场景。悲观锁的实现:Synchronzed、ReentrantLock.
关于悲观锁的实现前面我们已经详细学习过了,这里主要介绍一下乐观锁的CAS。
CAS是compare and swap的简称,即先比较后设置,CAS是一个原子操作。使用场景是更新一个值,不依赖于加锁实现,可以接收CAS失败。局限性是只可以更新一个值,如AtomicReference,AtomicInterger需要同时更新时,无法做到原子性。
注:AtomicStampedReference处理ABA问题是通过比较时间来处理的。
二、数据库乐观锁和悲观锁
1、mysal数据库悲观锁(innodb引擎)
数据库的锁都是在事务里面实现的
select…lock in share mode
eg:select * from tableA where id =1 lock in share mode
a)是共享锁,在事务内生效
b)给符合条件的行添加的是共享锁,其他事务会话同样可以继续给这些行添加共享锁,在锁释放前其他事务无法对这些行进行删除和修改。
c)两个事务同时对一行加共享锁,无法更新,直到有一个事务对该行加共享锁。
d)如果两个加了共享锁的事务同时更新一行,会发生DeadLock死锁问题。
e)某行已有排他锁,无法继续添加共享锁。
f)不会阻塞正常读。
结论:享锁,事务都加,都能读。修改是惟一的,必须等待前一个事务 commit,才可
select …for update
a)排他锁,在事务内生效。
b)给符合条件的行添加的是排他锁,其他事务无法再加排他锁,在释放前,其他事务无法对这些行进行删除和修改。
c)某行已有共享锁,无法再加排他锁
d)第一个事务对某行加了排他锁,第二个事务继续加排他锁,第二个事务需要等待。
e)加锁有超时时间
f)不会阻塞正常读
总结
事务之间不允许其它排他锁或共享锁读取,修改更不可能
一次只能有一个排他锁执行 commit 之后,其它事务才可执行
不允许其它事务增加共享或排他锁读取。修改是惟一的,必须等待前一个事务 commit。
三、AQS的数据结构
AQS:AbstractQueuedSynchronizer,它提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板方法,然后将子类作为同步组件的内部类。
AQS维护了一个volatile语义(支持多线程下的可见性)的共享资源变量state和一个FIFO线程等待队列(多线程竞争state被阻塞时会进入此队列)。
具体看下面两张图的介绍
四、ReentrantLock
简单加锁解锁过程
可重入实现
公平锁和非公平锁
四、并发容器及原理分析
1、HashMap的实现原理
这部分网上资源很多,讲解的也很详细,这里贴出几个
jdk1.8 HashMap工作原理和扩容机制(源码解析)
Java中HashMap底层实现原理(JDK1.8)源码分析
Hashmap的结构,1.7和1.8有哪些区别,史上最深入的分析
2、HashMap在高并发场景下的死锁分析
并发场景下由于在扩容中的每一个操作都有可能被操作系统挂起,当挂起的是扩容转换中的操作的时候,多线程会造成每个线程的操作数据混合在同一个数据结构中。
所以并发场景下使用高并发场景下的HashMap:ConcurrentHashMap
3、ConcurrentHashMap和HashMap的数据结构类似,但还是有一些差别,看图
Segment继承ReentrantLock,具有加锁解锁的功能,segment有多少个元素就说明有多少把锁,扮演了分段锁,降低并发竞争。
只有写才会对对应的segment加锁,读操作不加锁。
其实和 1.8 HashMap 结构类似,当链表节点数超过指定阈值的话,也是会转换成红黑树的,大体结构也是一样的。
那么它到底是如何实现线程安全的?
答案:其中抛弃了原有的Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。至于如何实现,那我继续看一下put方法逻辑
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//1. 计算key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null))) //casTabAt()利用CAS操作设置table数组中索引为i的元素
break; // no lock when adding to empty bin
}
//4. 当前正在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
//5. 当前为链表,在链表中插入新的键值对
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 6.当前为红黑树,将新的键值对插入到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 7.插入完键值对后再根据实际大小看是否需要转换成红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容
addCount(1L, binCount);
return null;
}
整个过程为:
1、判断Node[]数组是否初始化,没有则进行初始化操作
2、通过hash定位数组的索引坐标,是否有Node节点,如果没有则使用CAS进行添加(链表的头节点),添加失败则进入下次循环。
3、检查到内部正在扩容,就帮助它一块扩容。
4、如果f!=null,则使用synchronized锁住f元素(链表/红黑树的头元素)。如果是Node(链表结构)则执行链表的添加操作;如果是TreeNode(树型结构)则执行树添加操作。
5、判断链表长度已经达到临界值8(默认值),当节点超过这个值就需要把链表转换为树结构。
6、如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容
spread(key,hashCode())
这方法作用重哈希,以减小Hash冲突
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
该方法主要是将key的hashCode的低16位于高16位进行异或运算,这样不仅能够使得hash值能够分散能够均匀减小hash冲突的概率,另外只用到了异或运算,在性能开销上也能兼顾。
initTable() 主要作用将tab进行初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 1. 保证只有一个线程正在进行初始化操作
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 2. 得出数组的大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 3. 这里才真正的初始化数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 4. 计算数组中可用的大小:实际大小n*0.75(加载因子)
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
为了保证能够正确初始化,在第1步中会先通过if进行判断,若当前已经有一个线程正在初始化即sizeCtl值变为-1,这个时候其他线程在If判断为true从而调用Thread.yield()让出CPU时间片。正在进行初始化的线程会调用U.compareAndSwapInt方法将sizeCtl改为-1即正在初始化的状态。另外还需要注意的事情是,在第四步中会进一步计算数组中可用的大小即为数组实际大小n乘以加载因子0.75.可以看看这里乘以0.75是怎么算的,0.75为四分之三,这里n – (n >>> 2)是不是刚好是n-(1/4)n=(3/4)n,挺有意思的吧:)。如果选择是无参的构造器的话,这里在new Node数组的时候会使用默认大小为DEFAULT_CAPACITY(16),然后乘以加载因子0.75为12,也就是说数组的可用大小为12。
HashMap、Hashtable、ConccurentHashMap三者的区别
HashMap线程不安全,数组+链表+红黑树
Hashtable线程安全,锁住整个对象,数组+链表
ConccurentHashMap线程安全,CAS+同步锁,数组+链表+红黑树
HashMap的key,value均可为null,其他两个不行。
在JDK1.7和JDK1.8中的区别
在JDK1.8主要设计上的改进有以下几点:
1、不采用segment而采用node,锁住node来实现减小锁粒度。
2、设计了MOVED状态 当resize的中过程中 线程2还在put数据,线程2会帮助resize。
3、使用3个CAS操作来确保node的一些操作的原子性,这种方式代替了锁。
4、sizeCtl的不同值来代表不同含义,起到了控制的作用。
采用synchronized而不是ReentrantLock
参考文章:
ConcurrentHashMap17.和1.8
并发容器之ConcurrentHashMap(JDK 1.8版本)
Java的ConcurrentHashMap