【Java并发编程系列】高并发工具类之线程协作工具类

  • Post author:
  • Post category:java




一、CountDownLatch


CountDownLatch

是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。


CountDownLatch

是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减

1

。当计数器到达

0

时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。

CountDownLatch原理

现在有这样一个运动会场景,总共有

5

名运动员,

1

名裁判,比赛开始前,所有运动员需等待裁判打出发令枪,然后开始跑,等所有运动员跑完后,裁判需要号召全体运动员集合

public class SportsMeeting{
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(5);

        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("运动员" + no + "准备完毕");
                        begin.await();
                        System.out.println("运动员" + no + "正在跑!");
                        Thread.sleep(new Random().nextInt(2000));
                        System.out.println("运动员" + no + "跑到终点!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        end.countDown();
                    }
                }

            };
            poolExecutor.execute(runnable);
        }

        /* 裁判准备发令枪 */
        Thread.sleep(5000);
        System.out.println("---------发令枪响,比赛开始!---------");
        begin.countDown();
        end.await();
        System.out.println("---------比赛结束,所有参赛人员请集合!---------");

        poolExecutor.shutdown();
    }
}
运动员5准备完毕
运动员4准备完毕
运动员1准备完毕
运动员2准备完毕
运动员3准备完毕
---------发令枪响,比赛开始!---------
运动员4正在跑!
运动员2正在跑!
运动员1正在跑!
运动员5正在跑!
运动员3正在跑!
运动员5跑到终点!
运动员1跑到终点!
运动员2跑到终点!
运动员4跑到终点!
运动员3跑到终点!
---------比赛结束,所有参赛人员请集合!---------

ps:计数器必须大于

0

,计数器等于0的时候,调用

await

方法时就不会阻塞当前线程。

CountDownLatch

不能重新初始化或者修改

CountDownLatch

内部计数器的值



二、CyclicBarrier


CyclicBarrier

的字面意思是可循环使用(

Cyclic

)的屏障(

Barrier

)。它要做的事情就是让一组线程到达一个

屏障

(也可以叫

同步点

)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行


CyclicBarrier

有两组构造函数

public CyclicBarrier(int parties) {
        this(parties, null);
    }
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

第二组构造函数的不同之处就是当所有线程到达屏障时,优先执行

barrierAction

线程,方便处理更复杂的业务场景

前面介绍了

CountDownLatch

,这两者的主要区别是:


  • CountDownLatch

    是一次性的,

    CyclicBarrier

    是可循环利用的

  • CountDownLatch

    参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束,主要用于事件。

    CyclicBarrier


    参主要用于线程。

现在有这样一个场景,有一组学生总共

15

个人需要乘车前往一个旅游景点,但是一辆车一次性只能乘坐

5

名同学,所以只能分批次乘车,而且每一批必须等所有人到齐了才能发车

public class Ride{
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("---------这波人都到了,乘车出发!---------");
            }
        });

        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,10,60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10));
        for (int i = 0; i < 15; i++) {
            final int no = i+1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("同学"+no+"正在前往指定地点");
                        Thread.sleep(new Random().nextInt(4000));
                        System.out.println("同学"+no+"到指定地点,正在等待其他人");
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            };
            poolExecutor.execute(runnable);
        }
        poolExecutor.shutdown();
    }
}
同学3正在前往指定地点
同学1正在前往指定地点
同学5正在前往指定地点
同学4正在前往指定地点
同学2正在前往指定地点
同学5到指定地点,正在等待其他人
同学3到指定地点,正在等待其他人
同学1到指定地点,正在等待其他人
同学4到指定地点,正在等待其他人
同学2到指定地点,正在等待其他人
---------这波人都到了,乘车出发!---------
同学6正在前往指定地点
同学7正在前往指定地点
同学10正在前往指定地点
同学9正在前往指定地点
同学8正在前往指定地点
同学8到指定地点,正在等待其他人
同学10到指定地点,正在等待其他人
同学6到指定地点,正在等待其他人
同学9到指定地点,正在等待其他人
同学7到指定地点,正在等待其他人
---------这波人都到了,乘车出发!---------
同学11正在前往指定地点
同学12正在前往指定地点
同学14正在前往指定地点
同学13正在前往指定地点
同学15正在前往指定地点
同学11到指定地点,正在等待其他人
同学13到指定地点,正在等待其他人
同学14到指定地点,正在等待其他人
同学12到指定地点,正在等待其他人
同学15到指定地点,正在等待其他人
---------这波人都到了,乘车出发!---------



三、Semaphore


Semaphore

(信号量)是用来控制同时访问特定资源的线程数量,通过协调各个线程,给各个线程发放“通行证”,以保证合理地使用公共资源


Semaphore

的常见使用场景:


Semaphore

的使用场景主要用于流量控制,比如数据库连接,同时使用的数据库连接会有数量限制,数据库连接不能超过一定的数量,当连接到达了限制数量后,后面的线程只能排队等前面的线程释放数据库连接后才能获得数据库连接。

再比如交通公路上的红绿灯,绿灯亮起时只能让

100

辆车通过,红灯亮起不允许车辆通过。

再比如停车场的场景中,一个停车场有有限数量的车位,同时能够容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入。


Semaphore原理图:


Semaphore原理图

来解释一下 Semaphore ,Semaphore 有一个初始容量,这个初始容量就是 Semaphore 所能够允许的信号量。在调用 Semaphore 中的 acquire 方法后,Semaphore 的容量 -1,相对的在调用 release 方法后,Semaphore 的容量 + 1,在这个过程中,计数器一直在监控 Semaphore 数量的变化,等到流量超过 Semaphore 的容量后,多余的流量就会放入等待队列中进行排队等待。等到 Semaphore 的容量允许后,方可重新进入。

Semaphore 所控制的流量其实就是一个个的线程,因为并发工具最主要的研究对象就是线程。

现在有这样一个场景,需要读取几万个文件的数据,并且将文件数据写入到数据库中,但是数据库的连接数只有10个,所以这时必须控制只有10个线程拿到数据库连接资源去操作数据库

public class SavaMassiveData{
    private static final int ThreadCount = 30;
    private static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 12, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    private static Semaphore semaphore = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < ThreadCount; i++) {
            poolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.out.println("save data");
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        poolExecutor.shutdown();
    }
}

其他

Semaphore

方法


  • drainPermits

    : 获取并退还所有立即可用的许可,其实相当于使用 CAS 方法把内存值置为 0

  • reducePermits

    :和 nonfairTryAcquireShared 方法类似,只不过nonfairTryAcquireShared 是使用 CAS 使内存值 + 1,而 reducePermits 是使内存值 – 1 。

  • isFair

    :对 Semaphore 许可的争夺是采用公平还是非公平的方式,对应到内部的实现就是 FairSync 和NonfairSync

  • hasQueuedThreads

    :当前是否有线程由于要获取 Semaphore 许可而进入阻塞。

  • getQueuedThreads

    :返回一个包含了等待获取许可的线程集合。

  • getQueueLength

    :获取正在排队而进入阻塞状态的线程个数



版权声明:本文为qq_43442335原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。