java 并发框架 核心AQS(AbstractQueuedSynchronizer)

  • Post author:
  • Post category:java


类如其名,抽象的队列式的同步器,AQS定义了一套

多线程访问共享资源的同步器框架

,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch…

核心数据结构:


它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(Node为元素的双向链表结构)(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:

  • getState()

  • setState()

  • compareAndSetState()

基础内部类Node,构成等待队列的元素类。


Node的源码

Node就是CLH队列的节点。Node在AQS中实现,它的数据结构如下:

private transient volatile Node head;    // CLH队列的队首
private transient volatile Node tail;    // CLH队列的队尾
​
// CLH队列的节点
static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    //定义线程的状态所对应的常量;
    
    static final int CANCELLED =  1;// 线程已被取消,对应的waitStatus的值
    static final int SIGNAL    = -1;// “当前线程的后继线程需要被unpark(唤醒)”,对应的waitStatus的值。 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
    static final int CONDITION = -2;// 线程(处在Condition休眠状态)在等待Condition唤醒,对应的waitStatus的值
    static final int PROPAGATE = -3;// (共享锁)其它线程获取到“共享锁”,对应的waitStatus的值
    
    //定义线程的状态变量;
    volatile int waitStatus;// waitStatus为“CANCELLED, SIGNAL, CONDITION, PROPAGATE”时分别表示不同状态,若waitStatus=0,则意味着当前线程不属于上面的任何一种状态。
    
    volatile Node prev;// 前一节点
    volatile Node next;// 后一节点
    
    volatile Thread thread;// 节点所对应的线程
​
    // nextWaiter是“区别当前CLH队列是 ‘独占锁’队列 还是 ‘共享锁’队列 的标记”
    // 若nextWaiter=SHARED,则CLH队列是“独占锁”队列;
    // 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),则CLH队列是“共享锁”队列。
    Node nextWaiter;
​
    // “共享锁”则返回true,“独占锁”则返回false。
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
​
    // 返回前一节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
​
    Node() {    // Used to establish initial head or SHARED marker
    }
​
    // 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
​
    // 构造函数。thread是节点所对应的线程,waitStatus是线程的等待状态。
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}


说明

: Node是CLH队列的节点,代表“等待锁的线程队列”。 (01) 每个Node都会一个线程对应。 (02) 每个Node会通过prev和next分别指向上一个节点和下一个节点(双向链表),这分别代表上一个等待线程和下一个等待线程。 (03) Node通过waitStatus保存线程的等待状态。 (04) Node通过nextWaiter来区分线程是“独占锁”线程还是“共享锁”线程。如果是“独占锁”线程,则nextWaiter的值为EXCLUSIVE;如果是“共享锁”线程,则nextWaiter的值是SHARED。

两种共享模式

AQS定义

两种资源共享方式

1.Exclusive(独占,只有一个线程能执行,如ReentrantLock)其需要实现acquire-release

2.Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。其需要实现acquireShared-releaseShared

不同的自定义同步器争用共享资源的方式也不同。

自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可

,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

方法 作用 返回值(用法)
isHeldExclusively() 该线程是否正在独占资源 只有用到condition才需要去实现它
tryAcquire(int) 独占方式:尝试获取资源 成功则返回true,失败则返回false
tryRelease(int) 独占方式:尝试释放资源 成功则返回true,失败则返回false
tryAcquireShared(int) 共享方式:尝试获取资源 负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源
tryReleaseShared(int) 共享方式:尝试释放资源 如果释放后允许唤醒后续等待结点返回true,否则返回false

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

流程分析:

现在分析获取锁释放锁的流程:依照acquire-release、acquireShared-releaseShared的次序来

独占锁的获取与释放:

获取:

流程图:

整体流程步骤:

  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;

  2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark()唤醒)会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

0. acquire(int)——是顶层入口,定义了独占模式的获取锁的流程

