CountDownLatch 小记

  • Post author:
  • Post category:其他



目录


类结构分析


countDown方法详解


await方法详解



CountDownLatch


是JDK提供的一个同步工具,它可以让一个或多个线程等待,一直等到其他线程中执行完成一组操作。

类结构分析


CountDownLatch 的内部结构很简单,主要有



countdown()









await




()



两个方法,CountDownLatch


在初始化时,需要指定定一个整数作为计数器。当调用



countDown



方法时,计数器会被减1;当调用



await



方法时,如果计数器大于0时,线程会被阻塞,一直到计数器被



countDown



方法减到0时,线程才会继续执行。计数器是无法重置的,当计数器被减到0时,调用



await



方法都会直接返回。


还有一个很重要的sync的内部类:

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
           //调用父类的方法
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {//死循环,如果CAS操作失败就会不断继续尝试。
                int c = getState();//获取当前计数器的值。
                if (c == 0)// 计数器为0时,就直接返回。
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))// 使用CAS方法对计数器进行减1操作
                    return nextc == 0;//如果操作成功,返回计数器是否为0
            }
        }
    }

    private final Sync sync;

    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

//代码省略...

}

由上代码片段可知,当我们在实例化一个

CountDownLatch

时,需要传递一个int类型的参数,这个count参数由sync传递给了AbstractQueuedSynchronizer的state属性。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }
//代码省略...

}


countDown方法详解

调用AQS里的模板方法

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//对计数器进行减一操作
            doReleaseShared();//如果计数器为0,唤醒被await方法阻塞的所有线程
            return true;
        }
        return false;
}
 protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {//死循环,如果CAS操作失败就会不断继续尝试。
                int c = getState();//获取当前计数器的值。
                if (c == 0)// 计数器为0时,就直接返回。
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))// 使用CAS方法对计数器进行减1操作
                    return nextc == 0;//如果操作成功,返回计数器是否为0
            }
        }

 private void doReleaseShared() {
        //唤醒头节点的后继节点
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //头节点状态如果SIGNAL,则状态重置为0,并调用unparkSuccessor唤醒下个节点
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                //被唤醒的节点状态会重置成0,在下一次循环中被设置成PROPAGATE状态,代表状态要向后传播
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }


await方法详解

  public void await() throws InterruptedException {
        //模板方法
        sync.acquireSharedInterruptibly(1);
    }

判断计数器是否为0,如果不为0则阻塞当前线程

阻塞内容详见:

【并发编程】CountDownLatch 源码分析_街灯下的小草的博客-CSDN博客

protected int tryAcquireShared(int acquires) {
   return (getState() == 0) ? 1 : -1;
}

由此分析可知,countdownlatch的原理就是利用AQS的state标识,来判断是否执行被阻塞的(调用了await方法的线程)线程!

await():阻塞当前线程,将当前线程加入AQS的阻塞队列;

countDown():对计数器进行递减操作,当计数器递减至0时(CAS实现),当前线程会取唤醒阻塞队列中的所有线程。

功能代码demo:

比如张三、李四和王五几个人约好去饭店一起去吃饭,这几个人都是比较绅士,要等到所有人都到齐以后才让服务员上菜。这种场景就可以用到CountDownLatch

顾客:

package com.cjian.countdownlatch;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class Customer implements Runnable {

    private String name;

    private CountDownLatch latch;

    public Customer(CountDownLatch latch, String name) {
        this.latch = latch;
        this.name = name;
    }

    public void run() {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
            Random random = new Random();

            System.out.println(sdf.format(new Date()) + " " + name + "出发去饭店");
            Thread.sleep((long) (random.nextDouble() * 3000) + 1000);
            System.out.println(sdf.format(new Date()) + " " + name + "到了饭店");
            latch.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

服务员:

package com.cjian.countdownlatch;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class Waitress implements Runnable{

    private String name;
    private CountDownLatch latch;

    public Waitress(CountDownLatch latch, String name) {
        this.latch = latch;
        this.name = name;
    }

    public void run() {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
            System.out.println(sdf.format(new Date()) + " " + name  + "等待顾客");
            latch.await();
            //服务员只等待三秒
            //latch.await(3, TimeUnit.SECONDS);
            System.out.println(sdf.format(new Date()) + " " + name  + "开始上菜");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

测试:

package com.cjian.countdownlatch;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchTester {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);

        List<Thread> threads = new ArrayList<Thread>();
        threads.add(new Thread(new Customer(latch, "张三")));
        threads.add(new Thread(new Customer(latch, "李四")));
        threads.add(new Thread(new Customer(latch, "王五")));
        for (Thread thread : threads) {
            thread.start();
        }

        Thread.sleep(100);
        new Thread(new Waitress(latch, "♥小芳♥")).start();

        for (Thread thread : threads) {
            thread.join();
        }
    }

}

运行结果:

可以看到,服务员小芳在调用

await

方法时一直阻塞着,一直等到三个顾客都调用了

countDown

方法才继续执行。

如果有一个顾客迟迟没到,饭店都打样了,也不能一直等啊,应该这么办?可以使用await的另一个重载方法,latch.await(3, TimeUnit.SECONDS);

测试结果:

本文章参考:

腾讯面试居然跟我扯了半小时的CountDownLatch_万猫学社的博客-CSDN博客



版权声明:本文为cj_eryue原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。