Java同步器框架-AQS原理&源码解析

  • Post author:
  • Post category:java


一、自己实现一个简单的同步器

Java提供了

wait()

,

notify()

,

notifyAll()

方法来支持线程之间的通信。这3个方法是Object类中定义的,因为在java中,Object是所有类的基类,所以所有的类都有这3个方法。

我们可以某个类上执行wait方法让执行线程进入休眠状态,然后另外一个线程在这个对象上执行notify或者notifyAll() 来唤醒正在等待的线程。

下面我们通过

wait()



notify()

方法实现一个简单的线程同步器

public class DemoTest {
    private static void demo() {
        //定义个空对象,后面我们将在这个对象上面执行wait和notify操作
        final Object lock = new Object();
        Thread workerA = new Thread(new Runnable() {
            public void run() {
                //先获取到lock 对象的锁
                synchronized (lock) {
                    System.out.println("A first");
                    try {
                        //在lock对象上执行wait()方法,让其进入休眠,等待有人唤醒自己
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    for (int i = 0; i < 3; i++) {
                        System.out.println("A" + i);
                    }
                }
            }
        });
        Thread workerB = new Thread(new Runnable() {
            public void run() {
                //获取lock的锁
                synchronized (lock) {
                    for (int i = 0; i < 3; i++) {
                        System.out.println("B" + i);
                    }
                    //唤醒正在lock对象上等待的线程
                    lock.notify();
                }
            }
        });
        workerA.start();
        workerB.start();
    }

    public static void main(String[] args) {
        demo();
    }
}

在上面的程序中,我们让线程A先获取到lock锁,然后进入同步块,打印出

A first

后执行wait()进入休眠等待有人唤醒它。

值得一提的是,执行

lock.wait()

方法会释放手中持有的lock对象上面的锁。另外,执行wait()方法的前提也是必须先获得lock对象的锁,不然调用wait()方法会抛出异常。

然后B线程开始执行,由于A线程已经释放lock锁并进入休眠。B线程获得线程然后获取到lock锁。执行打印的语句,然后调用

lock.notify()

唤醒正在lock上等待的线程,也就是线程A。然后线程B释放锁退出同步块,线程A被唤醒后继续执行工作。


细心的读者可能会发现,上面的程序会有一个问题。就是假如线程B先执行,那么线程B先调用了lock.notify()方法,之后线程A再进入同步块执行lock.wait()方法,线程A可能就永远等待下去,因为它永远都无法收到唤醒的信号了。

另外,上面的例子只是2个线程的通信。如果涉及到多个线程之间的通信,同步器的实现会更加复杂。需要考虑到线程之间的竞争,同时还要兼顾性能、稳定性。因此,设计一个多个线程的同步器对开发者而言难度较大。不过,Java内置了AQS同步器框架解决了这个问题,我们只需要继承AQS类,然后根据自己的业务场景实现几个关键方法,就可以得到一个高效稳定的同步器了。

二、AQS的实现原理

AQS说是一个同步器框架,但它其实就是一个Java类:

AbstractQueuedSynchronizer

,它是抽象类。我们继承它并实现几个方法,就可以得到一个功能完好的同步器。在java的juc包中,许多同步器都是基于AQS来实现的。如

ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore

等类都是使用AQS来实现同步操作的。另外,java线程池的Worker实现中,也用到了AQS。

AQS的原理不是很难,它维护了一个状态state以及一个CLH队列。在不同的业务场景下,state也可以理解为资源,CLH队列是一个双端队列,每个节点存放了正在等待获取资源的线程。假设现在有线程通过调用

acquire()

方法尝试获取资源(state),如果资源不够,AQS就会将这个线程封装成一个Node插入到CLH队列的尾部,然后这个线程进入休眠,等待被唤醒。如果一个又一个线程执行

acquire()

并且资源不足,在CLH队列排队的线程就会越来越多。

之后如果有其他线程执行

release()

释放一部分资源。AQS在释放完资源就会唤醒CLH队列头部的那个线程,所以CLH队列是一个先到先服务的队列。队列中的线程被唤醒后,就会检查资源是否满足自己的要求,如果够了,就立刻返回,不然的话继续阻塞。

另外,AQS还提供了独占模式和共享模式。在共享模式下,执行

releaseShare()

的时候,唤醒CLH队列头部的线程,队列头部的线程消耗一定资源后,发现资源还有剩余(state>0),就会继续唤醒它的下一个等待的线程,也就是它的后驱节点。而独占模式就不会。

说了这么多,可能光看文字描述会比较难理解。下面我们来看一下源码就明白了

三、AQS源码解析

看AQS源码我们一般关注4个方法,

acquire(int arg)



acquireShared(int arg)



release(int arg)



releaseShared(int arg)

。方法后缀带share的说明是共享模式,不然就是独占模式。

CLH队列解析

在看这些方法的源码之前,先了解一下CLH的结构以及部分源码。

CLH队列是一个FIFO队列,队列的每个节点都封装了线程,以及前驱节点和后驱节点,另外节点还维护了一个等待状态waitStatus,用于描述节点当前的状态。

static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        //取消状态
        static final int CANCELLED =  1;
        //等待触发状态
        static final int SIGNAL    = -1;
        //等待条件状态
        static final int CONDITION = -2;
        //需要向后传播状态,在共享模式下使用
        static final int PROPAGATE = -3;
        //当前节点状态
        volatile int waitStatus;
        //前驱节点
        volatile Node prev;
        //后驱节点
        volatile Node next;
        //节点持有的线程
        volatile Thread thread;
        Node nextWaiter;
        ...
}

