jdk优先级队列、延迟队列原理

  • Post author:
  • Post category:其他




优先级队列



PriorityQueue

这个类,是优先级队列的核心,包括我们下面要说的延迟队列,在jdk6之前,也是以这个为基础的

在该类中,维护了几个核心的属性

transient Object[] queue; // non-private to simplify nested class access

private int size = 0;

private final Comparator<? super E> comparator;

这里的queue,就是实际存放元素的数组结构 – 数组

comparator,这个是比较器,也就是说,既然是优先级队列,总要有一个优先级比较规则,如果我们自己指定了比较器,在入队和出队的时候,会根据比较器去比较优先级,如果没有指定,就使用默认的,默认的就要看当前key是哪种类型的,如果是Integer类型的,那就使用Integer的compareTo()方法去比较

我们来看下入队的操作

public boolean offer(E e) {
    // 1.如果入队的元素为null,抛出异常
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    // 2.如果当前元素个数大于等于数组长度,就扩容
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;
    // 3.如果数组为空,是第一个入队的元素,就直接放到第一个位置即可,无需比较
    if (i == 0)
        queue[0] = e;
    else
        // 4.否则,就需要进行优先级的比较
        siftUp(i, e);
    return true;
}

/**
* 在进行优先级比较的时候,如果程序员指定了比较的规则,那就使用程序员指定的比较规则进行优先级的比较
* 如果没有指定,那就使用默认的
* @param k the position to fill
* @param x the item to insert
*/
private void siftUp(int k, E x) {
  if (comparator != null)
      siftUpUsingComparator(k, x);
  else
      siftUpComparable(k, x);
}



/**
* 这里是对要插入的元素进行判断
* @param k:元素X所占用的位置
* @param x:要插入的元素
*/
@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
  Comparable<? super E> key = (Comparable<? super E>) x;
  while (k > 0) {
      /**
       * 获取到k位置的父元素,用要插入的元素和其父元素相比
       * 返回值为-1,表示要插入的元素早于parent执行,就需要继续向前遍历
       * 返回值为1,表示要插入的元素晚于parent执行,就不遍历了,直接放在该位置即可
       *
       * 右移一位表示原值的0.5
       * 如果k-1是10,那么parent就是5
       */
      int parent = (k - 1) >>> 1;
      Object e = queue[parent];
      if (key.compareTo((E) e) >= 0)
          break;
      /**
       * 代码执行到这里,表示要插入的元素x比其父元素 e先执行,那怎么处理呢?将e放到x原来要存放的位置,可以理解为父子元素互换位置
       * 互换位置之后,继续向上查找父元素,去比较
       */
      queue[k] = e;
      k = parent;
  }
  queue[k] = key;
}

可以看到,在offer()方法中,有一个siftUp()方法,这个方法是优先级的关键核心点,也就是说,在入队的时候,会根据插入的元素,进行优先级比较

如果我们自己指定了比较器,你就会使用我们指定的比较器去比较优先级,但是假如没有指定,就要使用默认的

对于优先级队列,内部是一个二叉堆结构,所以,在插入一个元素的时候,会拿着当前要插入的节点和父节点进行优先级比较,如果当前插入的元素,优先级高于父节点,就会交换位置,然后继续和父节点进行优先级比较



延迟队列

DelayQueue和ScheduledThreadPoolExecutor中的DelayedWorkQueue是类似的

在jdk6以及之前的版本中,ScheduledThreadPoolExecutor中使用的是DelayQueue,作为延迟队列,按照到期时间存储元素,但是在jdk6之后,ScheduledThreadPoolExecutor自己声明了一个内部类DelayedWorkQueue来作为延迟队列,其实实现的原理是一样的

所以,我们以DelayedWorkQueue作为学习的例子

/**
* 入队方法,将待执行的任务插入到队列中
* 在入队的时候,会进行优先级的判断
* @param x
* @return
*/
public boolean offer(Runnable x) {
  if (x == null)
      throw new NullPointerException();
  RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
  final ReentrantLock lock = this.lock;
  /**
   * 1.加锁
   */
  lock.lock();
  try {
      int i = size;
      /**
       * 2.判断是否需要进行扩容
       * 这里的扩容,和ArrayList扩容的方法类型:
       * 先扩容50%,然后通过Arrays.copy将数组扩容之后的数据,再复制到queue中
       */
      if (i >= queue.length)
          grow();
      size = i + 1;
      /**
       * 3.如果当前插入的是第一个任务
       * 就将e设置为头结点
       *
       * 否则的话,就进行优先级的处理
       */
      if (i == 0) {
          queue[0] = e;
          setIndex(e, 0);
      } else {
          siftUp(i, e);
      }
      /**
       * 4.这里是如果插入了第一个元素,去通知
       * take方法,这里的available是一个condition对象
       */
      if (queue[0] == e) {
          leader = null;
          available.signal();
      }
  } finally {
      lock.unlock();
  }
  return true;
}

可以看到,在入队的时候,如果当前元素不是第一个元素,就会调用siftUp()方法,进行优先级的排序