此方法是独占模式下线程获取共享资源(锁)的顶层入口。如果获取到资源,线程直接返回,否则进入

等待队列

,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:

1 public final void acquire(int arg) {
2     if (!tryAcquire(arg) &&
3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4         selfInterrupt();
5 }

那么在这个函数中分别调用了tryAcquire(),addWaiter(),acquireQueued(),selfInterrupt();下面分别来看AQS中的实现。

1.tryAcquire(int)——对应lock中的tryLock()的语义

此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。

1     protected boolean tryAcquire(int arg) {
2         throw new UnsupportedOperationException();
3     }

特点:AQS只是一个框架,无具体实现(要使用必须覆写),具体资源的获取/释放方式交由自定义同步器去实现;AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)。至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。

非abstract的原因:独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。

2.addWaiter(Node)——将当前线程加入到等待队列的队尾,并返回当前线程所在的结点

源码:

 1 private Node addWaiter(Node mode) {
 2     //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
 3     Node node = new Node(Thread.currentThread(), mode);
 4     
 5     //尝试快速方式直接放到队尾。
 6     Node pred = tail;
 7     if (pred != null) {
 8         node.prev = pred;
 9         if (compareAndSetTail(pred, node)) {
10             pred.next = node;
11             return node;
12         }
13     }
14     
15     //上一步失败则通过enq入队。
16     enq(node);
17     return node;
18 }

Node结点是对每一个访问同步代码的线程的封装,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。变量waitStatus则表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。

语义
CANCELLED[1] 当前线程已被取消
SIGNAL[-1] “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
CONDITION[-2] 当前线程(处在Condition休眠状态)在等待Condition唤醒
PROPAGATE[-3] (共享锁)其它线程获取到“共享锁”
[0] 当前线程不属于上面的任何一种状态

AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。

在这个方法中调用了enq(node)

enq(Node)——将node加入队尾

 1 private Node enq(final Node node) {
 2     //CAS"自旋",直到成功加入队尾
 3     for (;;) {
 4         Node t = tail;
 5         if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
 6             if (compareAndSetHead(new Node()))
 7                 tail = head;
 8         } else {//正常流程,放入队尾
 9             node.prev = t;
10             if (compareAndSetTail(t, node)) {
11                 t.next = node;
12                 return t;
13             }
14         }
15     }
16 }


特点:CAS自旋volatile变量

,是一种很经典的用法。

acquireQueued(Node, int)——在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回

现在线程获取资源失败,已经被放入等待队列尾部了。

下一步:

进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了

。——acquireQueued()就是干这件事:

在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回

。这个函数非常关键,还是上源码吧:通过自旋实现

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;//标记是否成功拿到资源
        try {
            boolean interrupted = false;//标记等待过程中是否被中断过
            for (;;) {
                final Node p = node.predecessor();//拿到前驱
                if (p == head && tryAcquire(arg)) {//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
                    setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
                    p.next = null; // help GC 断开p与所有GC ROOt的链接,方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
                    failed = false;
                    return interrupted;//返回等待过程中是否被中断过
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//如果自己可以休息了,就进入waiting状态,直到被unpark()
                    interrupted = true;//如果等待过程中被中断过(第二个条件来判断的),哪怕只有那么一次,就将interrupted标记为true
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

到这里了,我们先不急着总结acquireQueued()的函数流程,

这里逻辑的实现中使用了两个函数shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()

shouldParkAfterFailedAcquire(Node, Node)——用于检查状态

看看自己是否真的可以去休息了(进入waiting状态),万一队列前边的线程都放弃了只是瞎站着,那也说不定。

 // 返回“当前线程是否应该阻塞”
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前继节点的状态
    int ws = pred.waitStatus;
    // 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。
    if (ws == Node.SIGNAL)
        return true;
    // 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点”  为  “‘原前继节点’的前继节点”。
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。

规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。 ​ 规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。 ​ 规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。

如果“规则1”发生,即“前继节点是SIGNAL”状态,则意味着“当前线程”需要被阻塞。接下来会调用parkAndCheckInterrupt()阻塞当前线程,直到当前先被唤醒才从parkAndCheckInterrupt()中返回。

parkAndCheckInterrupt()

parkAndCheckInterrupt()在AQS中实现,源码如下:

private final boolean parkAndCheckInterrupt() {
    // 通过LockSupport的park()阻塞“当前线程”。
    LockSupport.park(this);
    // 返回线程的中断状态。
    return Thread.interrupted();
}


说明

:parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。 它会先通过LockSupport.park()阻塞“当前线程”,然后通过Thread.interrupted()返回线程的中断状态。

这里介绍一下线程被阻塞之后如何唤醒。一般有2种情况:

第1种情况

:unpark()唤醒。“前继节点对应的线程”使用完锁之后,通过unpark()方式唤醒当前线程。

第2种情况

:中断唤醒。其它线程通过interrupt()中断当前线程。


补充

:LockSupport()中的park(),unpark()的作用 和 Object中的wait(),notify()作用类似,是阻塞/唤醒。 它们的用法不同,park(),unpark()是轻量级的,而wait(),notify()是必须先通过Synchronized获取同步锁。 park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:

1)被unpark();

2)被interrupt();需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。

了解了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()函数之后。我们接着分析acquireQueued()的for循环部分。


再次tryAcquire()

final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}


