java数据结构之LinkedBlockingQueue

  • Post author:
  • Post category:java


这篇文章介绍java的数据结构之链表阻塞队列LinkedBlockingQueue

1、LinkedBlockingQueue

LinkedBlockingQueue是一个链表形式的阻塞队列,遵循FIFO的原则,是线程安全的。

队列的添加数据和获取数据都比较简单,在LinkedBlockingQueue队列中,最重要的是线程安全性,这篇文章不讨论数据的插入和获取,主要讨论LinkedBlockingQueue是如何保证线程安全的。

读这篇文章的前提要对ReentrantLock锁机制有所了解,可以参考以下文章:


ReentrantLock及Condition加锁解锁机制

2、LinkedBlockingQueue的锁

LinkedBlockingQueue中有两个锁及锁的状态,如下:

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();


— takeLock:

在获取数据时进行线程加锁


— notEmpty:

在获取数据时,如果队列为空时,使用notEmpty进行线程的阻塞,直到队列不为空时,被唤起,获取数据。


— putLock:

在添加数据时进行线程加锁


— notFull:

在添加数据时,如果队列数据已满,使用notFull进行线程阻塞,直到队列不满时被唤起,插入数据。

3、线程安全–数据插入

有两个数据插入的方法:


put(E e): 如果队列已满,阻塞线程,直到队列不满时被唤起,插入数据。


offer(E e):如果队列已满,直接返回false,不插入数据。


put(E e)

public void put(E e) throws InterruptedException {
	if (e == null) throw new NullPointerException();
	// Note: convention in all put/take/etc is to preset local var
	// holding count negative to indicate failure unless set.
	int c = -1;
	//作者注:构造一个包含插入数据的Node节点
	Node<E> node = new Node<E>(e);
	final ReentrantLock putLock = this.putLock;
	final AtomicInteger count = this.count;
	//作者注:获取putLock的锁
	putLock.lockInterruptibly();
	try {
		
		//作者注:如果队列已满,调用await进行线程挂起(阻塞)
		//        直到队列不满时,被唤起(参看take函数中的notFull.signal())
		while (count.get() == capacity) {
			notFull.await();
		}
		/**
		* 作者注:如果队列不满或者线程被唤起(线程被唤起时会重新获取putLock锁)
		*         将node节点插入到链表队列的末尾
		*/
		enqueue(node);
		c = count.getAndIncrement();
		
		//作者注:此时notFull的阻塞队列里面有可能被阻塞了很多线程,所以每一次插入一条数据时,都尝试进行一次唤起。
		if (c + 1 < capacity)
			notFull.signal();
	} finally {
		//作者注:释放putLock锁
		putLock.unlock();
	}
	
	//作者注:如果此时c == 0,说明之前队列时空的,有可能有线程在插入数据时被挂起了,此时将这些线程进行唤起,告诉他们队列里有数据了。
	if (c == 0)
		signalNotEmpty();
}


offer(E e)

offer方法和put方法唯一的区别是,如果队列已满,不进行阻塞等待,直接返回false,不再插入数据。

public boolean offer(E e) {
	if (e == null) throw new NullPointerException();
	final AtomicInteger count = this.count;
	//如果队列已满,直接返回
	if (count.get() == capacity)
		return false;
	int c = -1;
	Node<E> node = new Node<E>(e);
	final ReentrantLock putLock = this.putLock;
	putLock.lock();
	try {
		if (count.get() < capacity) {
			enqueue(node);
			c = count.getAndIncrement();
			if (c + 1 < capacity)
				notFull.signal();
		}
	} finally {
		putLock.unlock();
	}
	if (c == 0)
		signalNotEmpty();
	return c >= 0;
}

4、线程安全–数据获取

获取数据的方法比较多,有以下几个:


— take():

如果队列为空,阻塞线程,直到队列不空时,获取数据,并删除数据。


— peek():

如果队列为空,不阻塞线程,直接返回null。如果队列不为空,获取数据,但不从队列删除数据


— poll():

如果队列为空,不阻塞线程,直接返回null。如果队列不为空,获取数据,同时从队列中将数据删除

take():

public E take() throws InterruptedException {
	E x;
	int c = -1;
	final AtomicInteger count = this.count;
	final ReentrantLock takeLock = this.takeLock;
	//作者注:获取takeLock的锁,获取成功继续执行,获取失败进行等待
	//        等待和阻塞的区别:
	//        等待:有权利获取锁
	//        阻塞:没有权利获取锁,只有被唤醒,才有权利获取锁
	takeLock.lockInterruptibly();
	try {
		//作者注:如果队列未空,调用await进行线程挂起(阻塞),并释放takeLock锁
		//        直到队列不为空时,被唤起(参看put函数中的notEmpty.signal())
		while (count.get() == 0) {
			notEmpty.await();
		}
		
		/**
		* 作者注:如果队列不空或者线程被唤起(线程被唤起时会重新获取takeLock锁)
		*         获取head头部数据,并删除
		*/
		x = dequeue();
		c = count.getAndDecrement();
		
		//作者注:此时notEmpty的阻塞队列里面有可能被阻塞了很多线程,所以每一次插入一条数据时,都尝试进行一次唤起。
		if (c > 1)
			notEmpty.signal();
	} finally {
		
		//作者注: 释放takeLock锁
		takeLock.unlock();
	}
	
	//作者注:如果此时c == capacity,说明之前队列满时,有可能有线程在插入数据时被挂起了,此时将这些线程进行唤起,告诉他们队列已经不满了,可以插入数据了。
	// NotFull.signal()就在此方法中。
	if (c == capacity)
		signalNotFull();
	return x;
}


peek():

与take的不同点:

(1)、take在队列为空时阻塞等待,peek在队列为空时直接返回,不阻塞。

(2)、take在获取数据后,将数据从队列删除;peek只获取数据,不删除数据。

public E peek() {
	//作者注:如果队列为空,直接返回,不进行阻塞等待。
	if (count.get() == 0)
		return null;
	final ReentrantLock takeLock = this.takeLock;
	takeLock.lock();
	try {
		Node<E> first = head.next;
		if (first == null)
			return null;
		else //作者注:只返回数据,但不移动head指针,代表不删除数据。
			return first.item;
	} finally {
		takeLock.unlock();
	}
}

pool():

与take的不同点:take在队列为空时阻塞等待,pool在队列为空时直接返回,不阻塞

与take的相同点:如果获取到数据,将数据从队列里删除。

public E poll() {
	final AtomicInteger count = this.count;
	//作者注:如果队列为空,直接返回,不进行阻塞等待。
	if (count.get() == 0)
		return null;
	E x = null;
	int c = -1;
	final ReentrantLock takeLock = this.takeLock;
	takeLock.lock();
	try {
		if (count.get() > 0) {
			//作者注:获取数据,并且将获取的数据从队列中删除。
			x = dequeue();
			c = count.getAndDecrement();
			if (c > 1)
				notEmpty.signal();
		}
	} finally {
		takeLock.unlock();
	}
	if (c == capacity)
		signalNotFull();
	return x;
}



版权声明:本文为chenzhiang1原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。