JAVA并发:ConcurrentLinkedQueue

  • Post author:
  • Post category:java





概述

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,采用FIFO的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾;当我们获取一个元素时,它会返回队列头部的元素。用CAS实现非阻塞的线程安全队列。

ConcurrentLinkedQueue 的非阻塞算法实现主要可概括为下面几点:

  1. 使用 CAS 原子指令来处理对数据的并发访问,这是非阻塞算法得以实现的基础。
  2. head/tail 并非总是指向队列的头 / 尾节点,也就是说允许队列处于不一致状态。 这个特性把入队 / 出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队 / 出队时需要原子化更新值的范围到唯一变量。这是非阻塞算法得以实现的关键。
  3. 以批处理方式来更新head/tail,从整体上减少入队 / 出队操作的开销。

在ConcurrentLinkedQueue的源码中,有一段红字规定了的一些基本不变性条件

  1. 在入队时最后一个结点中的next域为null

  2. 队列中的所有未删除结点的item域不能为null且从head都可以在O(N)时间内遍历到

  3. 对于要删除的结点,不是将其引用直接置为空,而是将其的item域先置为null(迭代器在遍历是会跳过item为null的结点)

  4. 允许head和tail滞后更新,也就是上文提到的head/tail并非总是指向队列的头 / 尾节点(这主要是为了减少CAS指令执行的次数,但同时会增加volatile读的次数,但是这种消耗较小)。具体而言就是,当在队列中插入一个元素是,会检测tail和最后一个结点之间的距离是否在两个结点及以上(内部称之为hop);而在出队时,对head的检测就是与队列的第一个结点的距离是否达到两个,有则将head指向第一个结点并将head原来指向的结点的next域指向自己,这样就能断开与队列的联系从而帮助GC

head的不变性和可变性条件


不变性:

  1. 所有未删除节点,都能从head通过调用succ()方法遍历可达。

  2. head不能为null。

  3. head节点的next域不能引用到自身。


可变性:

  1. head节点的item域可能为null,也可能不为null。

  2. 允许tail滞后(lag behind)于head,也就是说:从head开始遍历队列,不一定能到达tail。

tail的不变性和可变性条件


不变性:

  1. 通过tail调用succ()方法,最后节点总是可达的。

  2. tail不能为null。


可变性:

  1. tail节点的item域可能为null,也可能不为 null。
  2. 允许tail滞后于head,也就是说:从head开始遍历队列,不一定能到达tail。
  3. tail节点的next域可以引用到自身。



结构

ConcurrentLinkedQueue是由head节点和tail节点组成,每个节点有节点元素item和指向下一个节点的next的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。

 private transient volatile Node<E> head;
 private transient volatile Node<E> tail;

Node节点的构成如下:

  private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }




入队列

这里写图片描述

添加元素1:队列更新head节点的next节点为元素1。因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点

添加元素2:队列首先设置元素1节点的next节点为元素2,然后更新tail节点指向元素2

添加元素3:设置tail节点的next节点为元素3节点

添加元素4:设置元素3的next节点为元素4,然后将tail节点指向元素4节点。

从以上描述可以知道,发现入队过程主要做两件事情:第一是将入队节点设置成当前队列最后一个节点的next节点;第二是更新tail节点,如果tail节点的next节点不为空,则将新添加节点设置成tail节点,如果tail节点的next节点为空,则将新添加节点设置成tail节点的next节点。

在并发情况下,如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发送变化,这是当前线程要暂停入队操作,然后重新获取尾节点。

    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {//1
            Node<E> q = p.next;
            if (q == null) {//2
              if (p.casNext(null, newNode)) {//3

                    if (p != t) //4
                        casTail(t, newNode);  //更新失败了也是没事的,因为表示有其他线程成功更新了tail节点
                    return true;
                }
                //其他线程抢先完成入队,需要重新尝试
            }
            else if (p == q)//5

                p = (t != (t = tail)) ? t : head;
            else
                // 在两跳之后检查尾部更新.
                p = (p != t && t != (t = tail)) ? t : q; //6
        }
    }

1:对于入队操作,采用失败即重试的方式,直到入队成功

2:表明p是最后一个结点

3:采用CAS指令修改队列的最后一个结点的next域,从而保证最后一个结点是新插入的结点,同时将p指向这个新结点

4:如果插入结点后tail和p距离达到两个结点,则修改tail的指向(失败也没关系),这里在判断tail为最后一个结点后仍然要判断hop是否达到2主要是为了预防在并发修改下,多个线程同时修改的问题

5:根据tail的可变性条件和滞后更新策略,我们知道tail的next域可以引用到自身,在ConcurrentLinkedQueue规定如果tail的next如果指向自己的话,则表明tail现在所在指向的结点已被删除(从head遍历无法到达tail),那么就要从head开始遍历到所有的未删除结点(这也是上文head的不变性条件保证的)具体看下图:

