AbstractQueuedSynchronizer详解(一)同步器分析

  • Post author:
  • Post category:其他



目录


1、下面讲几个多线程方面的概念:


(1)、临界资源:


(2)、阻塞的定义


(3)、临界区


(4)、同步器


(5)、上述同步器的特点:


(6)、如果需要实现一个同步器,他需要具备哪些特点:


(7)、jkd中同步器就是使用AbstractQueuedSynchronizer来实现


(8)、AQS的组件


(9)、公平方式:


2、AQS相关概念:


3、队列同步器接口


4、AQS内部实现:


5、acquire独占式同步状态获取与释放:


(1)、支持不响应中断获取独占锁:(acquire)


(2)、支持响应中断获取独占锁:(acquireInterruptibly),超时获取独占锁(tryAcquireNanos);


6、acquireShared共享式同步状态获取与释放:


7、doAcquireNanos独占式超时获取同步状态


1、下面讲几个多线程方面的概念:

(1)、临界资源:

为了解决竞争条件带来的问题,我们可以对资源上锁。多个线程共同读写的资源称为共享资源,也叫 临界资源。临界资源是一次仅允许一个进程使用的共享资源。各进程采取互斥的方式,实现共享的资源称作临界资源。属于临界资源的硬件有,打印机,磁带机等;软件有消息队列,变量,数组,缓冲区等。诸进程间采取互斥方式,实现对这种资源的共享。

(2)、阻塞的定义

阻塞是线程在满足某种条件之前暂时停止运行,而被动释放出CPU资源。进入该状态的线程不会主动进入线程队列等待CPU资源,而需要等待满足条件后被唤醒,才能让该线程重新进入到线程队列中排队等待CPU资源。一般造成线程阻塞的原因有:等待获取一个已经被其他线程持有的排他锁、等待某一操作结束、等待某一个时间段。线程阻塞后会被挂起,此时会处于BLOCKED、WAITING、TIMED_WAITING。线程阻塞后会让出CPU资源,这是为避免在自旋上浪费过多的CPU资源,是忙等待(busy wait)的一种优化。

(3)、临界区

涉及操作临界资源的代码区域称为 临界区(Critical Section)。同一时刻,只能有一个线程进入临界区。我们把这种情况称为互斥,即不允许多个线程同时对共享资源进行操作。

临界区是如何实现互斥的?

进入临界区前,需要先获得互斥锁。如果已经有线程正在使用资源,那么需要一直等待,直到其它线程归还互斥锁。

操作完共享资源之后,即退出临界区时,需要归还互斥锁,以便其它等待使用该资源的线程能够进入临界区。

(4)、同步器

多线程编程中,协调多个线程对同一临界资源进行读写的机制

Java常见同步器?ReentrantLock、信号量、CountdownLatch等

(5)、上述同步器的特点:

不自旋;

可超时可中断的等待;

共享模式(允许多个线程访问同一个锁);

或排他模式(线程独占锁);

公平或非公平方式(公平:先访问先获取锁);

(6)、如果需要实现一个同步器,他需要具备哪些特点:

状态的管理:决定是否能进入临界区;

–compareAndSet(expected,new)

入队和出队:等待进入临界区的线程入队和出队的操作;

–非阻塞队列Quene

阻塞或取消阻塞的操作;

–LockSupport park()  unpark()

(7)、jkd中同步器就是使用AbstractQueuedSynchronizer来实现

为什么叫Abstract呢,因为作者并不是希望你直接使用他,而是希望你继承

重写他的模板方法完成锁的一些功能实现;

(8)、AQS的组件

一个先入先出FIFO队列,双向队列来存储等待线程的信息:

基于CLH队列锁算法的变种,来实现;

不是自旋:叫醒,头节点释放锁后唤醒它后面的节点(双向链表next来实现);

双向链表:prev前驱,next后驱(为了实现可取消、可中断的锁,用双向链表prev来实现取消中断线程操作);

CLH是什么?

CLH是基于链表的的可扩展、高性能、公平的自旋锁:

申请线程在前驱节点的属性上自旋;

不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋

(9)、公平方式:

