描述
SynchrounousQueue
是一个比较特殊的无界阻塞队列并支持非公平和公平模式,严格意义上来说不算一个队列,因为它不像其他阻塞队列一样能有容量,它仅有一个指向栈顶的地址,栈中的节点由线程自己保存。任意的线程都会等待直到获得数据(消费)或者交付完成(生产)才会返回。
SynchronousQueue
和普通的阻塞队列的差异类似于下图所示(非公平模式):
阻塞队列通常是存储生产者的生产结果然后消费者去消费,阻塞队列就类似于一个中转站。
SynchronousQueue
则存储生产结果,只告诉消费者生产者的位置,然后让其自己去与之交流(反过来一样),就没有中转的一个过程而是直接交付的。
SynchronousQueue
将数据交付的任务交给生产者或消费者自行处理,实现的非常看不懂。
那么既然是
Queue
,就可以通过
offer
和
take
方法来了解
offer:
public boolean offer(E e) { if (e == null) throw new NullPointerException(); return transferer.transfer(e, true, 0) != null; }
take:
public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
offer
和
take
中都调用了
transferer.transfer(...)
transferer
是一个接口
SynchrounousQueue
有两个实现类:
- TransferQueue: 用于公平交付
-
TransferStack:用于不公平交付
这两个的作用可以通过
SynchrounousQueue
的构造方法得知:
public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
如果启用公平交付则创建
TransferQueue
否则使用
TransferStack
首先先分析
TransferStack.transfer
非公平的TransferStack
通过类名得知,是使用堆栈实现的,是一个LIFO的序列。
每个请求的线程都会被包装成一个
SNode
,具有以下属性:
class SNode { volatile SNode next; // 链接的下一个SNode volatile SNode match; // 与该线程匹配的另外一个线程SNode节点 volatile Thread waiter; // 当前请求的线程 Object item; int mode; //该节点的类型(模式) }
很显然,是一个链表结构,使用一个
mode
来标识该节点的类型,具有以下值:
// 代表一个消费者 int REQUEST = 0; // 代表一个生产者 int DATA = 1; // 代表已经和另外一个节点匹配 int FULFILLING = 2;
通过源码注释得知,整个
TransferStack.transfer
可以分为以下几步:
- 如果当前的栈是空的,或者栈顶与请求的节点模式相同,那么就将该节点作为栈顶并等待下一个与之相匹配的请求节点,最后返回匹配节点的数据(take或offer)或者null(被取消/中断)
- 如果栈不为空,请求节点与栈顶节点相匹配(一个是REQUEST一个是DATA)那么当前节点模式变为FULFILLING,然后将其压入栈中和互补的节点进行匹配,完成交付后同时弹出栈并返回交易的数据,如果匹配失败则与其他节点解除关系等带回收。
-
如果栈顶已经存在一个FULFILLING的节点,说明正在交付,那么就帮助这个栈顶节点快速完成交易。
下面用图来描述先生产后消费的例子
当栈为空将其封装为
SNode
节点后入栈,自旋等待其他节点与自己匹配这是
TransferStack.transfer
的第一个部分,用来处理栈为空或者是多个生产者/消费者的情况,使得都自旋等待匹配。
if (h == null || h.mode == mode) { // empty or same-mode // 生产者/消费者不愿意等待则直接返回 if (timed && nanos <= 0L) { // can't wait if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; // 创建一个节点并将该节点作为栈顶 } else if (casHead(h, s = snode(s, e, h, mode))) { // 自旋等待下一个与之匹配的节点 SNode m = awaitFulfill(s, timed, nanos); // 如果该节点已经取消等待 if (m == s) { //清理该节点 clean(s); return null; } // 如果有节点与自己匹配那么就返回交换的元素 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 如果当前模式为DATA就表示是 消费者等待生产者生产 // 如果当前模式为REQUEST就表示是 生产者等待消费者消费 return (E) ((mode == REQUEST) ? m.item : s.item); } }
栈不为空,消费者压入栈顶,消费者与生产者进行匹配,消费者改变头节点(也就是本身)的状态为
FuLFILLING
<img src=”