1.什么是队列同步器
用来构建锁或者其他同步组件的基础框架,使用int型的成员变量来
表示同步的状态,线程以及等待状态等信息被封装成了Node节点,而这些Node节点采用先进先出的队列来进行排队管理。
2.队列同步器所属包
package java.util.concurrent.locks;
3.队列同步器继承与实现关系
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
4.同步队列Node的数据结构
双向队列-数据结构图:
解释:对于这个链式队列结构,没有成功获取同步状态的线程将会成为节点加入到队列的尾部,而队列中的节点获取锁资源是从头部开始的
5.节点的等待状态
static final class Node {
/*
*指示节点在共享模式中等待的标记.
*/
static final Node SHARED = new Node();
/*
*指示节点在独占模式中等待的标记.
*/
static final Node EXCLUSIVE = null;
/*
*由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化
*/
static final int CANCELLED = 1;
/*
*后继结点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,
*将会通知后继节点,使后继节点的线程得以运行
*/
static final int SIGNAL = -1;
/*
*节点在等待队列中,节点线程等待在Condition上,
*当其他线程对Condition调用了Signal()方法后,该节点将会从等待队列转移到同步队列中,加入到对同步状态的获取中
*/
static final int CONDITION = -2;
/*
*表示下一次共享式同步状态获取将会无条件地被传播下去
*/
static final int PROPAGATE = -3;
//线程的等待状态volatile int waitStatus;
/**
* 前驱节点,当节点加入到队列尾部时被设置(尾部添加)
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 获取同步状态的线程
*/
volatile Thread thread;
/**
* 等待队列中的后继结点。
* 如果当前节点是共享的,那么这个字段将是一个SHARED常量,也就是节点类型(独占和共享)和等待队列中的后继结点共用同一个字段
*/
Node nextWaiter;
/**
* 等待的节点是否共享
*/
final boolean isShared() { return nextWaiter == SHARED;}
/**
* 获取前驱节点,但需要判断下前驱节点是否为空,否则将会抛出空指针异常
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null) throw new NullPointerException();
else return p; }
Node() { // 用于建立初始的头结点和共享标记}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
6.常用属性
/**
* 等待队列的头结点,采用懒初始化的方式(意思就是只有在需要头结点的时候才进行初始化)。
* 如果头结点存在,那么waitStatus状态就不会被标记位CANCELLED
*/
private transient volatile Node head;
/**
* 等待队列的尾节点,采用懒初始化的方式。
* 使用enq方法进行添加新的节点或修改节点。
*/
private transient volatile Node tail;
/**
* 同步状态值
*/
private volatile int state;
7.原子方法
getState()
方法:
/**
* 获取同步状态的当前值。操作具有volatile读的语义。
*/
protected final int getState() {
return state;
}
setState()
方法:
/**
* 设置同步状态的值。操作具有volatile写的语义。
*/
protected final void setState(int newState) {
state = newState;
}
compareAndSetState()
方法:
/**
* 如果当前状态值等于预期值,那么将以原子的方式将当前同步状态的值更新给定值。这个操作具有volatile读写语义。
* 原子性指要么更新过程成功,要么过程中断,还是原样。
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
解释:这个方法
采用cas原则来保证状态设置的原子性
,cas本身采用乐观锁的方式来实现的,从而不会产生线程的阻塞问题。
由于采用volatile读写语义,那么线程访问是保持一致性的
。
理由:因为volatile实现原则是将缓存中的数据写入到主存中的。所以每个线程读写的数据都是从主存中获取来的,而不是每个线程缓存的数据,所以保证了一致性。
——————————————————————————————————————
需要明确的问题:
(1)
问题 :乐观锁和CAS的区别?
首先乐观锁的思想是读多写少,认为并发写的可能性不高。
乐观锁的策略:
① 就是在写之前先读出版本号,然后加锁,保证当前的读出来的值是具有原子性(未被修改)。
② 然后比较当前加锁的版本号和取时的版本号是否一样,如果一样,那么说明别人没有写入值,那么我们就更新值,然后更新版本号。
③ 如果不一样,那么说明在我们取值加锁之后别人更新了值。我们就要重新读取版本号,然后加锁,比较版本号,直到当前加锁的版本号和取时的版本号不一样,才进行写入更新值,并更新版本号。
CAS的策略:
比较当前值和传入的期望值是否一样,如果一样,那么就更新当前值为需要更新的值。CAS只是一种乐观锁的实现而已。
(2)
问题 :乐观锁的版本号和 “ABA”问题?
CAS虽然在前面提到了可以高效的解决了并发的原子操作,但是同样也存在很多问题。
“ABA”问题:CAS操作就是检查值是否发生变化,如果发生变化则重新读取检查,如果没有发生变化则更新值。但是可能会出现一个值原来为A,被一个线程读取后更新为B,然后又被另一个线程读取更新为A,那么其他线程过来进行检查更新时,发现值并没有发生变化,但是实际上前面已经被更新过了,那么就采用版本号1A -> 2B -> 3A ,这样1A和3A由于版本号就不会出现发生变化却使其他线程未检查出发生变化。
(3)
问题:同步状态值(state)干什么用的?
/**
* 同步状态值
*/
private volatile int state;
①首先在多线程竞争的条件下,采用CAS的方式来获取和设置同步状态值(state)。
②同步状态值state代表获取锁的线程加锁的次数,如果线程获取锁,那么state加1变为1。如果线程释放锁,那么state减1变为0。
③volatile实现原则还是将缓存中的数据写入到主存,每个线程都是从主存中读取值。保证了数据的一致性。
8.独占式同步状态的获取
tryAcquire()
方法:查询是否允许它在独占模式下获取对象状态,如果允许则获取它。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
acquire(int arg)
方法:以独占模式获取对象,忽略中断。
/**
* 以独占模式获取对象,忽略中断。
*/
public final void acquire(int arg) {
//如果没获取到队列的状态并且不停循环的方式获取线程的结果中断失败标志为true
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//中断线程
selfInterrupt();
}
方法结构图:
acquire
|—– tryAcquire
|—– addWaiter
|—– enq
|—– acquiredQueued
addWaiter
方法:为当前线程和给定的模式创建节点
/**
* 为当前的线程和给定的模式创建节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//获取尾节点
Node pred = tail;
//如果尾节点不为空
if (pred != null) {
//设置创建节点的前驱为尾节点
node.prev = pred;
//判断当前状态值是否等于预期的尾节点pred,如果等于就将当前同步状态值更新尾节点为node
if (compareAndSetTail(pred, node)) {
//尾节点的后继结点为node
pred.next = node;
return node;
}
}
//将节点插入到队列
enq(node);
return node;
}
注意:方法compareAndSetTail采用原子的方式进行更新尾节点。
enq
方法:将节点插入到同步队列尾部
/**
* 将节点插入到队列
*/
private Node enq(final Node node) {
for (;;) {
//获取尾节点为t
Node t = tail;
//如果尾节点为空
if (t == null) {
/*
* 初始化一个头结点作为尾节点t
* 如果节点node第一次入队列,会创建新的节点new node()作为头节点
* 而此时队列的尾节点和头结点是同一个节点
*/
if (compareAndSetHead(new Node()))
tail = head;
} else {
//设置节点node的前驱为t
node.prev = t;
//判断node节点是否是预期的尾节点t,如果是就更新尾节点t为node
if (compareAndSetTail(t, node)) {
//尾节点的后继结点为node
t.next = node;
return t;
}
}
}
}
acquiredQueued
方法:以死循环的方式不停的获取同步状态,如果获取到返回true,否则false
/**
* 以死循环的方式不停的获取同步状态
* 如果获取不到则阻塞节点中的线程,
* 而被阻塞线程只能依靠其前驱节点的出队列操作或者阻塞线程中断
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取节点node的前驱节点p
final Node p = node.predecessor();
/* 如果p为头节点,并且已经成功获取了同步对象
* 获取成功就将队列头结点取出,
* 将头结点的后继结点作为头结点,
* 删除头结点与后继结点的关系
*/
if (p == head && tryAcquire(arg)) {
//设置头节点为node
setHead(node);
//前驱节点p(头结点)的后继结点设置为null
p.next = null; // help GC
//失败标志设置为false
failed = false;
//返回中断失败标志false
return interrupted;
}
//检查并更新无法获取的节点的状态并且检查是否中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//设置中断失败标志为true
interrupted = true;
}
} finally {
//如果中断标志位为true
if (failed)
//取消正在尝试执行的请求
cancelAcquire(node);
}
}
问题:为什么死循环中,只有前驱节点为头节点才能够尝试获取同步状态?
首先清楚的是头节点是成功获取同步状态的节点。当头节点的线程释放了同步状态之后,将唤醒其后继节点,后继节点唤醒时也得检查其前驱节点是否为头节点。(而先进先出队列本身就是这种前驱后继关系,不可能当前节点的前驱节点没有被唤醒获取同步状态,就直接跳到当前节点。其次如果当前节点的前驱节点不是头节点这个限制条件,那么当前节点的前驱节点还有其前驱节点(就是其前前驱节点没有被唤醒获取同步状态),还是不满足从队列头开始获取同步状态)。
acquire方法的流程图:
tryRelease(int arg)
:在独占模式下,尝试释放对象的状态
//独占模式下,尝试释放对象的状态
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
release(int arg)
:在独占模式下,释放对象的状态。
/**
* 释放对象的状态
*/
public final boolean release(int arg) {
//是否允许尝试在独占模式下释放对象的状态
if (tryRelease(arg)) {
//获取队列同步器的头结点
Node h = head;
//如果头结点不为null并且头结点的等待状态不为0
if (h != null && h.waitStatus != 0)
//唤醒头结点的后继结点
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor(Node node)
:唤醒输入节点node的后继节点
/**
* 唤醒节点的后继结点
*/
private void unparkSuccessor(Node node) {
//获取线程节点的等待状态值
int ws = node.waitStatus;
//如果等待状态值小于0
if (ws < 0)
//使用CAS方式比较更新设置节点的等待状态值为0
compareAndSetWaitStatus(node, ws, 0);
//获取当前节点的后继结点
Node s = node.next;
//如果后继结点为null或者后继节点的等待状态值大于0
if (s == null || s.waitStatus > 0) {
//设置当前节点的后继后继结点为null
s = null;
//从尾节点向前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
//节点的等待状态小于等于0(需要执行)
if (t.waitStatus <= 0)
s = t;
}
//让查找到的线程节点可用
if (s != null)
LockSupport.unpark(s.thread);
}
release方法的流程图:
9.共享式同步状态的获取
tryAcquireShared()
方法:查询是否允许在共享模式下获取对象的状态,如果允许,那么就获取它
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
acquireShared()
方法:共享模式获取对象,忽略中断
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
doAcquireShared()
方法:在共享模式下,以死循环的方式不停地获取同步状态
private void doAcquireShared(int arg) {
//获取共享模式下创建的节点
final Node node = addWaiter(Node.SHARED);
//失败标志位默认为true
boolean failed = true;
try {
//中断标志位默认为false
boolean interrupted = false;
for (;;) {
//获取节点的前驱节点p
final Node p = node.predecessor();
//如果p为头结点
if (p == head) {
//是否允许在共享模式下获取对象
int r = tryAcquireShared(arg);
if (r >= 0) {
//设置队列的头结点
setHeadAndPropagate(node, r);
p.next = null; // help GC
//如果中断标志位为true,则中断当前线程
if (interrupted)
selfInterrupt();
//设置失败标志位为false
failed = false;
return;
}
}
//检查并更新无法获取的节点的状态并且检查是否中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//设置中断失败标志为true
interrupted = true;
}
} finally {
//如果中断标志位为true
if (failed)
//取消正在尝试执行的请求
cancelAcquire(node);
}
}
解释:在共享模式获取自旋过程中,成功获取同步状态并退出自旋的条件是tryAcquireShared() 方法返回值大于等于0。如果当前节点的前驱节点为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。
tryReleaseShared(int arg)
方法:在共享模式下,尝试释放对象的状态。
//共享模式下尝试释放对象的状态
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
releaseShared(int arg)
方法:在共享模式下,释放对象状态。
//尝试在共享模式下释放对象的状态
public final boolean releaseShared(int arg) {
//是否允许在共享模式下获取对象的同步状态
if (tryReleaseShared(arg)) {
//在共享模式下释放对象的状态
doReleaseShared();
return true;
}
return false;
}
doReleaseShared()
方法:在共享模式下,释放对象。
/**
* 在共享模式下释放对象
*/
private void doReleaseShared() {
//死循环
for (;;) {
//获取头结点
Node h = head;
//如果头节点不为null并且头结点不等于尾节点
if (h != null && h != tail) {
//获取头结点的等待状态值
int ws = h.waitStatus;
//如果等待状态值为-1,那么通知等待的后继线程节点
if (ws == Node.SIGNAL) {
/**
* 使用CAS方式来比较线程节点h的等待状态值为-1,
* 那么将h的等待状态值更新为0(就是h还在同步队列中)
*/
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后继结点h
unparkSuccessor(h);
}//如果等待状态值为0(h节点在同步队列中)并且节点h的等待状态值为0,那么就将等待状态值设置为-3
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) //如果循环过程中,头结点发生改变,中断循环
break;
}
}
10.其他AQS方法
getExclusiveQueuedThreads()
方法:在独占模式下,获取同步队列中等待的线程数。
//在独占模式下,获取队列中等待的线程集合
public final Collection<Thread> getExclusiveQueuedThreads() {
//创建一个线程类型数组列表
ArrayList<Thread> list = new ArrayList<Thread>();
//从尾节点开始向前遍历
for (Node p = tail; p != null; p = p.prev) {
//线程节点的状态是否处于独占状态
if (!p.isShared()) {
//获取独占状态的节点线程
Thread t = p.thread;
//如果线程不为null,那么就将线程加入到数组列表中
if (t != null)
list.add(t);
}
}
return list;
}
getQueueLength()
方法:获取队列中等待的线程数
//获取队列中等待的线程数
public final int getQueueLength() {
int n = 0;
//从尾节点开始向前遍历
for (Node p = tail; p != null; p = p.prev) {
//如果线程节点对应的线程不为null,就加1
if (p.thread != null)
++n;
}
return n;
}
getSharedQueuedThreads()
方法 :在共享模式下,获取队列中等待的线程集合。
//在共享模式下,获取队列中等待的线程集合
public final Collection<Thread> getSharedQueuedThreads() {
//创建一个线程类型数组列表
ArrayList<Thread> list = new ArrayList<Thread>();
//从尾节点开始向前遍历
for (Node p = tail; p != null; p = p.prev) {
//节点处于共享状态
if (p.isShared()) {
//获取节点的线程
Thread t = p.thread;
//如果线程不为null,就将其加入到数组列表中
if (t != null)
list.add(t);
}
}
return list;
}
hasContended()
方法:查看是否其他线程节点也要获取同步状态。
//查看是否其他线程节点也正要获取同步状态
public final boolean hasContended() {
return head != null;
}
hasQueuedThreads()
方法:队列中是否存在正要获取同步状态的线程节点
//队里中是否存在正要获取同步状态的线程节点
public final boolean hasQueuedThreads() {
return head != tail;
}
isHeldExclusively()
方法:对于当前正调用的线程,是否是以同步的方式独占的运行
//对于当前正调用的线程,是否是以同步的方式独占的运行
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
isQueued()
方法:线程节点是否已经加入到了队列
//线程节点是否已经加入到了队列
public final boolean isQueued(Thread thread) {
//如果线程为null,那么抛出空指针异常
if (thread == null)
throw new NullPointerException();
//从尾节点开始遍历查找
for (Node p = tail; p != null; p = p.prev)
//如果节点对应的线程等于查找的线程,返回true
if (p.thread == thread)
return true;
return false;
}
11.AQS中等待队列的实现
1.内部实现类ConditionObject的继承与实现关系
//Condition接口的实现内部类
public class ConditionObject implements Condition, java.io.Serializable
2.常用属性及构造器
/** 等待队列的头节点 */
private transient Node firstWaiter;
/** 等待队列的尾节点 */
private transient Node lastWaiter;
/**
* 无参构造器
*/
public ConditionObject() { }
3.实现方法
addConditionWaiter()
方法:将当前线程作为一个等待节点添加到等待队列
//将当前线程作为一个等待节点添加到等待队列
private Node addConditionWaiter() {
//获取等待队列的尾节点
Node t = lastWaiter;
//如果尾节点不为null并且尾节点的等待状态不在等待队列中
if (t != null && t.waitStatus != Node.CONDITION) {
//释放在同步队列中等待的节点
unlinkCancelledWaiters();
//将t被赋值为等待队列的尾节点
t = lastWaiter;
}
//将当前线程构造成一个处于等待队列中的节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果尾节点为空,说明等待队列中没有等待节点
if (t == null)
//将当前线程构造的等待节点作为等待队列的头节点
firstWaiter = node;
else//如果尾节点不为空,说明等待队列中存在等待节点
//将当前线程构造的等待节点作为尾节点的后继节点
t.nextWaiter = node;
//更新尾节点,将当前线程构造的节点作为尾节点
lastWaiter = node;
//返回更新后尾节点
return node;
}
doSignal(Node first)
方法:将等待节点first从等待队列转移到同步队列
//将等待节点first从等待队列转移到同步队列
private void doSignal(Node first) {
do {
//如果输入节点的后继节点作为头节点,如果头节点为null
if ( (firstWaiter = first.nextWaiter) == null)
//尾节点设置为空
lastWaiter = null;
//输入节点的后继节点设置为null
first.nextWaiter = null;
} while (!transferForSignal(first) && //将输入节点从等待队列转移到同步状态
//并且头节点不为空
(first = firstWaiter) != null);
}
doSignalAll(Node first)
方法:将等待队列中的节点全部移出,然后转移到同步队列。
//将等待队列中的节点全部移出,然后转移到同步队列
private void doSignalAll(Node first) {
//设置等待队列的头节点等于尾节点等于null(等待队列的置空操作)
lastWaiter = firstWaiter = null;
//策略就是将等待队列中的节点遍历一遍,然后每个节点的后继关系解除,然后转移到同步队列中
do {
//获取输入节点的后继节点
Node next = first.nextWaiter;
//将后继节点设置为空
first.nextWaiter = null;
//将等待队列中的节点转移到同步队列中
transferForSignal(first);
//后继节点作为当前节点,依次向后遍历
first = next;
} while (first != null);//直到遍历到尾节点
}
unlinkCancelledWaiters()
方法:释放在同步队列中等待的节点。
private void unlinkCancelledWaiters() {
//获取等待队列的头节点
Node t = firstWaiter;
//创建临时节点
Node trail = null;
//从头节点开始遍历
while (t != null) {
//获取当前遍历到的节点的后继节点
Node next = t.nextWaiter;
//如果当前节点不在等待队列中
if (t.waitStatus != Node.CONDITION) {
//后继节点则设置为null
t.nextWaiter = null;
//如果临时节点为空,那么当前节点设置为后继节点
if (trail == null)
firstWaiter = next;
else//如果临时节点不为空,那么临时节点的后继节点为当前节点的后继节点
trail.nextWaiter = next;
if (next == null)//如果当前节点的后继节点为空,那么尾节点为临时节点
lastWaiter = trail;
}
else//如果当前节点在等到队列中
trail = t;//将当前节点赋值给临时节点
t = next;//后继节点作为当前遍历节点,向后遍历
}
}
4.对外提供的常用方法
signal()
方法:独占模式下,移出等待队列中等待时间最长的节点,将其移到同步队列。
//移出等待队列中等待时间最长的节点,将其移到同步队列
public final void signal() {
//如果当前线程不是以独占的方式运行的,那么就抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//获取等待队列的头节点
Node first = firstWaiter;
//如果等待队列的头节点不为空
if (first != null)
//将等待队列中的头节点从等待队列转移到同步队列
doSignal(first);
}
signal方法流程图:
await()
方法:让当前线程处于等待状态。
public final void await() throws InterruptedException {
//如果当前线程中断,抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程作为一个等待节点添加到等待队列,返回当前等待队列中的尾节点
Node node = addConditionWaiter();
//释放节点,就是将当前节点从同步队列中释放,返回保存的状态
int savedState = fullyRelease(node);
int interruptMode = 0;
//循环判断的条件是node节点是否不在同步队列中
while (!isOnSyncQueue(node)) {
//允许当前线程可用
LockSupport.park(this);
//检查不处于等待状态时,是否中断,不为0,说明已经中断了,直接退出当前循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//将node设置为同步队列的头节点,获取同步状态并且中断模型对应的值不为中断状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//如果节点node的后继节点不为null
if (node.nextWaiter != null) // clean up if cancelled
//释放同步队列中等待的节点
unlinkCancelledWaiters();
//如果当前中断模型值还是不为0,就是如果节点node还是处于中断情况
if (interruptMode != 0)
//在等待状态之后,处理中断模型值
reportInterruptAfterWait(interruptMode);
}
12.阅读总结
(1)AQS是采用双向链队列这种数据结构实现的。
(2)AQS中包含一个同步双端队列和一个等待双端队列。
(2)AQS没有获取同步状态的线程节点都加入到队列的尾部,AQS是从头节点开始获取锁资源。
(3)AQS获取同步状态的线程对象采用死循环+cas来保证。