一、AQS基础
AQS定义了一套多线程访问共享资源的同步模板,解决了实现同步器时涉及的大量细节问题。
AQS由三部分组成,state同步状态、Node组成的CLH队列、ConditionObject条件变量(包含Node组成的条件单向队列),下面会分别对这三部分做介绍。
先贴下AbstractQueuedSynchronizer提供的核心函数,混个脸熟就够了,后面会讲解
状态
- getState():返回同步状态
- setState(int newState):设置同步状态
- compareAndSetState(int expect, int update):使用C A S设置同步状态
- isHeldExclusively():当前线程是否持有资源
独占资源(不响应线程中断)
- tryAcquire(int arg):独占式获取资源,子类实现
- acquire(int arg):独占式获取资源模板
- tryRelease(int arg):独占式释放资源,子类实现
- release(int arg):独占式释放资源模板
共享资源(不响应线程中断)
- tryAcquireShared(int arg):共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现
- acquireShared(int arg):共享式获取资源模板
- tryReleaseShared(int arg):共享式释放资源,子类实现
- releaseShared(int arg):共享式释放资源模板
这里补充下,获取独占、共享资源操作还提供超时与响应中断的扩展函数,有兴趣的读者可以去AbstractQueuedSynchronizer源码了解。
二、CLH队列
CLH队列全称是(Craig, Landin, and Hagersten) lock queue,用来存储被阻塞的线程信息。
CLH是AQS内部维护的FIFO(先进先出)双端双向队列(方便尾部节点插入),基于链表数据结构。
当一个线程竞争资源失败,就会将等待资源的线程封装成一个Node节点,通过CAS原子操作插入队列尾部,最终不同的Node节点连接组成了一个CLH队列,所以说AQS通过CLH队列管理竞争资源的线程,个人总结CLH队列具有如下几个优点:
- 先进先出保证了公平性;
- 非阻塞的队列,通过自旋锁和C A S保证节点插入和移除的原子性,实现无锁快速插入;
- 采用了自旋锁思想,所以CLH也是一种基于链表的可扩展、高性能、公平的自旋锁;
Node内部类
Node是AQS的内部类,每个等待资源的线程都会封装成Node节点组成CLH队列、等待队列
,所以说Node是非常重要的部分,理解它是理解AQS的第一步。
列Node类中的变量都很好理解,只有waitStatus、nextWaiter没有细说,下面做个补充说明;
waitStatus等待状态如下:
nextWaiter特殊标记:
- Node在CLH队列时,nextWaiter表示共享式或独占式标记;
- Node在条件队列时,nextWaiter表示下个Node节点指针;
流程概述
线程获取资源失败,封装成Node节点从CLH队列尾部入队并阻塞线程,某线程释放资源时会把CLH队列首部Node节点关联的线程唤醒(此处的首部是指第二个节点),再次获取资源。
入队
获取资源失败的线程需要封装成Node节点,接着尾部入队,在AQS中提供 addWaiter函数完成Node节点的创建与入队。
private Node addWaiter(Node mode) {
//根据当前线程创建节点,等待状态为0
Node node = new Node(Thread.currentThread(), mode);
// 获取尾节点
Node pred = tail;
if (pred != null) {
//如果尾节点不等于null,把当前节点的前驱节点指向尾节点
node.prev = pred;
//通过cas把尾节点指向当前节点
if (compareAndSetTail(pred, node)) {
//之前尾节点的下个节点指向当前节点
pred.next = node;
return node;
}
}
//如果添加失败或队列不存在,执行enq函数
enq(node);
return node;
}
添加节点的时候,如果从CLH队列已经存在,通过CAS快速将当前节点添加到队列尾部;
如果添加失败或队列不存在,则指向enq函数自旋入队。
private Node enq(final Node node) {
for (;;) { //循环
//获取尾节点
Node t = tail;
if (t == null) {
//如果尾节点为空,创建哨兵节点,通过cas把头节点指向哨兵节点
if (compareAndSetHead(new Node()))
//cas成功,尾节点指向哨兵节点
tail = head;
} else {
//当前节点的前驱节点设指向之前尾节点
node.prev = t;
//cas设置把尾节点指向当前节点
if (compareAndSetTail(t, node)) {
//cas成功,之前尾节点的下个节点指向当前节点
t.next = node;
return t;
}
}
}
}
通过自旋CAS尝试往队列尾部插入节点,直到成功;自旋过程如果发现CLH队列不存在时会初始化CLH队列,入队过程流程如下图(
队列不存在流程图
):
第一次循环
- 刚开始C L H队列不存在,head与tail都指向null
- 要初始化C L H队列,会创建一个哨兵节点,head与tail都指向哨兵节点
第二次循环
- 当前线程节点的前驱节点指向尾部节点(哨兵节点)
- 设置当前线程节点为尾部,tail指向当前线程节点
- 前尾部节点的后驱节点指向当前线程节点(当前尾部节点)
最后结合addWaiter与enq函数的入队流程图如下:
出队
CLH队列中的节点都是获取资源失败的线程节点,当持有资源的线程释放资源时,会将head.next指向的线程节点唤醒(C L H队列的第二个节点),
如果唤醒的线程节点获取资源成功,线程节点清空信息设置为头部节点(新哨兵节点),原头部节点出队(原哨兵节点)
acquireQueued函数中的部分代码
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 0.自旋阻塞等待获取资源
for (;;) {
//1.获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是首节点,获取资源(子类实现)
if (p == head && tryAcquire(arg)) {
//2.获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
setHead(node);
//3.原来首节点下个节点指向为null
p.next = null; // help GC
//4.非异常状态,防止指向finally逻辑
failed = false;
//5.返回线程中断状态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
//节点设置为头部
head = node;
//清空线程
node.thread = null;
//清空前驱节点
node.prev = null;
}
只需要关注1~3步骤即可,过程非常简单,假设获取资源成功,更换头部节点,并把头部节点的信息清除变成哨兵节点,注意这个过程是不需要使用CAS来保证,因为只有一个线程能够成功获取到资源。
三、条件变量
Object的wait、notify函数是配合Synchronized锁实现线程间同步协作的功能,AQS的ConditionObject条件变量也提供这样的功能,通过ConditionObject的await和signal两类函数完成。
不同于Synchronized锁,一个AQS可以对应多个条件变量,而Synchronized只有一个。
如下图所示,ConditionObject内部维护着一个单向条件队列,不同于CHL队列,
条件队列只入队执行await的线程节点,并且加入条件队列的节点,不能在CHL队列
;
条件队列出队的节点,会入队到CHL队列。
当某个线程执行了ConditionObject的await函数,阻塞当前线程,线程会被封装成Node节点添加到条件队列的末端,其他线程执行ConditionObject的signal函数,会将条件队列头部线程节点转移到CHL队列参与竞争资源,
具体流程如下图:
最后补充下,条件队列Node类是使用nextWaiter变量指向下个节点,并且因为是单向队列,所以prev与next变量都是null。
四、进阶
AQS
采用了模板方法设计模式,提供了两类模板,一类是独占式模板,另一类是共享形模式,对应的模板函数如下:
独占式
- acquire获取资源
- release释放资源
共享式
- acquireShared获取资源
- releaseShared释放资源
独占式获取资源
acquire是个模板函数,模板流程就是线程获取共享资源,如果获取资源成功,线程直接返回,否则进入CLH队列,直到获取资源成功为止,且整个过程忽略中断的影响,acquire函数代码如下:
- 执行tryAcquire函数,tryAcquire是由子类实现,代表获取资源是否成功,如果资源获取失败,执行下面的逻辑;
- 执行addWaiter函数(前面已经介绍过),根据当前线程创建出独占式节点,并入队CLH队列;
- 执行acquireQueued函数,自旋阻塞等待获取资源;
- 如果acquireQueued函数中获取资源成功,根据线程是否被中断状态,来决定执行线程中断逻辑;
acquire函数的大致流程都清楚了,下面来分析下acquireQueued函数,线程封装成节点后,是如何自旋阻塞等待获取资源的,代码如下:
final boolean acquireQueued(final Node node, int arg) {
//异常状态,默认是
boolean failed = true;
try {
//该线程是否中断过,默认否
boolean interrupted = false;
for (;;) {//自旋
//获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是首节点,获取资源(子类实现)
if (p == head && tryAcquire(arg)) {
//获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
setHead(node);
//原来首节点下个节点指向为null
p.next = null; // help GC
//非异常状态,防止指向finally逻辑
failed = false;
//返回线程中断状态
return interrupted;
}
/**
* 如果前驱节点不是首节点,先执行shouldParkAfterFailedAcquire函数,shouldParkAfterFailedAcquire做了三件事
* 1.如果前驱节点的等待状态是SIGNAL,返回true,执行parkAndCheckInterrupt函数,返回false
* 2.如果前驱节点的等待状态是CANCELLED,把CANCELLED节点全部移出队列(条件节点)
* 3.以上两者都不符合,更新前驱节点的等待状态为SIGNAL,返回false
*/
if (shouldParkAfterFailedAcquire(p, node) &&
//使用LockSupport类的静态方法park挂起当前线程,直到被唤醒,唤醒后检查当前线程是否被中断,返回该线程中断状态并重置中断状态
parkAndCheckInterrupt())
//该线程被中断过
interrupted = true;
}
} finally {
// 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除
if (failed)
cancelAcquire(node);
}
}
核心流程图如下:
独占式释放资源
有获取资源,自然就少不了释放资源,AQS中提供了release模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CLH队列的第二个线程节点(首节点的下个节点),代码如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放资源成功,tryRelease子类实现
//获取头部线程节点
Node h = head;
if (h != null && h.waitStatus != 0) //头部线程节点不为null,并且等待状态不为0
//唤醒CHL队列第二个线程节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//获取节点等待状态
int ws = node.waitStatus;
if (ws < 0)
//cas更新节点状态为0
compareAndSetWaitStatus(node, ws, 0);
//获取下个线程节点
Node s = node.next;
if (s == null || s.waitStatus > 0) { //如果下个节点信息异常,从尾节点循环向前获取到正常的节点为止,正常情况不会执行
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒线程节点
LockSupport.unpark(s.thread);
}
}
release逻辑非常简单,流程图如下:
共享式获取资源
acquireShared是个模板函数,模板流程就是线程获取共享资源,如果获取到资源,线程直接返回,否则进入CLH队列,直到获取到资源为止,且整个过程忽略中断的影响,acquireShared函数代码如下:
public final void acquireShared(int arg) {
/**
* 1.负数表示失败
* 2.0表示成功,但没有剩余可用资源
* 3.正数表示成功且有剩余资源
*/
if (tryAcquireShared(arg) < 0) //获取资源失败,tryAcquireShared子类实现
//自旋阻塞等待获取资源
doAcquireShared(arg);
}
doAcquireShared函数与独占式的acquireQueued函数逻辑基本一致,唯一的区别就是下图红框部分:
- 节点的标记是共享式
- 获取资源成功,还会唤醒后续资源,因为资源数可能>0,代表还有资源可获取,所以需要做后续线程节点的唤醒
共享式释放资源
AQS中提供了releaseShared模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CHL队列的第二个线程节点(首节点的下个节点),代码如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//释放资源成功,tryReleaseShared子类实现
//唤醒后继节点
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
//获取头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//如果头节点等待状态为SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//更新头节点等待状态为0
continue; // loop to recheck cases
//唤醒头节点下个线程节点
unparkSuccessor(h);
}
//如果后继节点暂时不需要被唤醒,更新头节点等待状态为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
与独占式释放资源区别不大,都是唤醒头节点的下个节点,就不做过多描述了。
五、实战
AQS定义了一套多线程访问共享资源的同步模板,解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作,现在我们基于AQS实现一个不可重入的独占锁,直接使用A Q S提供的独占式模板,只需明确state的语义与实现tryAcquire与tryRelease函数(获取资源与释放资源),在这里state为0表示锁没有被线程持有,state为1表示锁已经被某个线程持有,由于是不可重入锁,所以不需要记录持有锁线程的获取锁次数。
不可重入的独占锁代码如下:
public class NonReentrantLock implements Lock {
/**
* @Author 程序猿阿星
* @Description 自定义同步器
*/
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 锁是否被线程持有
*/
@Override
protected boolean isHeldExclusively() {
//0:未持有 1:已持有
return super.getState() == 1;
}
/**
* 获取锁
*/
@Override
protected boolean tryAcquire(int arg) {
if (arg != 1) {
//获取锁操作,是需要把state更新为1,所以arg必须是1
throw new RuntimeException("arg not is 1");
}
if (compareAndSetState(0, arg)) {//cas 更新state为1成功,代表获取锁成功
//设置持有锁线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 释放锁
*/
@Override
protected boolean tryRelease(int arg) {
if (arg != 0) {
//释放锁操作,是需要把state更新为0,所以arg必须是0
throw new RuntimeException("arg not is 0");
}
//清空持有锁线程
setExclusiveOwnerThread(null);
//设置state状态为0,此处不用cas,因为只有获取锁成功的线程才会执行该函数,不需要考虑线程安全问题
setState(arg);
return true;
}
/**
* 提供创建条件变量入口
*/
public ConditionObject createConditionObject() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
/**
* 获取锁
*/
@Override
public void lock() {
//Aqs独占式-获取资源模板函数
sync.acquire(1);
}
/**
* 获取锁-响应中断
*/
@Override
public void lockInterruptibly() throws InterruptedException {
//Aqs独占式-获取资源模板函数(响应线程中断)
sync.acquireInterruptibly(1);
}
/**
* 获取锁是否成功-不阻塞
*/
@Override
public boolean tryLock() {
//子类实现
return sync.tryAcquire(1);
}
/**
* 获取锁-超时机制
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
//Aqs独占式-获取资源模板函数(超时机制)
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
/**
* 释放锁
*/
@Override
public void unlock() {
//Aqs独占式-释放资源模板函数
sync.release(0);
}
/**
* 创建条件变量
*/
@Override
public Condition newCondition() {
return sync.createConditionObject();
}
}
NonReentrantLock定义了一个内部类Sync,Sync用来实现具体的锁操作,它继承了AQS,因为使用的是独占式模板,所以重写tryAcquire与tryRelease函数,另外提供了一个创建条件变量的入口,下面使用自定义的独占锁来同步两个线程对j++。
private static int j = 0;
public static void main(String[] agrs) throws InterruptedException {
NonReentrantLock nonReentrantLock = new NonReentrantLock();
Runnable runnable = () -> {
//获取锁
nonReentrantLock.lock();
for (int i = 0; i < 100000; i++) {
j++;
}
//释放锁
nonReentrantLock.unlock();
};
Thread thread = new Thread(runnable);
Thread threadTwo = new Thread(runnable);
thread.start();
threadTwo.start();
thread.join();
threadTwo.join();
System.out.println(j);
}
六、AQS简化流程图