新进来的线程节点,都插入到队列的尾部排队获取锁;

(10)、非公平方式:

进来的时候有机会和队列排队中的第二个节点抢占一次锁,

如果抢占成功则返回true,如果不成功则排队到队列尾部;

2、AQS相关概念:

队列同步器AbstractQueuedSynchronizer(简称同步器AQS),是用来构建锁或者其他同步组件的基础框架,它使用了一个int全局变量state表示同步状态,

state初始值为0,线程加一次锁,state加1,重入锁时,state值再次加1,相反,释放重入锁的时候state值依次减1,减到0算释放完成;

/**
 * The synchronization state.
 */
private volatile int state;

通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象模板方法,来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch 、CyclicBarrier等)。

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。

3、队列同步器接口

同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法,如下几个模板方法:

//独占式获取同步状态,如果当前线程获取同步状态成功,立即返回。否则,将//会进入同步队列等待,
//该方法将会重复调用重写的tryAcquire(int arg)方法
public final void acquire(int arg) 

//与acquire(int arg)基本相同,该方法支持响应中断。
public final void acquireInterruptibly(int arg)

//独占式释放同步状态,该方法会在释放同步状态后,将同步队列中第一个节点//包含的线程唤醒
public final boolean release(int arg) 

//获取共享锁
public final void acquireShared(int arg) 

//释放共享
public final boolean releaseShared(int arg) 

重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。

·getState():获取当前同步状态。

·setState(int newState):设置当前同步状态。

·compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。

队列同步器中的队列是Node节点,一会儿可以看到Node对象的属性。

4、AQS内部实现:

节点是构成同步队列的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的基本结构如图5-1所示。在图5-1中,同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。

试想一下,当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Nodeupdate),它需要传递当前线程

“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式

与之前的尾节点建立关联。同步器将节点加入到同步队列的过程如图5-2所示。

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,该过程如图5-3所示。

在图5-3中,设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可。

内部含有两条 Queue(Sync Queue, Condition Queue), 抢占锁的Node节点线程在一个双向链表,条件等待队列在一个单向链表中,这两条 Queue 后面会详细说明;

+——+  prev +—–+       +—–+

head |      | < — > |      | < — > |      |  tail

+——+       +—–+       +—–+

支持共享和独占两种模式(共享模式时只用 Sync Queue, 独占模式有时只用 Sync Queue, 但若涉及 Condition, 则还有 Condition Queue);

5、acquire独占式同步状态获取与释放:

(1)、支持不响应中断获取独占锁:(acquire)

该方法为独占式获取同步状态,但是该方法对中断不敏感,也就是说由于线程获取同步状态失败加入到同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除;

先来看方法源码:


public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中等待的相关工作,其主要逻辑是:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的

唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

下面分析一下相关工作。首先是节点的构造以及加入同步队列:


private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

上述代码通过使用compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。试想一下:如果使用一个普通的LinkedList来维护节点之间的关系,那么当一个线程获取了同步状态,而其他多个线程由于调用tryAcquire(int arg)方法获取同步状态失败而并发地被添加到LinkedList时,LinkedList将难以保证Node的正确添加,最终的结果可能是节点的数量有偏差,而且顺序也是混乱的。

在enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。可以看出,enq(final Node node)方法将并发添加节点的请求通过CAS变得“串行化”了。

节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程):


final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在acquireQueued(final Node node,int arg)方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,这是为什么?原因有两个,如下。

第一,头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。

第二,维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态

在上图中,由于非首节点线程前驱节点出队或者被中断而从等待状态返回,随后检查自己的前驱是否是头节点,如果是则尝试获取同步状态。可以看到节点和节点之间在循环检查的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的线程由于中断而被唤醒)。

独占式同步状态获取流程,也就是acquire(int arg)方法调用流程,如下图所示:

在上图中,前驱节点为头节点且能够获取同步状态的判断条件和线程进入等待状态是获取同步状态的自旋过程。当同步状态获取成功之后,当前线程从acquire(int arg)方法返回,如果对于锁这种并发组件而言,代表着当前线程获取了锁。

