目录
CountDownLatch
是JDK提供的一个同步工具,它可以让一个或多个线程等待,一直等到其他线程中执行完成一组操作。
类结构分析
CountDownLatch 的内部结构很简单,主要有
countdown()
和
await
()
两个方法,CountDownLatch
在初始化时,需要指定定一个整数作为计数器。当调用
countDown
方法时,计数器会被减1;当调用
await
方法时,如果计数器大于0时,线程会被阻塞,一直到计数器被
countDown
方法减到0时,线程才会继续执行。计数器是无法重置的,当计数器被减到0时,调用
await
方法都会直接返回。
还有一个很重要的sync的内部类:
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
//调用父类的方法
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {//死循环,如果CAS操作失败就会不断继续尝试。
int c = getState();//获取当前计数器的值。
if (c == 0)// 计数器为0时,就直接返回。
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))// 使用CAS方法对计数器进行减1操作
return nextc == 0;//如果操作成功,返回计数器是否为0
}
}
}
private final Sync sync;
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//代码省略...
}
由上代码片段可知,当我们在实例化一个
CountDownLatch
时,需要传递一个int类型的参数,这个count参数由sync传递给了AbstractQueuedSynchronizer的state属性。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
//代码省略...
}
countDown方法详解
调用AQS里的模板方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//对计数器进行减一操作
doReleaseShared();//如果计数器为0,唤醒被await方法阻塞的所有线程
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {//死循环,如果CAS操作失败就会不断继续尝试。
int c = getState();//获取当前计数器的值。
if (c == 0)// 计数器为0时,就直接返回。
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))// 使用CAS方法对计数器进行减1操作
return nextc == 0;//如果操作成功,返回计数器是否为0
}
}
private void doReleaseShared() {
//唤醒头节点的后继节点
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//头节点状态如果SIGNAL,则状态重置为0,并调用unparkSuccessor唤醒下个节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//被唤醒的节点状态会重置成0,在下一次循环中被设置成PROPAGATE状态,代表状态要向后传播
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
await方法详解
public void await() throws InterruptedException {
//模板方法
sync.acquireSharedInterruptibly(1);
}
判断计数器是否为0,如果不为0则阻塞当前线程
阻塞内容详见:
【并发编程】CountDownLatch 源码分析_街灯下的小草的博客-CSDN博客
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
由此分析可知,countdownlatch的原理就是利用AQS的state标识,来判断是否执行被阻塞的(调用了await方法的线程)线程!
await():阻塞当前线程,将当前线程加入AQS的阻塞队列;
countDown():对计数器进行递减操作,当计数器递减至0时(CAS实现),当前线程会取唤醒阻塞队列中的所有线程。
功能代码demo:
比如张三、李四和王五几个人约好去饭店一起去吃饭,这几个人都是比较绅士,要等到所有人都到齐以后才让服务员上菜。这种场景就可以用到CountDownLatch
顾客:
package com.cjian.countdownlatch;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Customer implements Runnable {
private String name;
private CountDownLatch latch;
public Customer(CountDownLatch latch, String name) {
this.latch = latch;
this.name = name;
}
public void run() {
try {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
Random random = new Random();
System.out.println(sdf.format(new Date()) + " " + name + "出发去饭店");
Thread.sleep((long) (random.nextDouble() * 3000) + 1000);
System.out.println(sdf.format(new Date()) + " " + name + "到了饭店");
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
服务员:
package com.cjian.countdownlatch;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
public class Waitress implements Runnable{
private String name;
private CountDownLatch latch;
public Waitress(CountDownLatch latch, String name) {
this.latch = latch;
this.name = name;
}
public void run() {
try {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
System.out.println(sdf.format(new Date()) + " " + name + "等待顾客");
latch.await();
//服务员只等待三秒
//latch.await(3, TimeUnit.SECONDS);
System.out.println(sdf.format(new Date()) + " " + name + "开始上菜");
} catch (Exception e) {
e.printStackTrace();
}
}
}
测试:
package com.cjian.countdownlatch;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTester {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
List<Thread> threads = new ArrayList<Thread>();
threads.add(new Thread(new Customer(latch, "张三")));
threads.add(new Thread(new Customer(latch, "李四")));
threads.add(new Thread(new Customer(latch, "王五")));
for (Thread thread : threads) {
thread.start();
}
Thread.sleep(100);
new Thread(new Waitress(latch, "♥小芳♥")).start();
for (Thread thread : threads) {
thread.join();
}
}
}
运行结果:
可以看到,服务员小芳在调用
await
方法时一直阻塞着,一直等到三个顾客都调用了
countDown
方法才继续执行。
如果有一个顾客迟迟没到,饭店都打样了,也不能一直等啊,应该这么办?可以使用await的另一个重载方法,latch.await(3, TimeUnit.SECONDS);
测试结果: