优先级队列
PriorityQueue
这个类,是优先级队列的核心,包括我们下面要说的延迟队列,在jdk6之前,也是以这个为基础的
在该类中,维护了几个核心的属性
transient Object[] queue; // non-private to simplify nested class access
private int size = 0;
private final Comparator<? super E> comparator;
这里的queue,就是实际存放元素的数组结构 – 数组
comparator,这个是比较器,也就是说,既然是优先级队列,总要有一个优先级比较规则,如果我们自己指定了比较器,在入队和出队的时候,会根据比较器去比较优先级,如果没有指定,就使用默认的,默认的就要看当前key是哪种类型的,如果是Integer类型的,那就使用Integer的compareTo()方法去比较
我们来看下入队的操作
public boolean offer(E e) {
// 1.如果入队的元素为null,抛出异常
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
// 2.如果当前元素个数大于等于数组长度,就扩容
if (i >= queue.length)
grow(i + 1);
size = i + 1;
// 3.如果数组为空,是第一个入队的元素,就直接放到第一个位置即可,无需比较
if (i == 0)
queue[0] = e;
else
// 4.否则,就需要进行优先级的比较
siftUp(i, e);
return true;
}
/**
* 在进行优先级比较的时候,如果程序员指定了比较的规则,那就使用程序员指定的比较规则进行优先级的比较
* 如果没有指定,那就使用默认的
* @param k the position to fill
* @param x the item to insert
*/
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
/**
* 这里是对要插入的元素进行判断
* @param k:元素X所占用的位置
* @param x:要插入的元素
*/
@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
/**
* 获取到k位置的父元素,用要插入的元素和其父元素相比
* 返回值为-1,表示要插入的元素早于parent执行,就需要继续向前遍历
* 返回值为1,表示要插入的元素晚于parent执行,就不遍历了,直接放在该位置即可
*
* 右移一位表示原值的0.5
* 如果k-1是10,那么parent就是5
*/
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
/**
* 代码执行到这里,表示要插入的元素x比其父元素 e先执行,那怎么处理呢?将e放到x原来要存放的位置,可以理解为父子元素互换位置
* 互换位置之后,继续向上查找父元素,去比较
*/
queue[k] = e;
k = parent;
}
queue[k] = key;
}
可以看到,在offer()方法中,有一个siftUp()方法,这个方法是优先级的关键核心点,也就是说,在入队的时候,会根据插入的元素,进行优先级比较
如果我们自己指定了比较器,你就会使用我们指定的比较器去比较优先级,但是假如没有指定,就要使用默认的
对于优先级队列,内部是一个二叉堆结构,所以,在插入一个元素的时候,会拿着当前要插入的节点和父节点进行优先级比较,如果当前插入的元素,优先级高于父节点,就会交换位置,然后继续和父节点进行优先级比较
延迟队列
DelayQueue和ScheduledThreadPoolExecutor中的DelayedWorkQueue是类似的
在jdk6以及之前的版本中,ScheduledThreadPoolExecutor中使用的是DelayQueue,作为延迟队列,按照到期时间存储元素,但是在jdk6之后,ScheduledThreadPoolExecutor自己声明了一个内部类DelayedWorkQueue来作为延迟队列,其实实现的原理是一样的
所以,我们以DelayedWorkQueue作为学习的例子
/**
* 入队方法,将待执行的任务插入到队列中
* 在入队的时候,会进行优先级的判断
* @param x
* @return
*/
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
/**
* 1.加锁
*/
lock.lock();
try {
int i = size;
/**
* 2.判断是否需要进行扩容
* 这里的扩容,和ArrayList扩容的方法类型:
* 先扩容50%,然后通过Arrays.copy将数组扩容之后的数据,再复制到queue中
*/
if (i >= queue.length)
grow();
size = i + 1;
/**
* 3.如果当前插入的是第一个任务
* 就将e设置为头结点
*
* 否则的话,就进行优先级的处理
*/
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
/**
* 4.这里是如果插入了第一个元素,去通知
* take方法,这里的available是一个condition对象
*/
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
可以看到,在入队的时候,如果当前元素不是第一个元素,就会调用siftUp()方法,进行优先级的排序
/**
* Sifts element added at bottom up to its heap-ordered spot.
* Call only when holding lock.
* 这是DelayedWorkQueue自己实现的,在入队时,进行优先级判断的逻辑
* k:当前待插入元素要入队的位置
* key:就是要入队的任务
*/
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
/**
* 1.获取到k对应的父节点元素
*/
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
/**
* 2.如果任务k执行的时间晚于e父节点的,就无需再遍历处理
* 如果k的执行时间早于e,那就需要交换位置,然后再次遍历判断父节点和交换之后的优先级
*/
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
/**
* 设置待插入元素的实际位置
*/
queue[k] = key;
setIndex(key, k);
}
这里的逻辑,和优先级队列PriorityQueue的操作基本上一样的,但是,这个方法的入参,需要注意一下,是RunnableScheduledFuture这个类,也就是说,在调用compareTo的时候,会调用RunnableScheduledFuture的compareTo()方法,接着来看下compareTo方法的逻辑
/**
* 用来比较优先级,这里的other是插入元素要对比的元素
* @param other
* @return
*/
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
/**
* 如果当前要插入的元素对应的时间 早于X节点执行,那就返回-1
* 如果要插入的元素对应的执行时间 晚于X节点执行,那就返回1
* 举例:X要在5S之后执行,但是当前插入的元素在2S之后执行,那这里的diff就小于0,返回-1
* 如果X要5S之后执行,但是待插入元素是10S之后执行,那这里的diff就大于0,返回1
* 至于下面的sequenceNumber应该是在任务是同时执行的情况下,再进行的优先级判断吧
* sequenceNumber是根据AtomicLong生成的,所以理论上不会重复,即使并发请求
*/
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
/**
* 如果要比较的任务不是ScheduledFutureTask,那就直接获取到每个任务还有多少毫秒要执行,进行优先级判断
*/
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
这里可以看到,代码是比较清晰明了的,直接根据当前元素的到期时间进行比较,这里的time,就不细说了,在前面介绍scheduleThreadPollExecutor的笔记中,有介绍过
所以,我们可以简单的认为,ScheduledThreadPoolExecutor中,周期性任务,是不断的将要执行的任务,插入到队列中,在插入到队列中的时候,会根据任务的time 到期时间进行优先级的比较,也就是说,如果A任务还有5S到期,此时插入了一个任务,2S之后执行,那就会将新插入的任务放到任务A的前面
最后,再来看下出队的源码
/**
* 这里是出队的方法,是在什么时候调用的?在线程池中调用的,前面的线程池源码中有说过,在worker在启动之后,会从任务队列中会去任务
* 会调用queue的take方法,核心线程会阻塞到这里,直到返回任务
* @return
* @throws InterruptedException
*/
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
/**
* 1.加锁
*/
lock.lockInterruptibly();
try {
/**
* 2.这里是一个死循环,直到return
* 会先判断队列中的第一个元素是否为null,为null,就调用condition的await
* 方法即可,等插入第一个元素的时候,会调用condition的signal方法
*/
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
/**
* 3.如果头结点不为null,就获取到元素还有多少秒会被执行
* 如果小于等于0,表示可以执行了,就return
*/
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
/**
* 4.如果任务还没有到执行的时间,就将临时变量设置为null,应该是为了内存回收
* 可以看下上面的代码,first是直接queue[0]获取到的
*/
first = null; // don't retain ref while waiting
/**
* 5.判断leader信息,这个leader的判断,应该是为了防止多线程来操作的
* 如果leader不为null,就阻塞
* 第一次进入到这里的时候,应该是null,同一个线程第二次进来的时候,也是null
*/
if (leader != null)
available.await();
else {
/**
* 6.将当前线程赋值到leader中,然后阻塞指定的时间 delay
*/
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
/**
* 7.阻塞了指定的时间之后,将leader设置为null
*/
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
这里的注释,也比较清晰明了了
我自己认为,延迟队列,也是优先级队列的一部分,因为延迟队列就是根据到期时间作为优先级比较规则,先到期的,优先级就高,放在队列头部