说明

: (01) 通过node.predecessor()获取前继节点。predecessor()就是返回node的前继节点 (02) p == head && tryAcquire(arg) 首先,判断“前继节点”是不是CHL表头。如果是的话,则通过tryAcquire()尝试获取锁。 其实,这样做的目的是为了“让当前线程获取锁”,但是为什么需要先判断p==head呢?理解这个对理解“公平锁”的机制很重要,因为这么做的原因就是为了保证公平性! (a) 前面,我们在shouldParkAfterFailedAcquire()我们判断“当前线程”是否需要阻塞; (b) 接着,“当前线程”阻塞的话,会调用parkAndCheckInterrupt()来阻塞线程。当线程被解除阻塞的时候,我们会返回线程的中断状态。而线程被解决阻塞,可能是由于“线程被中断”,也可能是由于“其它线程调用了该线程的unpark()函数”。 (c) 再回到p==head这里。如果当前线程是因为其它线程调用了unpark()函数而被唤醒,那么唤醒它的线程,应该是它的前继节点所对应的线程(关于这一点,后面在“释放锁”的过程中会看到)。 OK,是前继节点调用unpark()唤醒了当前线程! 此时,再来理解p==head就很简单了:当前继节点是CLH队列的头节点,并且它释放锁之后;就轮到当前节点获取锁了。然后,当前节点通过tryAcquire()获取锁;获取成功的话,通过setHead(node)设置当前节点为头节点,并返回。 总之,如果“前继节点调用unpark()唤醒了当前线程”并且“前继节点是CLH表头”,此时就是满足p==head,也就是符合公平性原则的。否则,如果当前线程是因为“线程被中断”而唤醒,那么显然就不是公平了。这就是为什么说p==head就是保证公平性!


小结

:acquireQueued()的作用就是“当前线程”会根据公平性原则进行阻塞等待,直到获取锁为止;并且返回当前线程在等待过程中有没有并中断过。

selfInterrupt()

selfInterrupt()是AQS中实现,源码如下:

private static void selfInterrupt() {
    Thread.currentThread().interrupt();
}


说明

:selfInterrupt()的代码很简单,就是“当前线程”自己产生一个中断。但是,为什么需要这么做呢? 这必须结合acquireQueued()进行分析。如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。

在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”! 也就是说,在该线程“成功获取锁并真正执行起来”之前,它的中断会被忽略并且中断标记会被清除! 因为在parkAndCheckInterrupt()中,我们线程的中断状态时调用了Thread.interrupted()。该函数不同于Thread的isInterrupted()函数,isInterrupted()仅仅返回中断状态,而interrupted()在返回当前中断状态之后,还会清除中断状态。 正因为之前的中断状态被清除了,所以这里需要调用selfInterrupt()重新产生一个中断!


