这篇文章介绍java的数据结构之链表阻塞队列LinkedBlockingQueue
1、LinkedBlockingQueue
LinkedBlockingQueue是一个链表形式的阻塞队列,遵循FIFO的原则,是线程安全的。
队列的添加数据和获取数据都比较简单,在LinkedBlockingQueue队列中,最重要的是线程安全性,这篇文章不讨论数据的插入和获取,主要讨论LinkedBlockingQueue是如何保证线程安全的。
读这篇文章的前提要对ReentrantLock锁机制有所了解,可以参考以下文章:
2、LinkedBlockingQueue的锁
LinkedBlockingQueue中有两个锁及锁的状态,如下:
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
— takeLock:
在获取数据时进行线程加锁
— notEmpty:
在获取数据时,如果队列为空时,使用notEmpty进行线程的阻塞,直到队列不为空时,被唤起,获取数据。
— putLock:
在添加数据时进行线程加锁
— notFull:
在添加数据时,如果队列数据已满,使用notFull进行线程阻塞,直到队列不满时被唤起,插入数据。
3、线程安全–数据插入
有两个数据插入的方法:
put(E e): 如果队列已满,阻塞线程,直到队列不满时被唤起,插入数据。
offer(E e):如果队列已满,直接返回false,不插入数据。
put(E e)
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
//作者注:构造一个包含插入数据的Node节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//作者注:获取putLock的锁
putLock.lockInterruptibly();
try {
//作者注:如果队列已满,调用await进行线程挂起(阻塞)
// 直到队列不满时,被唤起(参看take函数中的notFull.signal())
while (count.get() == capacity) {
notFull.await();
}
/**
* 作者注:如果队列不满或者线程被唤起(线程被唤起时会重新获取putLock锁)
* 将node节点插入到链表队列的末尾
*/
enqueue(node);
c = count.getAndIncrement();
//作者注:此时notFull的阻塞队列里面有可能被阻塞了很多线程,所以每一次插入一条数据时,都尝试进行一次唤起。
if (c + 1 < capacity)
notFull.signal();
} finally {
//作者注:释放putLock锁
putLock.unlock();
}
//作者注:如果此时c == 0,说明之前队列时空的,有可能有线程在插入数据时被挂起了,此时将这些线程进行唤起,告诉他们队列里有数据了。
if (c == 0)
signalNotEmpty();
}
offer(E e)
offer方法和put方法唯一的区别是,如果队列已满,不进行阻塞等待,直接返回false,不再插入数据。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如果队列已满,直接返回
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
4、线程安全–数据获取
获取数据的方法比较多,有以下几个:
— take():
如果队列为空,阻塞线程,直到队列不空时,获取数据,并删除数据。
— peek():
如果队列为空,不阻塞线程,直接返回null。如果队列不为空,获取数据,但不从队列删除数据
— poll():
如果队列为空,不阻塞线程,直接返回null。如果队列不为空,获取数据,同时从队列中将数据删除
take():
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//作者注:获取takeLock的锁,获取成功继续执行,获取失败进行等待
// 等待和阻塞的区别:
// 等待:有权利获取锁
// 阻塞:没有权利获取锁,只有被唤醒,才有权利获取锁
takeLock.lockInterruptibly();
try {
//作者注:如果队列未空,调用await进行线程挂起(阻塞),并释放takeLock锁
// 直到队列不为空时,被唤起(参看put函数中的notEmpty.signal())
while (count.get() == 0) {
notEmpty.await();
}
/**
* 作者注:如果队列不空或者线程被唤起(线程被唤起时会重新获取takeLock锁)
* 获取head头部数据,并删除
*/
x = dequeue();
c = count.getAndDecrement();
//作者注:此时notEmpty的阻塞队列里面有可能被阻塞了很多线程,所以每一次插入一条数据时,都尝试进行一次唤起。
if (c > 1)
notEmpty.signal();
} finally {
//作者注: 释放takeLock锁
takeLock.unlock();
}
//作者注:如果此时c == capacity,说明之前队列满时,有可能有线程在插入数据时被挂起了,此时将这些线程进行唤起,告诉他们队列已经不满了,可以插入数据了。
// NotFull.signal()就在此方法中。
if (c == capacity)
signalNotFull();
return x;
}
peek():
与take的不同点:
(1)、take在队列为空时阻塞等待,peek在队列为空时直接返回,不阻塞。
(2)、take在获取数据后,将数据从队列删除;peek只获取数据,不删除数据。
public E peek() {
//作者注:如果队列为空,直接返回,不进行阻塞等待。
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else //作者注:只返回数据,但不移动head指针,代表不删除数据。
return first.item;
} finally {
takeLock.unlock();
}
}
pool():
与take的不同点:take在队列为空时阻塞等待,pool在队列为空时直接返回,不阻塞
与take的相同点:如果获取到数据,将数据从队列里删除。
public E poll() {
final AtomicInteger count = this.count;
//作者注:如果队列为空,直接返回,不进行阻塞等待。
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
//作者注:获取数据,并且将获取的数据从队列中删除。
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}