节点大概会构成下图的双向链表。黄色表示头部节点,一般该节点是一个空节点,或者可以理解为正在持有资源的线程所属的节点,然后下一个节点就是等待中的节点。

CLH队列

节点状态一般分为4种:

1. waitStatus = (CANCELLED = 1) ——表示这个节点因为某些原因被取消掉了

2. waitStatus = (SIGNAL= -1) ——表示这个节点正在等待触发状态,只有前驱节点是这个状态的时候,当前节点才会进入休眠状态。

3. waitStatus = (CONDITION = -2) ——表示这个节点等待某个条件

4. waitStatus = (PROPAGATE= -1) ——表示这个节点被唤醒后需要向后继续传播,唤醒下一个节点

因为多线程竞争的原因,CLH队列插入节点和设置节点都采用CAS+自旋的方式来完成。

独占模式源码解析

独占模式主要使用

acquire()



release()

方法。我们先来看一下

acquire()

获取资源的方法的源码。

//tryAcquire(arg)是一个抽象方法,给用户实现具体的逻辑的
//如果tryAcquire(arg)返回false,说明获取资源失败,就尝试将当前线程加入到CLH队列中,然后当前线程进入阻塞
public final void acquire(int arg) {
        //Node.EXCLUSIVE表示这是独占模式
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}
private Node addWaiter(Node mode) {
        //将当前执行线程封装成节点
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        //用CAS操作尝试将生成的node插入到CLH队尾
        if (pred != null) {
            node.prev = pred;
            //如果设置成功了,就直接返回生成的这个Node
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果前面用CAS插入失败了,就用自旋重试的方式将Node插入到队尾
        enq(node);
        return node;
}
private Node enq(final Node node) {
        //用自旋+cas的方式设置node到CLH队列尾部
        for (;;) {
            Node t = tail;
            //如果队列尾部是null,说明整个队列尚未初始化,就要初始化一下,设置队列头部
            //设置完后再自旋一次尝试添加node到队列尾部
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
}
//让前面生成的Node进入休眠
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //保存中断标志位
            boolean interrupted = false;
            //进入自旋,
            for (;;) {
                //获取当前node节点的前驱节点
                final Node p = node.predecessor();
                //如果前驱节点是头部节点,就可以再次尝试获取资源
                //注意,这里是一个自旋的循环体,所以插入的线程在休眠一段时间后醒来就可以去检查自己是不是第二个节点
                //如果已经排到第二个了就可以尝试获取资源了
                if (p == head && tryAcquire(arg)) {
                    //如果获取资源成功了,就将node节点设置为头部节点
                    setHead(node);
                    //为了帮助GC
                    p.next = null; 
                    failed = false;
                    //返回中断表示
                    return interrupted;
                }
                //如果是排在第二个以后,就没必要获取资源了,因为轮不到你
                //shouldParkAfterFailedAcquire(p, node)判断当前线程是否需要进入休眠
                //parkAndCheckInterrupt()方法是让线程进入休眠并且检查中断的发生
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //如果failed标志位是true,也就是因为某些原因失败了
            if (failed)
                //将node节点设置为Node.CANCELLED,并将其移除CLH队列
                cancelAcquire(node);
        }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果前驱节点的状态是Node.SIGNAL,就说明node节点需要进入休眠
        if (ws == Node.SIGNAL)
            return true;
         //如果前驱节点的状态大于0,也就是Node.CANCELLED状态,那说明前驱节点已经被取消了
        if (ws > 0) {
            //将取消的前驱节点从CLH队列中移除
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //不然就设置前驱的状态为Node.SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
}


acquire()

的逻辑就是先执行用户实现的

tryAcquire(arg)

方法,如果返回false,就当前线程封装成一个CLH的节点然后插入到CLH队列中去,并让当前线程进入休眠。

我们再来看一下

release()

的源码

public final boolean release(int arg) {
        //tryRelease(arg)方法也是一个抽象方法,交给用户来实现
        if (tryRelease(arg)) {
            //如果tryRelease(arg)返回true,就获取CLH的头部节点,然后唤醒它的后驱节点
            Node h = head;
            //要先判断h不是null,并且状态被设置过
            //如果这两个条件不成立,说明没有后驱节点正在等待中,就没必要唤醒后驱节点了
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}
private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        //获取当前节点的状态,如果小于0,说明节点不是取消状态,那就把节点状态设置为0,也就是无状态
        //就尝试设置节点的状态为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //获取后驱节点
        Node s = node.next;
        //如果后驱节点的状态大于0,说明后驱节点被取消了,那就取找其他的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从CLH队列尾部开始找,找到最后一个,也就是排在最前面的没有被取消的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //唤醒那个没有被取消的后驱节点
        if (s != null)
            LockSupport.unpark(s.thread);
}


release()

的方法其实很简单,先执行

tryRelease(arg)

,让用户自己实现释放资源的逻辑。之后如果这里返回true,表示有资源可以用,它就会从CLH队列中获取头部的下一个没被取消的后驱节点,然后唤醒它。

所以,

acquire()



release()

其实就是获取资源和释放资源的方法。当某个线程获取资源的时候,如果资源不够,就把这个线程放入CLH队列中去等待。然后其他线程是否资源,有资源空出来了,就唤醒CLH队列中排在最前面的那个线程,告诉它有资源了。

共享模式源码解析

共享模式主要是

acquireShared(int arg)



releaseShared(int arg)



我们先看一下

acquireShared(int arg)

的源码

public final void acquireShared(int arg) {
        //tryAcquireShared(arg)是一个抽象方法,留给用户自己实现
        //和独占模式不一样的是,tryAcquireShared(arg)返回的是一个int值
        //如果tryAcquireShared(arg)返回的小于0,表示当前已经没有资源可以用了
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
        //往CLH队列尾部添加一个节点,节点的模式为SHARED
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            //保存中断标志
            boolean interrupted = false;
            //开始自旋
            for (;;) {
                //先获取前驱节点
                final Node p = node.predecessor();
                //如果前驱节点是头部节点,说明此时自己排在第一位了。因为头部节点是持有资源的那个线程的节点
                if (p == head) {
                    //再尝试获取一下资源
                    int r = tryAcquireShared(arg);
                    //如果获取到资源了
                    if (r >= 0) {
                        //将自己设置为CLH的头部,并且向后传播唤醒自己的下一个节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        //如果发生中断了,就要恢复中断
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //和独占模式一样,先判断是否需要挂起线程,需要的话就挂起线程,不需要的话就继续自旋下去
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}
private void setHeadAndPropagate(Node node, int propagate) {
        //先将当前节点设置为队列头部节点
        Node h = head; 
        setHead(node);
         //判断是否需要向下传播
        if (propagate > 0 || h == null || h.waitStatus < 0) {
            //获得下一个节点
            Node s = node.next;
            //如果下一个节点是一个共享节点,就唤醒执行doReleaseShared()方法。也就是尝试唤醒下一个节点(如果有足够资源的话)
            if (s == null || s.isShared())
                //这个方法下面再解析
                doReleaseShared();
        }
}

从上面的源码我们可以看到,共享模式获取资源的源码和独占模式差不多。就是有两点不一样。一、独占模式的tryAcquire(int arg)方法只要返回true或者false就好了。共享模式会返回一个int,来表示资源的数量。这也是为了共享模式下传播行为。二、一个节点被唤醒后,独占模式是单纯的设置自己为CLH队列头部。而在共享模式下,除了设置队列头部以外,如果资源还有剩余,还会继续尝试唤醒下一个节点。

再来看一下

release()

方法的源码

    public final boolean releaseShared(int arg) {
        //tryReleaseShared(arg)是一个抽象方法,交给用户实现
        //具体的释放资源的逻辑也是在这个方法中实现
        //这里如果返回true,表示还有资源剩余
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    private void doReleaseShared() {
        for (;;) {
            //先获取队列的头部
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //如果当前节点状态是SIGNAL,说明有后驱节点正在等待,就去唤醒后驱节点
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                //如果头部节点的状态是0,那用CAS将其设置为传播的状态
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;            
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

在释放资源方面,共享模式的逻辑和独占模式差不多。都是简单唤醒头部节点的下一个节点,也就是CLH队列的第二个节点。

看完共享模式的整个代码,我们发现其实它和独占模式的唯一区别,就是当一个等待中的节点被唤醒时,独占模式的节点比较自私,不会去唤醒下一个排队节点。共享模式下的节点如果被唤醒了,就会继续通知自己的下一个排队节点,让它自己去检查资源是否够了。

四、总结

AQS的原理其实很简单,但是我们自己要去实现也有一定的难度,因为要考虑的情况比较多:并发、中断、性能等等。Doug Lea大神在AQS中把CAS玩的出神入化,这个类也是学习并发的很好的例子。

另外,单单看这个类而且用到实际用途中可能还是比较难以理解,读者可以结合

ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore

等类的实现原理来看AQS,就很容易理解了。

下面附上我写的一篇CountDownLatch、Semaphore、ReentrantLock 的文章


CountDownLatch & Semaphore 实现原理详解



ReentrantLock 实现原理详解

最后,如果哪里有写的不对的地方,烦请指出,感激不尽!



版权声明:本文为u013332124原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。