小结

:selfInterrupt()的作用就是当前线程自己产生一个中断。

释放锁:

release(int)——unlock()的语义

acquire()是上锁的顶层接口 对应的独占模式下线程释放共享资源的顶层入口就是release()操作。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

源码:

 public final boolean release(int arg) {
     if (tryRelease(arg)) {
         Node h = head;//找到头结点
         if (h != null && h.waitStatus != 0)//找到第二个节点,并唤醒
             unparkSuccessor(h);//唤醒等待队列里的下一个线程
         return true;
     }
     return false;
 }

调用tryRelease()来释放资源,释放资源后会唤醒其后继节点(后继节点重新尝试获取同步方法)。有一点需要注意的是,

它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点!!

tryRelease(int)

此方法尝试去释放指定量的资源。下面是tryRelease()的源码:

 protected boolean tryRelease(int arg) {
     throw new UnsupportedOperationException();
 }

跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。

语义规定(重写时注意):

1.已经彻底释放资源(state=0),要返回true

2.否则返回false

unparkSuccessor(Node)——唤醒等待队列中下一个线程

 private void unparkSuccessor(Node node) {
     //这里,node一般为当前线程所在的结点。
     int ws = node.waitStatus;
     if (ws < 0)//置零当前线程所在的结点状态,允许失败。
         compareAndSetWaitStatus(node, ws, 0);
 
     Node s = node.next;//找到下一个需要唤醒的结点s
     if (s == null || s.waitStatus > 0) {//如果为空或已取消
         s = null;
         for (Node t = tail; t != null && t != node; t = t.prev)
             if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                 s = t;
     }
     if (s != null)
         LockSupport.unpark(s.thread);//唤醒
 }

用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!!And then, DO what you WANT!

独占方式的总结:

在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入这个队列,并在队列中自旋;移出队列(或停止自旋)的条件是前驱节点为头节点并且成功的获取同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法来释放同步状态,然后唤醒头节点的后继节点。

再来看共享锁的获取锁和释放锁的过程:

acquireShared(int)

共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。

 public final void acquireShared(int arg) {//指定初始量arg
     if (tryAcquireShared(arg) < 0)
         doAcquireShared(arg);
 }

这里acquireShared()的流程就是:

  1. tryAcquireShared()尝试获取资源,成功则直接返回;

  2. 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。

tryAcquireShared()

 protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
 }

这里tryAcquireShared()依然需要自定义同步器去实现。

返回值的语义:

1.负值代表获取失败;

2.0代表获取成功,但没有剩余资源;

3.正数表示获取成功,还有剩余资源,其他线程还可以去获取。

doAcquireShared(int)——doAcquireShared()的作用是获取共享锁

将当前线程加入等待队列尾部并自旋,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回doAcquireShared()**