这里写图片描述

当然,我们还是要判断其他线程是否已经提前修改tail的指向,修改的话就表明tail结点已经更新完毕,没有引用到自身了,就可以直接重新尝试插入了。其实从这我们大致可以揣摩出作者的设计的巧妙部分:即虽然tail有滞后更新策略从而导致无法一次就将结点插入,但结点要想插入的话还是必须要当tail为最后一个结点才行

6:tail未指向尾结点,同时也没有滞后head,就像下图这样:

插入前:

这里写图片描述

这时候表明tail结点还未更新,但需要事先判断其他线程是否可能抢先插入了一个结点,如下图:

其它线程抢先插入后:

这里写图片描述

在这种情况下如果插入元素的话导致tail和最后一个结点的距离达到两个,就要更新tail的指向(不得不承认这句代码的简洁性,但还是要吐槽一下,从可读性的角度和JDK6.0的版本比起来实在是难以理解),并且tail已经指向尾结点,说明下一个结点可以直接将tail赋给p以便重新尝试插入。

其实仔细分析的话就可以明白多个if判断表明tail的三种可能状态:

  1. tail滞后于 head。
  2. tail指向尾结点。
  3. tail指向非尾结点。



*出队列*

   public E poll()
{
      restartFromHead:
      for (;;) 
      {
          for (Node<E> h = head, p = h, q;;) 
          {
              E item = p.item;

              if (item != null && p.casItem(item, null))    //1
              {
                  if (p != h)       //2
                      updateHead(h, ((q = p.next) != null) ? q : p);
                  return item;
              }
              else if ((q = p.next) == null) //3
              {
                  updateHead(h, p);
                  return null;
              }
              else if (p == q)  //4
                  continue restartFromHead;
              else  //5
                  p = q;
          }
      }
  }

1:在获取head结点后,如果item不为null的话将其设为null实现删除头结点(这是一个特殊的删除策略,即item为null的结点就是已经删除的结点,即使它还在队列中)

这里写图片描述

2:删除该结点后检查head是否与头结点相差两个结点,有则向后推进一个item非null结点来更新head

这里写图片描述

final void updateHead(Node<E> h, Node<E> p) 
{
     // 如果两个结点不相同,尝试用CAS指令原子更新head指向新头节点
     if (h != p && casHead(h, p))
         //将旧的头结点指向自身以实现删除
     h.lazySetNext(h);
}

3:head的item为null则向后选取一个结点,如果item为null的结点,设置head指向p节点(此时队列没有元素,只有一个伪结点p)

4:结点出队失败,重新进行出队(关于p == q的判断条件我是在有点难以理解,在此只能作一个不负责任的猜测:就是上一次判断先执行了步骤5,使得p和q指向同一个item不为null的结点,在下一次循环开始前其它线程线程先删除了该结点导致步骤4的发生,这样的话就要重新获取head进行删除)

A线程执行步骤5后(为了方便没有画出tail,再次声明,只是个人观点):

这里写图片描述

B线程抢先删除结点后A线程执行步骤4:

这里写图片描述

5:在结点出队失败后可以保证下次尝试出队时p不为空(之前q = p.next != null才有可能跳到这一步)

根据head的不变性和可变性条件,在执行出队操作前,head在队列中的位置共有两种可能:

  1. head指向有效结点(从head向后遍历可达的结点当中,item域不为null的结点)

  2. head指向无效结点(从head向后遍历可达的结点当中,item域为null的结点)




队列判空

有些人在判断队列是否为空时喜欢用

queue.size()==01
public int size() 
{
     int count = 0;
     for (Node<E> p = first(); p != null; p = succ(p))
         if (p.item != null)
             // Collection.size() spec says to max out
             if (++count == Integer.MAX_VALUE)
                 break;
     return count;
 }

在计算队列的长度是并没有向我们往常一样直接通过一个变量来存储,这样主要是要尽可能保证队列在并发访问下的数据的正确性,但由于遍历时还是会有其它线程对队列的状态进行修改,因而数据仍有可能错误(removeAll,retainAll,containsAll,equals,toArray也有一样的问题)

可以看到这样在队列在结点较多时会依次遍历所有结点,这样的性能会有较大影响,因而可以考虑empty函数,它只要判断第一个结点(注意不一定是head指向的结点)

public boolean isEmpty()
{
    return first() == null;
}




最后

ConcurrentLinkedQueue的迭代器是弱一致性的,这在并发容器中是比较普遍的现象,主要是指在一个线程在遍历队列结点而另一个线程尝试对某个队列结点进行修改的话不会抛出ConcurrentModificationException,这也就造成在遍历某个尚未被修改的结点时,在next方法返回时可以看到该结点的修改,但在遍历后再对该结点修改时就看不到这种变化。特别注意的是ConcurrentLinkedQueue提供的线程安全操作只是相对安全的,即只对单个函数调用所涉及的操作提供安全性