/**
* Sifts element added at bottom up to its heap-ordered spot.
* Call only when holding lock.
* 这是DelayedWorkQueue自己实现的,在入队时,进行优先级判断的逻辑
* k:当前待插入元素要入队的位置
* key:就是要入队的任务
*/
private void siftUp(int k, RunnableScheduledFuture<?> key) {
  while (k > 0) {
      /**
       * 1.获取到k对应的父节点元素
       */
      int parent = (k - 1) >>> 1;
      RunnableScheduledFuture<?> e = queue[parent];
      /**
       * 2.如果任务k执行的时间晚于e父节点的,就无需再遍历处理
       * 如果k的执行时间早于e,那就需要交换位置,然后再次遍历判断父节点和交换之后的优先级
       */
      if (key.compareTo(e) >= 0)
          break;
      queue[k] = e;
      setIndex(e, k);
      k = parent;
  }
  /**
   * 设置待插入元素的实际位置
   */
  queue[k] = key;
  setIndex(key, k);
}

这里的逻辑,和优先级队列PriorityQueue的操作基本上一样的,但是,这个方法的入参,需要注意一下,是RunnableScheduledFuture这个类,也就是说,在调用compareTo的时候,会调用RunnableScheduledFuture的compareTo()方法,接着来看下compareTo方法的逻辑


/**
* 用来比较优先级,这里的other是插入元素要对比的元素
* @param other
* @return
*/
public int compareTo(Delayed other) {
  if (other == this) // compare zero if same object
      return 0;
  if (other instanceof ScheduledFutureTask) {
      ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
      /**
       * 如果当前要插入的元素对应的时间 早于X节点执行,那就返回-1
       * 如果要插入的元素对应的执行时间 晚于X节点执行,那就返回1
       * 举例:X要在5S之后执行,但是当前插入的元素在2S之后执行,那这里的diff就小于0,返回-1
       * 如果X要5S之后执行,但是待插入元素是10S之后执行,那这里的diff就大于0,返回1
       * 至于下面的sequenceNumber应该是在任务是同时执行的情况下,再进行的优先级判断吧
       * sequenceNumber是根据AtomicLong生成的,所以理论上不会重复,即使并发请求
       */
      long diff = time - x.time;
      if (diff < 0)
          return -1;
      else if (diff > 0)
          return 1;
      else if (sequenceNumber < x.sequenceNumber)
          return -1;
      else
          return 1;
  }
  /**
   * 如果要比较的任务不是ScheduledFutureTask,那就直接获取到每个任务还有多少毫秒要执行,进行优先级判断
   */
  long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
  return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

这里可以看到,代码是比较清晰明了的,直接根据当前元素的到期时间进行比较,这里的time,就不细说了,在前面介绍scheduleThreadPollExecutor的笔记中,有介绍过

所以,我们可以简单的认为,ScheduledThreadPoolExecutor中,周期性任务,是不断的将要执行的任务,插入到队列中,在插入到队列中的时候,会根据任务的time 到期时间进行优先级的比较,也就是说,如果A任务还有5S到期,此时插入了一个任务,2S之后执行,那就会将新插入的任务放到任务A的前面

最后,再来看下出队的源码

/**
* 这里是出队的方法,是在什么时候调用的?在线程池中调用的,前面的线程池源码中有说过,在worker在启动之后,会从任务队列中会去任务
* 会调用queue的take方法,核心线程会阻塞到这里,直到返回任务
* @return
* @throws InterruptedException
*/
public RunnableScheduledFuture<?> take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  /**
   * 1.加锁
   */
  lock.lockInterruptibly();
  try {
      /**
       * 2.这里是一个死循环,直到return
       * 会先判断队列中的第一个元素是否为null,为null,就调用condition的await
       * 方法即可,等插入第一个元素的时候,会调用condition的signal方法
       */
      for (;;) {
          RunnableScheduledFuture<?> first = queue[0];
          if (first == null)
              available.await();
          else {
              /**
               * 3.如果头结点不为null,就获取到元素还有多少秒会被执行
               * 如果小于等于0,表示可以执行了,就return
               */
              long delay = first.getDelay(NANOSECONDS);
              if (delay <= 0)
                  return finishPoll(first);
              /**
               * 4.如果任务还没有到执行的时间,就将临时变量设置为null,应该是为了内存回收
               * 可以看下上面的代码,first是直接queue[0]获取到的
               */
              first = null; // don't retain ref while waiting
              /**
               * 5.判断leader信息,这个leader的判断,应该是为了防止多线程来操作的
               * 如果leader不为null,就阻塞
               * 第一次进入到这里的时候,应该是null,同一个线程第二次进来的时候,也是null
               */
              if (leader != null)
                  available.await();
              else {
                  /**
                   * 6.将当前线程赋值到leader中,然后阻塞指定的时间 delay
                   */
                  Thread thisThread = Thread.currentThread();
                  leader = thisThread;
                  try {
                      available.awaitNanos(delay);
                  } finally {
                      /**
                       * 7.阻塞了指定的时间之后,将leader设置为null
                       */
                      if (leader == thisThread)
                          leader = null;
                  }
              }
          }
      }
  } finally {
      if (leader == null && queue[0] != null)
          available.signal();
      lock.unlock();
  }
}

这里的注释,也比较清晰明了了

我自己认为,延迟队列,也是优先级队列的一部分,因为延迟队列就是根据到期时间作为优先级比较规则,先到期的,优先级就高,放在队列头部



版权声明:本文为CPLASF_原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。