BlockingQueue中 take、offer、put、add的一些比较

  • Post author:
  • Post category:其他


(转自:

https://blog.csdn.net/wei_ya_wen/article/details/19344939

侵删)

在java多线程操作中, BlockingQueue<E> 常用的一种方法之一。在看jdk内部尤其是一些多线程,大量使用了blockinkQueue 来做的。

借用jdk api解释下:


BlockingQueue

方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(

null



false

,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:

抛出异常 特殊值 阻塞 超时

插入


add(e)



offer(e)



put(e)



offer(e, time, unit)


移除


remove()



poll()



take()



poll(time, unit)


检查


element()



peek()

不可用 不可用

offer: 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回

true

,如果当前没有可用的空间,则返回

false,不会抛异常:



java源代码




  1. public boolean offer(E e) {


  2. if (

    e


    == null) throw new NullPointerException();


  3. final ReentrantLock

    lock


    =


    this


    .lock;


  4. lock.lock();

  5. try {

  6. if (

    count


    == items.length)


  7. return false;

  8. else {

  9. insert(e);

  10. return true;

  11. }

  12. } finally {

  13. lock.unlock();

  14. }

  15. }




put:


将指定元素插入此队列中,将等待可用的空间.通俗点说就是>maxSize 时候,阻塞,直到能够有空间插入元素


java源代码:



  1. public void put(E e) throws InterruptedException {


  2. if (

    e


    == null) throw new NullPointerException();


  3. final E[]

    items


    =


    this


    .items;


  4. final ReentrantLock

    lock


    =


    this


    .lock;


  5. lock.lockInterruptibly();

  6. try {

  7. try {

  8. while (

    count


    == items.length)


  9. notFull.await();

  10. } catch (InterruptedException ie) {

  11. notFull.signal(); // propagate to non-interrupted thread

  12. throw ie;

  13. }

  14. insert(e);

  15. } finally {

  16. lock.unlock();

  17. }

  18. }



take: 获取并移除此队列的头部,在元素变得可用之前一直等待 。queue的长度 == 0 的时候,一直阻塞


java 源代码:




  1. public E take() throws InterruptedException {


  2. final ReentrantLock

    lock


    =


    this


    .lock;


  3. lock.lockInterruptibly();

  4. try {

  5. try {

  6. while (

    count


    == 0)


  7. notEmpty.await();

  8. } catch (InterruptedException ie) {

  9. notEmpty.signal(); // propagate to non-interrupted thread

  10. throw ie;

  11. }

  12. E

    x


    =


    extract


    ();


  13. return x;

  14. } finally {

  15. lock.unlock();

  16. }

  17. }




add: 和collection的add一样,没什么可以说的。如果当前没有可用的空间,则抛出


IllegalStateException







例子如下:



  1. public static void main(String[] args) {


  2. java.util.concurrent.Executor

    executor


    =


    Executors


    .newFixedThreadPool(10);


  3. Runnable

    task


    =


    new


    Runnable() {



  4. @Override

  5. public void run() {

  6. System.out.println(“ggg”);

  7. }

  8. };

  9. executor.execute(task);

  10. */


  11. BlockingQueue

    q


    =


    new


    ArrayBlockingQueue(10);


  12. Producer

    p


    =


    new


    Producer(q);


  13. Consumer

    c1


    =


    new


    Consumer(q);


  14. Consumer

    c2


    =


    new


    Consumer(q);


  15. new Thread(p).start();

  16. new Thread(c1).start();

  17. new Thread(c2).start();


  18. }

  19. }



  20. class Producer implements Runnable {

  21. private final BlockingQueue

    <


    Object


    >


    queue;


  22. Producer(BlockingQueue q) {

    queue


    =


    q


    ; }


  23. public void run() {

  24. for(int

    i


    =


    0


    ;i


    <


    100


    ;i++){


  25. try {

  26. queue.put(i);

  27. } catch (InterruptedException e1) {

  28. // TODO Auto-generated catch block

  29. e1.printStackTrace();

  30. }

  31. try {

  32. Thread.sleep(20);

  33. } catch (InterruptedException e) {

  34. // TODO Auto-generated catch block

  35. e.printStackTrace();

  36. }

  37. }

  38. }


  39. }


  40. class Consumer implements Runnable {

  41. private final BlockingQueue queue;

  42. Consumer(BlockingQueue q) {

    queue


    =


    q


    ; }


  43. public void run() {

  44. try {

  45. while(true) {

  46. consume(

  47. queue.take()

  48. );

  49. }

  50. } catch (InterruptedException ex) {


  51. }

  52. }

  53. void consume(Object x) {

  54. System.out.println(x);


  55. }

  56. }