在
阻塞队列BlockingQueue数组和链表的实现
中剖析了基于数组和链表的实现的阻塞队列,接下来来剖析一下与优先级相关的阻塞队列PriorityBlockingQueue和DelayQueue。在介绍之前需要了解优先级队列相关的知识,以下是优先级队列的相关知识链接:
源码阅读(12):Java中主要的Queue、Deque结构——PriorityQueue集合(上)
源码阅读(13):Java中主要的Queue、Deque结构——PriorityQueue集合(中)
当然优先级队列在JAVA中实现有PriorityQueue,该优先级队列是基于数据实现的。那么接下来就基于已经了解优先级队列的实现源码来剖析PriorityBlockingQueue和DelayQueue。
PriorityBlockingQueue优先级阻塞队列
在了解基于数组实现的优先级队列的原理后,再看PriorityBlockingQueue就相对简单,PriorityBlockingQueue相对于PriorityQueue只是增加了锁及条件队列。PriorityBlockingQueue是一个无界的阻塞队列,其可以无限扩大(不是绝对。由于其是基于数组实现,而且其数量也是使用int类型记录,所以队列也是有界的)。
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8
PriorityBlockingQueue的实现与ArrayBlockingQueue的实现大同小异,只是使用的数据结构不同,还有就是PriorityBlockingQueue放元素的线程不会放入条件队列中,而是在获取到锁后即可添加元素。这里就不再对源码进行剖析,剖析只是队列PriorityQueue的再次梳理,这里提供一个PriorityBlockingQueue的使用示例。
实体类:
@Data
@Builder
@ToString
public class User {
private String name;
private int age;
}
自定义比较器:
public class UserComparator implements Comparator<User> {
@Override
public int compare(User o1, User o2) {
if (null == o1 && null == o2)
return 0;
if (null == o1)
return -1;
if (null == o2)
return 1;
return o1.getAge() - o2.getAge();
}
}
示例代码:
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<User> users = new PriorityBlockingQueue<>(10, new UserComparator());
User user = new User("wang", 3);
users.offer(user);
User user1 = new User("xiaoming", 8);
users.offer(user1);
User user2 = new User("lili", 7);
users.offer(user2);
while(true) {
System.out.println(users.take());
if (users.isEmpty())
break;
}
}
}
如果是实体类实现Comparable接口,则在实现的方法compareTo中进行比较即可:
public class User implements Comparable<User> {
private String name;
private int age;
@Override
public int compareTo(User o) {
// 计较逻辑
return 0;
}
}
DelayQueue延迟队列
DelayQueue也是基于优先级堆实现的可阻塞的无界队列,与PriorityBlockingQueue内部使用数组并且自己实现优先级算法不同,在DelayQeueu中维护了一个类型为PriorityQueue的属性,使用PriorityQueue的api实现优先级。在其内部维护一个ReentrantLock实例,用于元素存放和读取的串行执行,因为每次添加和获取都是操作的同一个PriorityQueue实例,这点与ArrayBlcokingQueue相同,与LinkedBlockingQueue不同。相应的也维护了一个Condition实例avaliable用于阻塞或者唤醒获取元素的线程,同时也实现元素延迟获取的功能。需要注意的是使用DelayQueue队列时,其元素必须实现Delayed接口,我们开查看一下
Delayed接口:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
我们可以看到Delayed继承了Comparable,在之前我们了解到,使用ProrityQueue要么实现Comparable接口,要么提供比较器Comparator的实例,而这里必须实现Delayed接口,那么就等于是实现了Comparable接口,用于比较优先级使用,而getDelay方法则是用于元素延迟出队的实现。Delayed的getDelay就是实现延迟的关键,当从队列中获取一个优先级获取最优先的元素时,同时需要调用元素的getDelay方法进行判断,判断是否需要延迟,如果需要延迟则不返回元素,直到达到指定的延迟时间。其中take方法如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
在take方法中,是通过不停的循环进行判断,每次调用元素的getDelay方法进行判断,当到达指定的延迟时间之后,将元素返回。关于其他方法都是基于PriorityQueue进行的,这里就不再赘述。接下来通过一个示例来展示一下DelayQueue的使用。
接下来来看一下实体类:
@ToString
public class MovieTicket implements Delayed {
private String name;
private long create;
// 延迟时间是毫秒
private long delay;
private long expire;
public MovieTicket(String name, long delay) {
this.name = name;
this.delay = delay;
this.create = System.currentTimeMillis();
this.expire = create + this.delay;
}
@Override
public long getDelay(TimeUnit unit) {
// 根据当前时间判断存活时间还有多久
return unit.convert((expire - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
// 入队比较不应该比较其delay属性,而应该比较两个对象的存活时间
// 存活时间越短,就越需要靠前提早出队
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
示例代码:
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:MM:SS");
DelayQueue<MovieTicket> queue = new DelayQueue<MovieTicket>();
queue.add(new MovieTicket("钢铁侠", 10000));
queue.add(new MovieTicket("魔戒", 3000));
queue.add(new MovieTicket("白雪公主",5000));
Thread.sleep(6000);
queue.add(new MovieTicket("流浪地球",1000));
System.out.println(format.format(new Date()) + ": 开始取票");
while (true) {
System.out.println(format.format(new Date()) + " : " + queue.take());
if (queue.isEmpty()) {
break;
}
}
}
}
结果:
2020-11-08 11:11:663: 开始取票
2020-11-08 11:11:667 : MovieTicket(name=魔戒, create=1604807914658, delay=3000, expire=1604807917658)
2020-11-08 11:11:668 : MovieTicket(name=白雪公主, create=1604807914659, delay=5000, expire=1604807919659)
2020-11-08 11:11:668 : MovieTicket(name=流浪地球, create=1604807920663, delay=1000, expire=1604807921663)
2020-11-08 11:11:672 : MovieTicket(name=钢铁侠, create=1604807914658, delay=10000, expire=1604807924658)
这里需要思考一下为什么“流浪地球”不是在第一个。
PriorityBlockingQueue和DelayQueue相对来说也没有特别的复杂,数量掌握了PriorityQueue的实现原理后,PriorityBlockingQueue和DelayQueue只是在PriorityQueue上添加了一些特性从而提供了各种功能,所以说啊数据结构还是很重要滴。