Java基础03–非阻塞队列

  • Post author:
  • Post category:java


我们知道,队列加锁会影响到性能,但是加锁的方式可以实现队列有界。不加锁的方式实现的队列,由于无法保证队列的长度在确定的范围内,所以队列是无界的。

ConcurrentLinkedQueue就是一个典型的非阻塞、无边界的线程安全队列,基于链接节点,采用CAS算法实现。

CoucurrentLinkedQueue规定了如下几个不变形:

1. 在入队的最后一个元素的next为null;

2. 队列中所有未删除的节点的item都不能为null且都能从head节点遍历到;

3. 对于要删除的节点,不能直接将其设置为null,而是先将其item域设置为null,迭代器会跳过item为null的节点;

4. 允许head和tail更新滞后,就是说head、tail不总是指向第一个元素和最后一个元素;比如当前一个线程取到tail节点后,在操作前,另一个线程把tail更新了,此时第一个线程的tail就不是指向最后一个元素,也就是说tail更新滞后了。

head的不可变和可变性:

*不可变

1. 所有未删除的节点都可以通过head节点遍历到;

2. head不能为null;

3. head节点的next不能指向自身;

*可变性

1. head的item可能为null,也可能不为null;

2. 允许tail滞后head;也就是调用succ()方法,从head不可达tail;

tail的不可变和可变性:

*不可变

1. tail不能为null

*可变性

1. tail的item可能为null,也可能不为null;

2. tail节点的next域可以指向自身;

3. 允许tail滞后head,也就是调用succ()方法,从head不可达tail;

有点乱,下面我们通过源码分析来理解这些特征。

一、ConcurrentLinkedQueue源码解析

ConcurrentLinkedQueue的结构由head节点和tail节点组成,每个节点由节点元素item和指向下一个节点的next引用组成,而节点与节点之间的关系就是通过该next关联起来的,从而形成一张链表的队列。节点Node为ConcurrentLinkedQueue的内部类,结构如下:

private static class Node<E> {
    // 节点元素域
    volatile E item;
    volatile Node<E> next;

    // 初始化,获取item 和 next的偏移量,为后期的CAS做准备
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;
    // 偏移量
    private static final long itemOffset;
    // 下一个元素的偏移量
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

二、入列

入列,在单线程中,是一个非常简单的过程:tail节点的next指向新节点,然后更新tail为新节点即可。但是多线程呢?如果一个线程正在将一个元素插入到队列中,那么它必须先获取尾节点,然后设置尾节点的下一个节点为当前节点;但是如果刚好有另一个节点已经完成了插入,那么尾节点是不是发生了变换?这种情况应该怎么处理?

下面,我们看一下offer(E e)的源码

offer(E e):将指定元素插入到队列尾部

public boolean offer(E e) {
    // 检查节点是否为null
    checkNotNull(e);
    // 创建新节点
    final Node<E> newNode = new Node<E>(e);

    // 死循环,直到成功为止
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        // q == null 表示p已经是最后一个节点,尝试加入到队列尾
        // 如果插入失败,说明其他线程已经修改了p的指向
        if (q == null) {                                        // --- 1
            // p is last node
            // casNext:t节点的next指向当前节点
            // CASTail:设置tail 尾节点
            if (p.casNext(null, newNode)) {                     // --- 2
                // node加入节点后,会导致tail距离最后一个节点相差大于一个,需要更新tail
                if (p != t)                                     // --- 3                                        
                    casTail(t, newNode);                        // --- 4
                return true;
            }
        }
        // p == q 等于自身
        else if (p == q)                                        // --- 5
            // p == q 代表着该节点已经被删除
            // 多线程的原因,offer()的时候也会poll,如果offer()的时候刚好该节点已经poll()
            // 那么在poll()方法中的updateHead()方法会将head指向当前的q,而把p.next指向自己,即:p.next == p
            // 这样就会导致tail节点之后head(tail位于head的前面),则需要重新设置p
            p = (t != (t = tail)) ? t : head;                   // --- 6
        // tail并没有指向尾节点
        else
            // tail已经不是最后一个节点,将p指向最后一个节点
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

光看源码还是有点儿迷糊,插入节点数据分析一次,就会清晰很多。

初始化

ConcurrentLinkedQueue初始化时head、tail存储的元素都为null,且head等于tail:

添加元素A

按照程序分析:第一次插入元素A,head = tail = dummyNode,所以 q = p.next = null,直接走步骤2:p.casNext(null, newNode),由于 p == t 成立,所以不会执行步骤3和4:casTail(t, newNode),直接return true。插入A节点后如下:

添加元素B

q = p.next = A, p = tail = dummyNode,所以直接跳到步骤7:p = (p != t && t != (t = tail)) ? t : q; 此时 p = q,然后进入第二次循环 q = p.next = null,步骤1:q == null 成立,执行步骤2,将该节点插入,此时 p = q,t = tail,所以步骤3: p != t 成立,执行步骤4:casTail(t, newNode),然后return true。此时,结构如下:

添加元素C

此时 t = tail, p = t, q = p.next = null, 和插入元素A无异,如下:

这里,整个offer()过程已经分析完成,可能 p == q 有点儿难理解,p 不是等于 q.next么,怎么会有 p == q 呢?这个疑问放在出列 poll() 中分析。

三、出列

ConcurrentLinkedQueue提供了poll()方法进行出列操作。入列主要设计到tail,出列则设计到head。先看源码:

public E poll() {
    // 如果出现p被删除的情况,需要从head重新开始
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            // 节点 item
            E item = p.item;

            // item 不为null,则将item 设置为 null
            if (item != null && p.casItem(item, null)) {              // --- 1
                // p != head 则更新head
                if (p != h)                                           // --- 2
                    // p.next != null, 则将head更新为p.next,否则更新为p
                    updateHead(h, ((q = p.next) != null) ? q : p);    // --- 3
                return item;
            }
            // p.next == null 队列为空
            else if ((q = p.next) == null) {                          // --- 4
                updateHead(h, p);
                return null;
            }
            // 当一个线程在poll的时候,另一个线程已经把当前的p从队列中删除--将 p.next = p,p 已经被移除不能继续,需要重新开始
            else if (p == q)                                          // --- 5
                continue restartFromHead;
            else
                p = q;                                                // --- 6
        }
    }
}