当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态),如下代码:


public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node)方法使用LockSupport(在后面会介绍)来唤醒处于等待状态的线程。

分析了独占式同步状态获取和释放过程后,适当做个总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

(2)、支持响应中断获取独占锁:(acquireInterruptibly),超时获取独占锁(tryAcquireNanos);

AQS的基本执行过程就是尝试获取锁,成功则返回,如果失败就进入同步队列进行锁资源的等待。基于这个流程可以看出队列跟队列中的节点应该是两个重点

6、acquireShared共享式同步状态获取与释放:

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。写操作要求对资源的独占式访问,而读操作可以是共享式访问,两种不同的访问模式在同一时刻对文件或资源的访问情况,如图5-6所示。

在图5-6中,左半部分,共享式访问资源时,其他共享式的访问均被允许,而独占式访问被阻塞,右半部分是独占式访问资源时,同一时刻其他访问均被阻塞。

通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态,该方法代码如代码清单5-7所示。

代码清单5-7 同步器的acquireShared和doAcquireShared方法

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg)方法返回值大于等于0。可以看到,在doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。

与独占式一样,共享式获取也需要释放同步状态,通过调用releaseShared(int arg)方法可以释放同步状态,该方法代码如代码清单5-8所示。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件(比如Semaphore),它和独占式主要区别在于tryReleaseShared(int arg)方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程。

7、doAcquireNanos独占式超时获取同步状态

通过调用同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false。该方法提供了传统Java同步操作(比如synchronized关键字)所不具备的特性。

在分析该方法的实现前,先介绍一下响应中断的同步状态获取过程。在Java 5之前,当一个线程获取不到锁而被阻塞在synchronized之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在synchronized上,等待着获取锁。在Java 5中,同步器提供了acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出InterruptedException。

超时获取同步状态过程可以被视作响应中断获取同步状态过程的“增强版”,

doAcquireNanos(int arg,long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的特性。针对超时获取,主要需要计算出需要睡眠的时间间隔nanosTimeout,为了防止过早通知,nanosTimeout计算公式为:nanosTimeout-=now-lastTime,其中now为当前唤醒时间,lastTime为上

次唤醒时间,如果nanosTimeout大于0则表示超时时间未到,需要继续睡眠nanosTimeout纳秒,反之,表示已经超时,该方法代码如代码清单5-9所示。


private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

该方法在自旋过程中,当节点的前驱节点为头节点时尝试获取同步状态,如果获取成功则从该方法返回,这个过程和独占式同步获取的过程类似,但是在同步状态获取失败的处理上有所不同。

如果当前线程获取同步状态失败,则判断是否超时(nanosTimeout小于等于0表示已经超时),如果没有超时,重新计算超时间隔nanosTimeout,然后使当前线程等待nanosTimeout纳秒(当已到设置的超时时间,该线程会从LockSupport.parkNanos(Objectblocker,long nanos)方法返回)。

如果nanosTimeout小于等于spinForTimeoutThreshold(1000纳秒)时,将不会使该线程进行超时等待,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。因此,在超时非常短的场景下,同步器会进入无条件的快速自旋。

独占式超时获取同步态的流程如图5-7所示。

从图5-7中可以看出,独占式超时获取同步状态doAcquireNanos(int arg,long nanosTimeout)和独占式获取同步状态acquire(int args)在流程上非常相似,其主要区别在于未获取到同步状态时的处理逻辑。acquire(int args)在未获取到同步状态时,将会使当前线程一直处于等待状态,而doAcquireNanos(int arg,long nanosTimeout)会使当前线程等待nanosTimeout纳秒,如果当前线程在nanosTimeout纳秒内没有获取到同步状态,将会从等待逻辑中自动返回。

以上内容大多摘要自:《Java并发编程的艺术》-方腾飞

其他参考贴:

https://www.jianshu.com/p/49b86f9cd7ab


https://juejin.im/post/5d0b3b55f265da1bc23f7dca

源码分析请看下一篇:


AbstractQueuedSynchronizer详解(二)同步器 与 ReentrantLock源码详细分析



版权声明:本文为lejustdoit原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。