private void doAcquireShared(int arg) {
    // addWaiter(Node.SHARED)的作用是,创建“当前线程”对应的节点,并将该线程添加到CLH队列中。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取“node”的前一节点
            final Node p = node.predecessor();
            // 如果“当前线程”是CLH队列的表头,则尝试获取共享锁。
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 如果“当前线程”不是CLH队列的表头,则通过shouldParkAfterFailedAcquire()判断是否需要等待,
            // 需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


说明

:doAcquireShared()的作用是获取共享锁。 它会首先创建线程对应的CLH队列的节点,然后将该节点添加到CLH队列中。CLH队列是管理获取锁的等待线程的队列。 如果“当前线程”是CLH队列的表头,则尝试获取共享锁;否则,则需要通过shouldParkAfterFailedAcquire()判断是否阻塞等待,需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。 doAcquireShared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。

需要注意的是:doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的!

shouldParkAfterFailedAcquire(), parkAndCheckInterrupt()等函数上面详细介绍过,这里就不再重复说明了。跟acquireQueued()很相似,其实流程并没有太大区别。只不过这里将补中断的selfInterrupt()放到doAcquireShared()里了,而独占模式是放到acquireQueued()之外,其实都一样。

在自旋的过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,返回值大于0,表是该次获取同步状态成功并从自旋中退出。

跟独占模式比,还有一点区别是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。

setHeadAndPropagate(Node, int)

 1 private void setHeadAndPropagate(Node node, int propagate) {
 2     Node h = head; 
 3     setHead(node);//head指向自己
 4      //如果还有剩余量,继续唤醒下一个邻居线程
 5     if (propagate > 0 || h == null || h.waitStatus < 0) {
 6         Node s = node.next;
 7         if (s == null || s.isShared())
 8             doReleaseShared();
 9     }
10 }

此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!

小结

acquireShared()的流程:

  1. tryAcquireShared()尝试获取资源,成功则直接返回;

  2. 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。

其实跟acquire()的流程大同小异,只不过多了个

自己拿到资源后,还会去唤醒后继队友的操作(这才是共享嘛)

releaseShared()

共享模式下线程释放共享资源的顶层入口。会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:

1 public final boolean releaseShared(int arg) {
2     if (tryReleaseShared(arg)) {//尝试释放资源
3         doReleaseShared();//唤醒后继结点
4         return true;
5     }
6     return false;
7 }

释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

doReleaseShared()

此方法主要用于唤醒后继。

 1 private void doReleaseShared() {
 2     for (;;) {//自旋
 3         Node h = head;
 4         if (h != null && h != tail) {
 5             int ws = h.waitStatus;
 6             if (ws == Node.SIGNAL) {
 7                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 8                     continue;
 9                 unparkSuccessor(h);//唤醒后继
10             }
11             else if (ws == 0 &&
12                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
13                 continue;
14         }
15         if (h == head)// head发生变化
16             break;
17     }
18 }

3.5 小结

本节我们详解了独占和共享两种模式下获取-释放资源(acquire-release、acquireShared-releaseShared)的源码,相信大家都有一定认识了。值得注意的是,acquire()和acquireShared()两种方法下,线程在等待队列中都是忽略中断的。AQS也支持响应中断的,acquireInterruptibly()/acquireSharedInterruptibly()即是,这里相应的源码跟acquire()和acquireShared()差不多,这里就不再详解了。

一些其他的在覆写tryAcquire()会用到的函数:


1. hasQueuedPredecessors()

hasQueuedPredecessors()在AQS中实现,源码如下:

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}


说明

: 通过代码,能分析出,hasQueuedPredecessors() 是通过判断”当前线程”是不是在CLH队列的队首,来返回AQS中是不是有比“当前线程”等待更久的线程。


2.compareAndSetState()

compareAndSetState()在AQS中实现。它的源码如下:

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}


说明

: 通过compareAndSwapInt()原子操作的方式使得当前线程状态改变;expect——>update。


3. setExclusiveOwnerThread()

setExclusiveOwnerThread()在AQS的父类AbstractOwnableSynchronizer中实现,它的源码如下:

// exclusiveOwnerThread是当前拥有“独占锁”的线程
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t) {
    exclusiveOwnerThread = t;
}


说明

:setExclusiveOwnerThread()的作用就是,设置线程t为当前拥有“独占锁”的线程。


4. getState(), setState()

getState()和setState()都在AQS中实现,源码如下:

// 锁的状态
private volatile int state;
// 设置锁的状态
protected final void setState(int newState) {
    state = newState;
}
// 获取锁的状态
protected final int getState() {
    return state;
}


说明

:state表示锁的状态,对于“独占锁”而已,state=0表示锁是可获取状态(即,锁没有被任何线程锁持有)。由于java中的独占锁是可重入的,state的值可以>1。



版权声明:本文为zuo_h_dr原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。