相对于offer()方法,这个方法会简单一点,里面有一个很重要的方法:updateHead(),该方法用于CAS更新head节点,如下:

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

我们先将上面offer()的链表poll()掉,添加A、B、C节点结构如下:

poll A

head = dummy,p = head,item = p.next = null,步骤1不成立,步骤4:(q = p.next) == null 不成立,p.next = A,跳到步骤6,下一个循环。此时p = A,所以步骤1 item != null,进行p.casItem(item, null)成功,此时 p == A != h,所以执行步骤3:updateHead(h, ((q = p.next) != null) ? q : p),q = p.next = B != null,则将head CAS更新成B,如下:

poll B

head = B,p = head = B,item = p.item = B,步骤1成立,步骤2:p != h 不成立,直接return,如下:

poll C

head = dummy,p = head = dumy,item = p.item = null,步骤1不成立,跳到步骤4:(q = p.next) == null,不成立,然后跳到步骤6,此时 p = q = C,item = C(item),步骤1成立,所以将C(item)设置为null,步骤2:p != h 成立,执行步骤3:updateHead(h, ((q = p.next) != null) ? q : p),如下:

看到这里差不多可以一目了然,在这里我们再来分析offer()的步骤5:

else if (p == q){
    p = (t != (t = tail)) ? t : head;
}

ConcurrentLinkedQueue中规定,p == q表明该节点已经被删除,也就是说tail滞后于head,head无法通过succ()方法遍历到tail,怎么做?(t != (t = tail)) ? t : head; 这段代码主要是来判断tail节点是否已经发生了改变,如果发生了改变,则说明tail已经重新定位了,只需要重新找到tail即可,否则就只能指向head了。

接着上面的步骤,我们再插入一个元素D。更新完之后,则 p = head, q = p.next = null, 执行步骤1:q = null且 p != t,所以执行步骤4,如下:

再插入元素E,q = p.next = null,p == t,所以 插入E后如下:

到这里ConcurrentLinkedQueue的入列、出列分析完毕。ConcurrentLinkedQueue利用CAS来完成数据操作,同时允许队列的不一致性,这种弱一致性确实很强大。

该文绝大部分来自参考链接,有部分地方与作者理解有出入,结合自己的理解,记录如上文档。

参考:


https://blog.csdn.net/liuxiao723846/article/details/88382520



版权声明:本文为Johnson8702原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。