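Preface
The two class names are built from four words: Abstract, Ownable (can be owned), Synchronizer, and Queued (held in a queue).
AbstractOwnableSynchronizer is abbreviated as AOS.
AbstractQueuedSynchronizer is abbreviated as AQS.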
AbstractOwnableSynchronizer(AOS)
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
protected AbstractOwnableSynchronizer() { }
// the thread that currently holds exclusive access; private and transient, so it is not serialized
private transient Thread exclusiveOwnerThread;
//set
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
//get
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
Because AOS is an abstract class it cannot be instantiated directly, so we define a subclass and instantiate that:
public class AosClient extends AbstractOwnableSynchronizer {
public static void main(String[] args) {
AosClient client = new AosClient();
client.setExclusiveOwnerThread(Thread.currentThread());
Thread exclusiveOwnerThread = client.getExclusiveOwnerThread();
System.out.println(exclusiveOwnerThread);
}
}
Output
Thread[main,5,main]
AOS is simple: it holds a single field, the exclusive owner thread, which is written and read through a protected setter and getter.
AbstractQueuedSynchronizer(AQS)
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {}
AQS extends AOS and is itself abstract, so it cannot be instantiated directly either.
Instantiation
As with AOS, we define a subclass and instantiate that:
public class AqsClient extends AbstractQueuedSynchronizer {
public static void main(String[] args) {
AqsClient client = new AqsClient();
}
}
A subclass in a different package can call the public and protected methods of its parent class directly.
AQS source comments
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
AQS can be thought of as a framework for implementing blocking locks and related synchronizers. It relies on a FIFO wait queue.
Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class.
Subclasses should be defined as non-public internal helper classes that implement the synchronization properties of their enclosing class.
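The recommendation above can be sketched with a small non-reentrant mutex. This is only an illustration under our own assumptions: the names Mutex, Sync and locked() are made up for this example, and state 0/1 is our chosen encoding for unlocked/locked.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// A non-reentrant mutex. The AQS subclass is a private nested helper,
// and the enclosing class exposes only lock/unlock/isLocked.
class Mutex {
    // Assumed encoding: state 0 = unlocked, state 1 = locked.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean locked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }
    public void unlock()      { sync.release(1); }
    public boolean isLocked() { return sync.locked(); }

    public static void main(String[] args) {
        Mutex m = new Mutex();
        m.lock();
        System.out.println(m.isLocked()); // true
        m.unlock();
        System.out.println(m.isLocked()); // false
    }
}
```

Callers never see the AQS API; they only see the three methods of Mutex, which is the point of keeping Sync non-public.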
AQS subclasses
//ReentrantLock
abstract static class Sync extends AbstractQueuedSynchronizer
//ReentrantReadWriteLock
abstract static class Sync extends AbstractQueuedSynchronizer
//CountDownLatch
private static final class Sync extends AbstractQueuedSynchronizer
//Semaphore
abstract static class Sync extends AbstractQueuedSynchronizer
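These classes all use AQS by defining a static nested class. Inside Sync they override methods such as tryAcquire and tryRelease. What these methods have in common is that AQS does not implement them: the default bodies only throw UnsupportedOperationException. For example, tryRelease: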
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
For example, CountDownLatch overrides tryAcquireShared:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
It is then reached from another method, await:
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
tryAcquireSharedNanos calls tryAcquireShared(arg). If the subclass has not overridden that method, the call throws UnsupportedOperationException.
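To see this behaviour, here is a minimal sketch with a subclass that overrides nothing. BareSync and BareSyncDemo are hypothetical names used only for this illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class BareSyncDemo {
    // Hypothetical subclass that overrides none of the tryXxx hooks.
    static final class BareSync extends AbstractQueuedSynchronizer { }

    // Returns true if the public entry point fails with UnsupportedOperationException.
    static boolean throwsUnsupported() {
        BareSync sync = new BareSync();
        try {
            // Public and final in AQS; internally it calls tryAcquireShared(1).
            sync.tryAcquireSharedNanos(1, 1_000L);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsUnsupported());
    }
}
```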
Custom Sync
Let's define a Sync of our own, modeled on CountDownLatch:
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
// logger is assumed to be a static field of the enclosing class
logger.info("Sync initializing...");
setState(count);
}
int getCount() {
return getState();
}
}
Inside Sync, the access rules allow calls to both the protected and the public methods of AQS. Outside Sync, we only care about the public methods.
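The Sync above only stores the count. A fuller sketch of a latch, still modeled on CountDownLatch, might look like the following. SimpleLatch and its method names are our own; the CAS loop in tryReleaseShared is one reasonable way to count down and is not copied from the JDK source.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }

        int getCount() { return getState(); }

        // Shared acquire succeeds only once the count has reached zero.
        @Override
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        // Decrement with a CAS retry loop; report true when the count hits zero
        // so that AQS wakes the waiting threads.
        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int next = c - 1;
                if (compareAndSetState(c, next))
                    return next == 0;
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) { sync = new Sync(count); }

    void countDown()            { sync.releaseShared(1); }
    void awaitUninterruptibly() { sync.acquireShared(1); }
    int getCount()              { return sync.getCount(); }

    public static void main(String[] args) {
        SimpleLatch latch = new SimpleLatch(2);
        new Thread(latch::countDown).start();
        new Thread(latch::countDown).start();
        latch.awaitUninterruptibly();          // returns once both threads counted down
        System.out.println(latch.getCount());  // 0
    }
}
```

The enclosing class touches only public AQS methods (acquireShared, releaseShared), while the protected hooks stay inside Sync.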
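AQS public methods
To start, here is an overall look at all the public methods as they appear in the IDE's structure view.
In IntelliJ IDEA the small green lock icon marks a public member, and the white mark in the upper-left corner marks final. In other words, every public method of AQS is final.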
acquire
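Acquires in exclusive mode, ignoring interrupts. It is implemented by invoking tryAcquire at least once and returning on success. Otherwise the thread is queued, possibly blocking and unblocking repeatedly, until a call to tryAcquire succeeds. This method can be used to implement locking.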
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
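tryAcquire
AQS itself does not implement tryAcquire; the subclass supplies it. One example override, as it appears in ReentrantLock.NonfairSync in newer JDKs (e.g. JDK 17):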
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
acquireInterruptibly
acquire ignores interrupts, so this variant is provided to respond to them:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
(!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
throw new InterruptedException();
}
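The difference from acquire can be observed with a small experiment: one thread holds the synchronizer, a second thread blocks in acquireInterruptibly, and we interrupt it. This is a sketch with hypothetical names (InterruptibleDemo, Sync), using a deliberately bare 0/1 state.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class InterruptibleDemo {
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override protected boolean tryAcquire(int arg) { return compareAndSetState(0, 1); }
        @Override protected boolean tryRelease(int arg) { setState(0); return true; }
    }

    // Returns true if the blocked waiter left acquireInterruptibly via InterruptedException.
    static boolean waiterWasInterrupted() {
        Sync sync = new Sync();
        sync.acquire(1);                          // the calling thread holds the lock
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                sync.acquireInterruptibly(1);     // cannot succeed while the lock is held
                sync.release(1);
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        waiter.start();
        while (!sync.isQueued(waiter))            // wait until the waiter is in the queue
            Thread.yield();
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sync.release(1);
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(waiterWasInterrupted());
    }
}
```

With plain acquire the waiter would stay queued after the interrupt and only have its interrupt status set once it finally acquires.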
acquireShared
Similar to acquire, except that it acquires in shared mode.
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
acquire(null, arg, true, false, false, 0L);
}
Comparing the arguments: the first call below is from acquire and the second from acquireShared. They differ only in the third argument, boolean shared.
acquire(null, arg, false, false, false, 0L);
acquire(null, arg, true, false, false, 0L);
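To make shared mode concrete, here is a minimal permit counter in the spirit of Semaphore. It is a sketch under our own assumptions: the class name Permits and its methods are invented for this example, and state is taken to mean the number of permits left.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class Permits {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }

        // A non-negative result means success (and tells AQS how many permits remain);
        // a negative result means the caller must queue.
        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases))
                    return true;
            }
        }

        int available() { return getState(); }
    }

    private final Sync sync;

    Permits(int n) { sync = new Sync(n); }

    void acquire()       { sync.acquireShared(1); }
    boolean tryAcquire() { return sync.tryAcquireShared(1) >= 0; }
    void release()       { sync.releaseShared(1); }
    int available()      { return sync.available(); }

    public static void main(String[] args) {
        Permits p = new Permits(2);
        p.acquire();
        p.acquire();                          // two holders at once: shared mode
        System.out.println(p.tryAcquire());   // false, no permits left
        p.release();
        System.out.println(p.available());    // 1
    }
}
```

Unlike an exclusive lock, several acquirers can hold the synchronizer at the same time, as long as tryAcquireShared keeps returning a non-negative value.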
acquireSharedInterruptibly
Similar to acquireInterruptibly above. The difference is that that one works in exclusive mode and this one works in shared mode.
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}
getWaitingThreads
Returns a collection of the threads waiting on the given condition.
public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
// check that the condition belongs to this synchronizer
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitingThreads();
}
The implementation delegates to condition.getWaitingThreads().
getWaitQueueLength
Returns the length of the given condition's wait queue.
public final int getWaitQueueLength(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitQueueLength();
}
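The owns check in these condition queries can be tried out directly. The sketch below uses a hypothetical Sync with a newCondition() helper of our own; it queries a condition through its owner and then through a different synchronizer.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class ConditionOwnerDemo {
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        @Override protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        // Required by ConditionObject, which checks that the caller holds the lock.
        @Override protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
        // ConditionObject is an inner class, so each instance is tied to its Sync.
        ConditionObject newCondition() { return new ConditionObject(); }
    }

    static String describe() {
        Sync a = new Sync();
        Sync b = new Sync();
        AbstractQueuedSynchronizer.ConditionObject condA = a.newCondition();
        StringBuilder out = new StringBuilder();
        a.acquire(1);                                    // queries require holding the lock
        try {
            out.append(a.getWaitQueueLength(condA)).append(',');
            out.append(a.hasWaiters(condA)).append(',');
        } finally {
            a.release(1);
        }
        try {
            b.getWaitQueueLength(condA);                 // condA was not created by b
        } catch (IllegalArgumentException e) {
            out.append(e.getMessage());
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // 0,false,Not owner
    }
}
```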
hasContended
Reports whether head is non-null, that is, whether any thread has ever contended to acquire this synchronizer.
public final boolean hasContended() {
return head != null;
}
hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
Thread first = null; Node h, s;
if ((h = head) != null && ((s = h.next) == null ||
(first = s.waiter) == null ||
s.prev == null))
first = getFirstQueuedThread(); // retry via getFirstQueuedThread
return first != null && first != Thread.currentThread();
}
hasQueuedThreads
public final boolean hasQueuedThreads() {
for (Node p = tail, h = head; p != h && p != null; p = p.prev)
if (p.status >= 0)
return true;
return false;
}
hasWaiters
public final boolean hasWaiters(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.hasWaiters();
}
isQueued
Checks whether the given thread is currently in the queue.
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
if (p.waiter == thread)
return true;
return false;
}
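The queue inspection methods (hasContended, hasQueuedThreads, isQueued) are easiest to understand by watching them change. The following sketch, with hypothetical names, samples them before contention, while a second thread is queued, and after it has acquired.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class QueueInspectionDemo {
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override protected boolean tryAcquire(int arg) { return compareAndSetState(0, 1); }
        @Override protected boolean tryRelease(int arg) { setState(0); return true; }
    }

    static String inspect() {
        Sync sync = new Sync();
        sync.acquire(1);                                   // uncontended: no queue is created
        boolean contendedBefore = sync.hasContended();

        Thread waiter = new Thread(() -> {
            sync.acquire(1);                               // blocks until the holder releases
            sync.release(1);
        });
        waiter.start();
        while (!sync.isQueued(waiter))                     // wait until the waiter is enqueued
            Thread.yield();
        boolean queuedThreads = sync.hasQueuedThreads();
        boolean contendedAfter = sync.hasContended();

        sync.release(1);                                   // wakes the waiter
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean stillQueued = sync.isQueued(waiter);
        return contendedBefore + "," + queuedThreads + "," + contendedAfter + "," + stillQueued;
    }

    public static void main(String[] args) {
        System.out.println(inspect()); // false,true,true,false
    }
}
```

Note that hasContended stays true once a queue has been created, even after every waiter has left.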
owns
public final boolean owns(ConditionObject condition) {
return condition.isOwnedBy(this);
}
release
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
signalNext(head);
return true;
}
return false;
}
tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (!Thread.interrupted()) {
if (tryAcquire(arg))
return true;
if (nanosTimeout <= 0L)
return false;
int stat = acquire(null, arg, false, true, true,
System.nanoTime() + nanosTimeout);
if (stat > 0)
return true;
if (stat == 0)
return false;
}
throw new InterruptedException();
}
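A quick way to see both return values of tryAcquireNanos is to call it twice from the same thread on a non-reentrant synchronizer. The names below are hypothetical and the 50 ms timeout is an arbitrary choice for the illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class TimedAcquireDemo {
    // Non-reentrant: a second acquire by the same thread cannot succeed.
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override protected boolean tryAcquire(int arg) { return compareAndSetState(0, 1); }
        @Override protected boolean tryRelease(int arg) { setState(0); return true; }
    }

    static boolean secondAttemptTimesOut() {
        Sync sync = new Sync();
        long timeout = TimeUnit.MILLISECONDS.toNanos(50);
        try {
            boolean first = sync.tryAcquireNanos(1, timeout);   // succeeds immediately
            boolean second = sync.tryAcquireNanos(1, timeout);  // queues, then gives up after the timeout
            sync.release(1);
            return first && !second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondAttemptTimesOut());
    }
}
```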
tryAcquireSharedNanos
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (!Thread.interrupted()) {
if (tryAcquireShared(arg) >= 0)
return true;
if (nanosTimeout <= 0L)
return false;
int stat = acquire(null, arg, true, true, true,
System.nanoTime() + nanosTimeout);
if (stat > 0)
return true;
if (stat == 0)
return false;
}
throw new InterruptedException();
}
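AQS protected methods
These fall into final and non-final methods. The five non-final ones (tryAcquire, tryAcquireShared, tryRelease, tryReleaseShared and isHeldExclusively) are the ones a subclass may override.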
final
compareAndSetState
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSetInt(this, STATE, expect, update);
}
getState
protected final int getState() {
return state;
}
setState
protected final void setState(int newState) {
state = newState;
}
Non-final (overridable)
tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquireShared
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
tryRelease
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
tryReleaseShared
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
isHeldExclusively
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
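As an illustration of how the final accessors (getState, setState, compareAndSetState) and the overridable hooks work together, here is a sketch of a reentrant exclusive lock. ReentrantMutex is a hypothetical name, and using state as the hold count is our assumption for this example; it follows the same general idea as ReentrantLock but is not its source.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class ReentrantMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                setState(c + acquires);   // only the owner gets here, so a plain write is enough
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            int c = getState() - releases;
            boolean free = (c == 0);
            if (free)
                setExclusiveOwnerThread(null);
            setState(c);
            return free;                  // true only when fully released
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        int holds() { return isHeldExclusively() ? getState() : 0; }
    }

    private final Sync sync = new Sync();

    void lock()     { sync.acquire(1); }
    void unlock()   { sync.release(1); }
    int holdCount() { return sync.holds(); }

    public static void main(String[] args) {
        ReentrantMutex m = new ReentrantMutex();
        m.lock();
        m.lock();
        System.out.println(m.holdCount()); // 2
        m.unlock();
        m.unlock();
        System.out.println(m.holdCount()); // 0
    }
}
```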
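AQS default (package-private) methods
A circle icon marks default access, also called package-private access. As the icons show, every method here is either final or static: usable, but not overridable. Note that the listings in this section use the waitStatus and thread fields of the older implementation (e.g. JDK 8), whereas the acquire(null, arg, ...) based listings above come from a newer JDK.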
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
apparentlyFirstQueuedIsExclusive
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
fullyRelease
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
selfInterrupt
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
transferForSignal
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
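AQS private methods
addWaiter below is from the older implementation (e.g. JDK 8). The remaining listings are from the newer one, which uses the waiter and status fields.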
addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
cancelAcquire
private int cancelAcquire(Node node, boolean interrupted,
boolean interruptible) {
if (node != null) {
node.waiter = null;
node.status = CANCELLED;
if (node.prev != null)
cleanQueue();
}
if (interrupted) {
if (interruptible)
return CANCELLED;
else
Thread.currentThread().interrupt();
}
return 0;
}
casTail
private boolean casTail(Node c, Node v) {
return U.compareAndSetReference(this, TAIL, c, v);
}
cleanQueue
private void cleanQueue() {
for (;;) { // restart point
for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
if (q == null || (p = q.prev) == null)
return; // end of list
if (s == null ? tail != q : (s.prev != q || s.status < 0))
break; // inconsistent
if (q.status < 0) { // cancelled
if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
q.prev == p) {
p.casNext(q, s); // OK if fails
if (p.prev == null)
signalNext(p);
}
break;
}
if ((n = p.next) != q) { // help finish
if (n != null && q.prev == p) {
p.casNext(n, q);
if (p.prev == null)
signalNext(p);
}
break;
}
s = q;
q = q.prev;
}
}
}
signalNext
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);
LockSupport.unpark(s.waiter);
}
}
signalNextIfShared
private static void signalNextIfShared(Node h) {
Node s;
if (h != null && (s = h.next) != null &&
(s instanceof SharedNode) && s.status != 0) {
s.getAndUnsetStatus(WAITING);
LockSupport.unpark(s.waiter);
}
}
tryInitializeHead
private void tryInitializeHead() {
Node h = new ExclusiveNode();
if (U.compareAndSetReference(this, HEAD, null, h))
tail = h;
}
AQS inner classes
Node
Wait queue node class.
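The wait queue is a variant of a “CLH” (Craig, Landin, and Hagersten) lock queue.
The listing below is the Node class of the older implementation (e.g. JDK 8). Newer JDKs replace it with an abstract Node that has waiter and status fields, plus the subclasses ExclusiveNode, SharedNode and ConditionNode used in the listings above.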
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
// previous node
volatile Node prev;
// next node
volatile Node next;
// the thread that enqueued this node
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
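The core of the queue is a doubly linked list whose tail is swapped with CAS. The following is a much simplified sketch of that enqueue step only. It is not the JDK code: MiniWaitQueue is a hypothetical class, it uses AtomicReference instead of Unsafe, and it leaves out status, cancellation and parking entirely.

```java
import java.util.concurrent.atomic.AtomicReference;

class MiniWaitQueue {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    // The queue starts with a dummy head node, which is also the initial tail.
    private final AtomicReference<Node> tail = new AtomicReference<>(new Node("head"));

    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            node.prev = t;                      // link backwards first
            if (tail.compareAndSet(t, node)) {  // publish as the new tail
                t.next = node;                  // forward link is set afterwards
                return;
            }
            // another thread won the race: retry with the new tail
        }
    }

    // Walks prev links from the tail, which are always complete once a node is published.
    String fromTail() {
        StringBuilder sb = new StringBuilder();
        for (Node p = tail.get(); p != null; p = p.prev) {
            if (sb.length() > 0)
                sb.append(',');
            sb.append(p.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        MiniWaitQueue q = new MiniWaitQueue();
        q.enqueue(new Node("a"));
        q.enqueue(new Node("b"));
        System.out.println(q.fromTail()); // b,a,head
    }
}
```

Because next is written only after the CAS, a reader may briefly see a null next for a node that already has a successor. That is why many of the AQS listings in this article traverse from tail through prev.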
ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {}
Condition
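Used to make threads wait and to wake them up.
public interface Condition {
// wait until signalled; interruptible
void await() throws InterruptedException;
// wait until signalled or the given time elapses; interruptible
boolean await(long time, TimeUnit unit) throws InterruptedException;
// wait until signalled or the specified waiting time elapses; returns an estimate of the time remaining
long awaitNanos(long nanosTimeout) throws InterruptedException;
// wait until signalled; does not respond to interrupts
void awaitUninterruptibly();
// wait until signalled or the deadline passes
boolean awaitUntil(Date deadline) throws InterruptedException;
// wake up one waiting thread (ConditionObject picks the longest-waiting one, not a random one)
void signal();
// wake up all waiting threads
void signalAll();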
}
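A typical use of this interface looks like the sketch below. It uses ReentrantLock, whose newCondition() returns an AQS ConditionObject. The class ConditionDemo and its fields are hypothetical names for this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag;                       // guarded by lock

    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            while (!flag)                       // always re-check the predicate in a loop
                ready.await();                  // releases the lock while waiting
        } finally {
            lock.unlock();
        }
    }

    void markReady() {
        lock.lock();
        try {
            flag = true;
            ready.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if the waiting thread finished after being signalled.
    static boolean run() {
        ConditionDemo demo = new ConditionDemo();
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        demo.markReady();
        try {
            waiter.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The example works whether markReady runs before or after the waiter starts waiting, because the waiter checks flag under the lock before calling await.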
ConditionObject constructor
public ConditionObject() { }
ConditionObject fields
private static final long serialVersionUID = 1173984872572414699L;
// first node of the condition queue
private transient ConditionNode firstWaiter;
// last node of the condition queue
private transient ConditionNode lastWaiter;
ConditionObject properties
// returns the length of the condition wait queue
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
if ((w.status & COND) != 0)
++n;
}
return n;
}
// returns the threads waiting on this condition
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<>();
for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
if ((w.status & COND) != 0) {
Thread t = w.waiter;
if (t != null)
list.add(t);
}
}
return list;
}
ConditionObject methods
ConditionObject.hasWaiters
Checks whether any threads are waiting on this condition.
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
if ((w.status & COND) != 0)
return true;
}
return false;
}
ConditionObject.isOwnedBy
Returns true if this condition was created by the given synchronization object.
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
ConditionObject.canReacquire
Returns true if a node that was initially placed on a condition queue is now ready to reacquire on the sync queue.
private boolean canReacquire(ConditionNode node) {
return node != null && node.prev != null && isEnqueued(node);
}
ConditionObject.doSignal
Removes one or all waiters and transfers them to the sync queue.
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
enqueue(first);
if (!all)
break;
}
first = next;
}
}
ConditionObject.enableWait
Adds the node to the condition list and releases the lock.
private int enableWait(ConditionNode node) {
if (isHeldExclusively()) {
node.waiter = Thread.currentThread();
node.setStatusRelaxed(COND | WAITING);
ConditionNode last = lastWaiter;
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node;
int savedState = getState();
if (release(savedState))
return savedState;
}
node.status = CANCELLED; // lock not held or inconsistent
throw new IllegalMonitorStateException();
}
ConditionObject.unlinkCancelledWaiters
Unlinks the given node and other non-waiting nodes from the condition queue, unless they are already unlinked.
private void unlinkCancelledWaiters(ConditionNode node) {
if (node == null || node.nextWaiter != null || node == lastWaiter) {
ConditionNode w = firstWaiter, trail = null;
while (w != null) {
ConditionNode next = w.nextWaiter;
if ((w.status & COND) == 0) {
w.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
} else
trail = w;
w = next;
}
}
}
AQS fields
static final int WAITING = 1; // must be 1
static final int CANCELLED = 0x80000000; // must be negative
static final int COND = 2; // in a condition wait
// head and tail of the wait queue
private transient volatile Node head;
private transient volatile Node tail;
// Unsafe
private static final Unsafe U = Unsafe.getUnsafe();
private static final long STATE
= U.objectFieldOffset(AbstractQueuedSynchronizer.class, "state");
private static final long HEAD
= U.objectFieldOffset(AbstractQueuedSynchronizer.class, "head");
private static final long TAIL
= U.objectFieldOffset(AbstractQueuedSynchronizer.class, "tail");
// serialization id
private static final long serialVersionUID = 7373984972572414691L;
AQS properties
state
private volatile int state;
protected final void setState(int newState) {
state = newState;
}
protected final int getState() {
return state;
}
isHeldExclusively
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
getSharedQueuedThreads
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
if (p instanceof SharedNode) {
Thread t = p.waiter;
if (t != null)
list.add(t);
}
}
return list;
}
getQueueLength
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.waiter != null)
++n;
}
return n;
}
getQueuedThreads
Returns a collection of the threads in the queue.
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.waiter;
if (t != null)
list.add(t);
}
return list;
}
getFirstQueuedThread
Returns the first thread in the queue.
public final Thread getFirstQueuedThread() {
Thread first = null, w; Node h, s;
if ((h = head) != null && ((s = h.next) == null ||
(first = s.waiter) == null ||
s.prev == null)) {
// the fast path saw an inconsistent snapshot, so traverse from tail instead
for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
if ((w = p.waiter) != null)
first = w;
}
return first;
}
getExclusiveQueuedThreads
Returns the threads queued in exclusive mode.
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
// only nodes that are not shared-mode nodes
if (!(p instanceof SharedNode)) {
Thread t = p.waiter;
if (t != null)
list.add(t);
}
}
return list;
}
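AQS subclasses in J.U.C
The main classes in the java.util.concurrent package that build on AQS include CountDownLatch, ThreadPoolExecutor (through its inner Worker class), ReentrantReadWriteLock and ReentrantLock.
How AQS uses LockSupport
The fragments below are from the older implementation (e.g. JDK 8). In the newer listings above, the same wake-up is done by signalNext.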
LockSupport.unpark
private void unparkSuccessor(Node node) {
// ... s is the successor of node that should be woken
LockSupport.unpark(s.thread);
}
final boolean transferForSignal(Node node) {
// ...
LockSupport.unpark(node.thread);
// ...
}
Methods related to LockSupport.parkXX and unpark
private void unparkSuccessor(Node node) // unpark
private final boolean parkAndCheckInterrupt() // park
private boolean doAcquireNanos(int arg, long nanosTimeout) // parkNanos
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) // parkNanos
final boolean transferForSignal(Node node) // unpark
public final void awaitUninterruptibly() // park
public final void await() // park
public final long awaitNanos(long nanosTimeout) // parkNanos
public final boolean awaitUntil(Date deadline) // parkUntil
public final boolean await(long time, TimeUnit unit) // parkNanos
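For readers who have not used LockSupport directly, here is a minimal sketch of the park/unpark handshake that these methods build on. ParkDemo and its flags are hypothetical names for this illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    // Returns true if the worker was released and ran to completion.
    static boolean run() {
        AtomicBoolean go = new AtomicBoolean(false);
        AtomicBoolean done = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            while (!go.get())          // park may return spuriously, so re-check the condition
                LockSupport.park();
            done.set(true);
        });
        worker.start();
        go.set(true);                  // publish the condition first
        LockSupport.unpark(worker);    // then hand the worker a permit
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

unpark grants a permit even if the worker has not parked yet, so the order of the two threads does not matter here. AQS relies on the same property when it wakes a successor.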
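How AQS uses Unsafe
Searching the older source (e.g. JDK 8) for "unsafe." finds 11 usages. Newer JDKs go through the field U with compareAndSetInt and compareAndSetReference instead, as in the listings above.
1 protected method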
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
1 statement that obtains the Unsafe instance
private static final Unsafe unsafe = Unsafe.getUnsafe();
5 calls to objectFieldOffset to obtain field offsets
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
4 private CAS methods
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
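Application code normally cannot call Unsafe.getUnsafe(), so the listings above cannot be copied as they are. As a substitute technique, and not what AQS itself does, the same field-level CAS can be sketched with VarHandle (Java 9 and later). StateCell is a hypothetical class for this example.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

class StateCell {
    private volatile int state;

    // Plays the role of the stateOffset/STATE constant in the AQS listings.
    private static final VarHandle STATE;
    static {
        try {
            STATE = MethodHandles.lookup()
                    .findVarHandle(StateCell.class, "state", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    int getState() {
        return state;
    }

    // Atomically sets state to update if it currently equals expect.
    boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }

    public static void main(String[] args) {
        StateCell cell = new StateCell();
        System.out.println(cell.compareAndSetState(0, 1)); // true
        System.out.println(cell.compareAndSetState(0, 2)); // false, state is already 1
        System.out.println(cell.getState());               // 1
    }
}
```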