特殊的阻塞队列 – java.util.concurrent.SynchronousQueue 分析

  • Post author:
  • Post category:java


描述


SynchrounousQueue

是一个比较特殊的无界阻塞队列并支持非公平和公平模式,严格意义上来说不算一个队列,因为它不像其他阻塞队列一样能有容量,它仅有一个指向栈顶的地址,栈中的节点由线程自己保存。任意的线程都会等待直到获得数据(消费)或者交付完成(生产)才会返回。


SynchronousQueue

和普通的阻塞队列的差异类似于下图所示(非公平模式):

阻塞队列通常是存储生产者的生产结果然后消费者去消费,阻塞队列就类似于一个中转站。


SynchronousQueue

则存储生产结果,只告诉消费者生产者的位置,然后让其自己去与之交流(反过来一样),就没有中转的一个过程而是直接交付的。


SynchronousQueue

将数据交付的任务交给生产者或消费者自行处理,实现的非常看不懂。

那么既然是

Queue

,就可以通过

offer



take

方法来了解

offer:

public boolean offer(E e) {  
    if (e == null) throw new NullPointerException();  
    return transferer.transfer(e, true, 0) != null;  
}

take:

public E take() throws InterruptedException {  
    E e = transferer.transfer(null, false, 0);  
    if (e != null)  
        return e;  
    Thread.interrupted();  
    throw new InterruptedException();  
}


offer



take

中都调用了

transferer.transfer(...)



transferer

是一个接口

SynchrounousQueue

有两个实现类:

  • TransferQueue: 用于公平交付
  • TransferStack:用于不公平交付

    这两个的作用可以通过

    SynchrounousQueue

    的构造方法得知:
public SynchronousQueue(boolean fair) {  
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();  
}

如果启用公平交付则创建

TransferQueue

否则使用

TransferStack

首先先分析

TransferStack.transfer

非公平的TransferStack

通过类名得知,是使用堆栈实现的,是一个LIFO的序列。

每个请求的线程都会被包装成一个

SNode

,具有以下属性:

class SNode {
	volatile SNode next;        // 链接的下一个SNode
	volatile SNode match;       // 与该线程匹配的另外一个线程SNode节点
	volatile Thread waiter;     // 当前请求的线程
	Object item;                 
	int mode; //该节点的类型(模式)
}

很显然,是一个链表结构,使用一个

mode

来标识该节点的类型,具有以下值:

// 代表一个消费者
int REQUEST    = 0;
// 代表一个生产者
int DATA       = 1;
// 代表已经和另外一个节点匹配
int FULFILLING = 2;

通过源码注释得知,整个

TransferStack.transfer

可以分为以下几步:

  1. 如果当前的栈是空的,或者栈顶与请求的节点模式相同,那么就将该节点作为栈顶并等待下一个与之相匹配的请求节点,最后返回匹配节点的数据(take或offer)或者null(被取消/中断)
  2. 如果栈不为空,请求节点与栈顶节点相匹配(一个是REQUEST一个是DATA)那么当前节点模式变为FULFILLING,然后将其压入栈中和互补的节点进行匹配,完成交付后同时弹出栈并返回交易的数据,如果匹配失败则与其他节点解除关系等带回收。
  3. 如果栈顶已经存在一个FULFILLING的节点,说明正在交付,那么就帮助这个栈顶节点快速完成交易。

    下面用图来描述先生产后消费的例子

    当栈为空将其封装为

    SNode

    节点后入栈,自旋等待其他节点与自己匹配

    这是

    TransferStack.transfer

    的第一个部分,用来处理栈为空或者是多个生产者/消费者的情况,使得都自旋等待匹配。

if (h == null || h.mode == mode) {  // empty or same-mode
	// 生产者/消费者不愿意等待则直接返回
	if (timed && nanos <= 0L) {     // can't wait  
		if (h != null && h.isCancelled())  
			casHead(h, h.next);     // pop cancelled node  
		else  
			return null;
	  // 创建一个节点并将该节点作为栈顶
	} else if (casHead(h, s = snode(s, e, h, mode))) {
		// 自旋等待下一个与之匹配的节点  
		SNode m = awaitFulfill(s, timed, nanos);
		// 如果该节点已经取消等待
		if (m == s) {
			//清理该节点               
			clean(s);  
			return null;  
		}
		// 如果有节点与自己匹配那么就返回交换的元素
		if ((h = head) != null && h.next == s)
			casHead(h, s.next);     // help s's fulfiller  
		// 如果当前模式为DATA就表示是 消费者等待生产者生产
		// 如果当前模式为REQUEST就表示是 生产者等待消费者消费
		return (E) ((mode == REQUEST) ? m.item : s.item);  
	}  
}

栈不为空,消费者压入栈顶,消费者与生产者进行匹配,消费者改变头节点(也就是本身)的状态为

FuLFILLING

<img src=”



版权声明:本文为CR2567001684原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。