PriorityBlockingQueue和DelayQueue队列剖析

  • Post author:
  • Post category:其他




阻塞队列BlockingQueue数组和链表的实现

中剖析了基于数组和链表的实现的阻塞队列,接下来来剖析一下与优先级相关的阻塞队列PriorityBlockingQueue和DelayQueue。在介绍之前需要了解优先级队列相关的知识,以下是优先级队列的相关知识链接:


源码阅读(12):Java中主要的Queue、Deque结构——PriorityQueue集合(上)



源码阅读(13):Java中主要的Queue、Deque结构——PriorityQueue集合(中)


当然优先级队列在JAVA中实现有PriorityQueue,该优先级队列是基于数据实现的。那么接下来就基于已经了解优先级队列的实现源码来剖析PriorityBlockingQueue和DelayQueue。



PriorityBlockingQueue优先级阻塞队列

在了解基于数组实现的优先级队列的原理后,再看PriorityBlockingQueue就相对简单,PriorityBlockingQueue相对于PriorityQueue只是增加了锁及条件队列。PriorityBlockingQueue是一个无界的阻塞队列,其可以无限扩大(不是绝对。由于其是基于数组实现,而且其数量也是使用int类型记录,所以队列也是有界的)。

private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8

PriorityBlockingQueue的实现与ArrayBlockingQueue的实现大同小异,只是使用的数据结构不同,还有就是PriorityBlockingQueue放元素的线程不会放入条件队列中,而是在获取到锁后即可添加元素。这里就不再对源码进行剖析,剖析只是队列PriorityQueue的再次梳理,这里提供一个PriorityBlockingQueue的使用示例。

实体类:

@Data
@Builder
@ToString
public class User {
    private String name;
    private int age;
}

自定义比较器:

public class UserComparator implements Comparator<User> {
    @Override
    public int compare(User o1, User o2) {
        if (null == o1 && null == o2)
            return 0;

        if (null == o1)
            return -1;

        if (null == o2)
            return 1;

        return o1.getAge() - o2.getAge();
    }
}

示例代码:

public class PriorityBlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<User> users = new PriorityBlockingQueue<>(10, new UserComparator());
        User user = new User("wang", 3);
        users.offer(user);
        User user1 = new User("xiaoming", 8);
        users.offer(user1);
        User user2 = new User("lili", 7);
        users.offer(user2);

        while(true) {
            System.out.println(users.take());
            if (users.isEmpty())
                break;
        }
    }
}

如果是实体类实现Comparable接口,则在实现的方法compareTo中进行比较即可:

public class User implements Comparable<User> {
    private String name;
    private int age;

    @Override
    public int compareTo(User o) {
        // 计较逻辑
        return 0;
    }
}



DelayQueue延迟队列

DelayQueue也是基于优先级堆实现的可阻塞的无界队列,与PriorityBlockingQueue内部使用数组并且自己实现优先级算法不同,在DelayQeueu中维护了一个类型为PriorityQueue的属性,使用PriorityQueue的api实现优先级。在其内部维护一个ReentrantLock实例,用于元素存放和读取的串行执行,因为每次添加和获取都是操作的同一个PriorityQueue实例,这点与ArrayBlcokingQueue相同,与LinkedBlockingQueue不同。相应的也维护了一个Condition实例avaliable用于阻塞或者唤醒获取元素的线程,同时也实现元素延迟获取的功能。需要注意的是使用DelayQueue队列时,其元素必须实现Delayed接口,我们开查看一下

Delayed接口:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

我们可以看到Delayed继承了Comparable,在之前我们了解到,使用ProrityQueue要么实现Comparable接口,要么提供比较器Comparator的实例,而这里必须实现Delayed接口,那么就等于是实现了Comparable接口,用于比较优先级使用,而getDelay方法则是用于元素延迟出队的实现。Delayed的getDelay就是实现延迟的关键,当从队列中获取一个优先级获取最优先的元素时,同时需要调用元素的getDelay方法进行判断,判断是否需要延迟,如果需要延迟则不返回元素,直到达到指定的延迟时间。其中take方法如下:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

在take方法中,是通过不停的循环进行判断,每次调用元素的getDelay方法进行判断,当到达指定的延迟时间之后,将元素返回。关于其他方法都是基于PriorityQueue进行的,这里就不再赘述。接下来通过一个示例来展示一下DelayQueue的使用。

接下来来看一下实体类:

@ToString
public class MovieTicket implements Delayed {

    private String name;
    private long create;
    // 延迟时间是毫秒
    private long delay;
    private long expire;

    public MovieTicket(String name, long delay) {
        this.name = name;
        this.delay = delay;
        this.create = System.currentTimeMillis();
        this.expire = create + this.delay;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // 根据当前时间判断存活时间还有多久
        return  unit.convert((expire - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        // 入队比较不应该比较其delay属性,而应该比较两个对象的存活时间
        // 存活时间越短,就越需要靠前提早出队
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}

示例代码:

public class DelayQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:MM:SS");

        DelayQueue<MovieTicket> queue = new DelayQueue<MovieTicket>();

        queue.add(new MovieTicket("钢铁侠", 10000));
        queue.add(new MovieTicket("魔戒", 3000));
        queue.add(new MovieTicket("白雪公主",5000));

        Thread.sleep(6000);
        queue.add(new MovieTicket("流浪地球",1000));
        System.out.println(format.format(new Date()) + ": 开始取票");
        while (true) {
            System.out.println(format.format(new Date()) + " : " + queue.take());
            if (queue.isEmpty()) {
                break;
            }
        }

    }
}

结果:

2020-11-08 11:11:663: 开始取票
2020-11-08 11:11:667 : MovieTicket(name=魔戒, create=1604807914658, delay=3000, expire=1604807917658)
2020-11-08 11:11:668 : MovieTicket(name=白雪公主, create=1604807914659, delay=5000, expire=1604807919659)
2020-11-08 11:11:668 : MovieTicket(name=流浪地球, create=1604807920663, delay=1000, expire=1604807921663)
2020-11-08 11:11:672 : MovieTicket(name=钢铁侠, create=1604807914658, delay=10000, expire=1604807924658)

这里需要思考一下为什么“流浪地球”不是在第一个。

PriorityBlockingQueue和DelayQueue相对来说也没有特别的复杂,数量掌握了PriorityQueue的实现原理后,PriorityBlockingQueue和DelayQueue只是在PriorityQueue上添加了一些特性从而提供了各种功能,所以说啊数据结构还是很重要滴。



版权声明:本文为pang5356原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。