【Java多线程案例】使用阻塞队列实现生产者消费者模型

  • Post author:
  • Post category:java




前言


本篇文章讲解多线程案例之阻塞队列。主要讲解阻塞队列的特性、实际开发中常用的到的生产者消费者模型,以及生产者消费者模型解耦合、削峰填谷的好处。并且使用 Java 多线程模拟实现一个生产者消费者模型、阻塞队列版的生产者消费者模型。

文章从什么是阻塞队列、生产者消费者模型、高内聚低耦合、削峰填谷、模拟实现生产者消费者模型、阻塞队列版消费者模型,这几个模块来讲解。话不多说,让我们进入 阻塞队列 的学习吧~


目录


1. 什么是阻塞队列


2. 生产者消费者模型


2.1. 解耦合


2.2 削峰填谷


2.3 生产者消费者案例


3. 阻塞队列生产者消费者模型的实现




1. 什么是阻塞队列


在数据结构的学习中,我们知道了队列有普通队列、循环队列,它们都遵循“先进先出”的原则。阻塞队列也遵循这个原则,它是一种特殊的队列(带有阻塞功能的队列),并且满足以下两点:


  • 当队列



    的时候,如果继续往队列中插入数据,则发生阻塞状态,直到有数据出队列。

  • 当队列



    的时候,如果往外取数据,也发生阻塞状态,直到有数序入队列。

Java 标准库中的阻塞队列为:



BlockingDeque<>



,是一个泛型接口。因此,我们使用的时候直接遵循标准库的写法即可。注意以下两点:


  1. BlockingDeque 是一个接口,因此我们实例对象时用的是 LinkedBlockingQueue类。

  2. put 方法用于阻塞式的入队列, take 用于阻塞式的出队列。

通过上述介绍,我们可以写出一段简易的阻塞队列代码:

    public static void main(String[] args) throws InterruptedException {
        //BlockingQueue<>为阻塞队列的原型
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>();
        //take(取元素)、put(插入元素)为阻塞队列的两个核心方法
        blockingQueue.put(20);//插入元素20
        Integer result = blockingQueue.take();//从队头取元素
        System.out.println(result);
    }


运行后打印:

通过上述代码,大家已经对阻塞队列有了一个浅的认识,当然你可以可以多 take 几次来达到阻塞效果。

阻塞队列主要用于“生产者消费者模型”,是实际开发中常用到的,下面我就来介绍它的用法。




2. 生产者消费者模型


什么是生产者消费者模型?从字面上来看,前者是生产者,后者是消费者。

因此,生产者与消费者之间进行交互需要一个中间平台,这个平台就是阻塞队列,如果没有中间平台交易就会产生一定风险、效率也会降低很多。

生产者消费者体现:过年大家都包饺子,假设一家有三个人员,人员1 擀饺子皮,擀完后放在砧板上,人员2 和 人员3 负责包饺子。这样一个例子中 人员1 就是生产者,砧板就是平台,人员2 和 人员3 是消费者。如果三个人员自己擀皮自己包,这样的效率是非常低的!(只有一个擀面杖、无砧板情况下)

中间平台优点体现:假如,有两个服务器它们直接进行交互。服务器1挂了,紧接着服务器2也挂了。因此,我们需要一个中间平台(阻塞队列),连接这两个服务器并进行交互。这样无论那一个服务器挂了也不影响另一个服务器。

生产者消费者模型的优点有很多,但最突出了有两点:


解耦合





削峰填谷


。请看下方讲解。




2.1. 解耦合


大家都听过


高内聚低耦合


这个概念,在此我来做个解释:

何为


内聚


,举个例子:在快递站拿快递,我们可以根据货物号来快速的找到想要的物品,这就是


高内聚


但某一天,快递站来了个怪人,他在找快递的过程中把每个拿起来的快递都随意放在其他位置。因此别人再去找自己的快递时就不能快速的找到自己的快递了,这就是


低内聚


的一个体现。

在 Java 中高内聚主要体现在代码的条理性,相关联的代码很好的放在一起。低内聚则是相关联的代码没有放在一起,东一块、西一块。

何为


耦合




主要体现一个关联性

。也是举个例子:假设我的亲人生病住院了,我会放下手中的一切去好好照顾他/她,哪怕对我现实生活影响很大,我也义无反顾。这样的行为就是


高耦合


的。

但我的女神生病了,她发了个朋友圈。由于我和她只是“朋友圈点赞之交”,我只会给她点个赞并且评论句多喝热水。因为她生病了对我的影响是很低的,所以可以称为


低耦合


耦合高,在 Java 主要体现在多个模块之间的关联,关联越强耦合越高,关联越弱耦合越低。


回归正题,阻塞队列的解耦合主要体现在多个线程之间进行交互。如以下例子:

在上、下图中,A、B、C是我们的业务服务器,会经常更改代码, 因此会经常出现 bug 就容易挂。通过消费者模型就能很好的避免这个问题。

当然,阻塞队列服务器也会挂,但相对于ABC业务服务器来说挂的机率较小。




2.2 削峰填谷


三峡大坝利用的就是削峰填谷机制,有效缓解了电力系统在高峰期的压力和在低峰期的浪费现象。

当电力系统电力值达到高峰时,三峡大坝则会把部分的水存储在水库里面,只放出适合的水流量,减少并调节电力系统的负荷,有效缓解电力系统在高峰期的浪费现象。

当电力处于低峰期时也就是电力供给不足的情况,三峡大坝会把水库里存储的水给放出来,通过电站的发电量、水库的排水等措施,缓解了电力系统在低峰期的电力不足。

上述例子就是削峰填谷的一个简单理解,在 Java 中阻塞队列就能达到削峰填谷的功能。

当服务器与服务器之间进行交互常常是以一个很平缓的速率进行的,但某一时刻突然达到了一个峰值。

这个时候阻塞队列就能把峰值带来的压力给顶下来,让服务器之间还是以平稳的速率进行交互。

如:服务器A 作为生产者,服务器B 作为消费者,服务器A 最高可达到 1秒3万 次的速率,服务器B 最高只能 1秒1万 次这时候就会出现下图这样的问题。

上图中 服务器A 作为生产者、服务器B 作为消费者。当 服务器A 收到的请求多了。回复给阻塞队列的内容也变多了。

但 服务器B 最多能接受 1秒1万 次的数据。因此,阻塞队列就会把多的请求存储下来并按照 1秒1万 次的速率给 服务器B 传输数据,这样就不会导致 服务器B 崩溃。

以上的三峡大坝、服务器交互的例子就是对削峰填谷进行的一个讲解,当然比较浅显。具体代码的实现,请看下方讲解。




2.3 生产者消费者案例


生产者消费者主要体现一个线程生产,一个线程消费。如下代码:

public static void main(String[] args) {
        BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>();
        //消费者
        Thread thread1 = new Thread(()->{
            while (true) {
                try {
                    int value = blockingDeque.take();
                    System.out.println("消费者: "+value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread1.start();//启动线程1

        //生产者
        Thread thread2 = new Thread(()->{
            int value = 1;
            while (true) {
                try {
                    blockingDeque.put(value);
                    System.out.println("生产者: "+value);
                    Thread.sleep(1000);
                    value++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread2.start();//启动线程2
    }


运行后打印:

以上代码不难看懂,主要用到阻塞队列的


take





put


方法。生产者 thread2 使用 put 方法生产元素,消费者 thread1 使用 take 方法消费元素。



注意


,在线程内调用 take 或put 方法,都得 try/catch

InterruptedException

这个异常。我们直接Alt+Enter take 或 put方法即可。




3. 阻塞队列生产者消费者模型的实现


使用阻塞队列实现生产者消费者模式过程如下:

首先我们要让这个队列循环下去,如何让一个队列循环下去,最好实现方法就是使用循环队列。

设计中我们可以用 head 作为队头元素下标、tail 作为队尾元素下标、size 作为当前元素的个数。

head 等于 tail 的时候证明是初始状态(队列空),或者是队列已满。因此,有以下几点注意事项:


入队列:

  • 当 size 等于队列长度时,证明队列已满,此时不能插入数据。
  • 当 tail 等于队列长度时,tail 置为0,从第一个位置开始插入元素。


出队列:

  • 当 size 等于 0 时,证明队列已空,此时不能出数据。
  • 当 head 等于队列长度时候,head 置为 0 ,从第一个元素开始出元素。

当然,为了达到阻塞的效果,在队列

满状态



空状态

的方法里面使用 wait 方法造成阻塞状态。在插元素方法里面里面 notify 唤醒队列空时的阻塞状态,在拿元素里面 notify 唤醒队列满时的阻塞状态。

具体代码实现如下:

class MyBlockingQueue {
    int [] array = new int[100];//定义一个数组为队列
    int head = 0;//队头下标
    int tail = 0;//队尾下标
    int size = 0;//元素个数

    //模拟实现 put 方法
    synchronized public void put(int value) throws InterruptedException {
        if (size == array.length) {
            this.wait();//队列已满设为阻塞状态
        }
        array[tail] = value;//把value值放在数组对应下标中
        tail++;//队尾下表自增
        size++;//元素个数自增
        if (tail == array.length) {
            tail = 0;//队尾下标重置为0
        }
        this.notify();//唤醒队列空的阻塞状态
    }

    //模拟实现 take 方法
    synchronized public int take() throws InterruptedException {
        if (size == 0){
            this.wait();//队列已空设为阻塞状态
        }
        int value = array[head];//队头元素负责个value
        head++;//队头下标往后自增
        size--;//元素个数自减
        if (head == array.length) {
            head = 0;//队头下标置为0
        }
        this.notify();//唤醒队列满的阻塞状态
        return value;//返回队头元素
    }
}
public class ThreadDemo2 {
    public static void main(String[] args) {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
        //生产者
        Thread thread1 = new Thread(()-> {
            int i = 1;
            while (true) {
                try {
                    System.out.println("生产者: "+i);
                    myBlockingQueue.put(i);
                    i++;
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread1.start();
        //消费者
        Thread thread2 = new Thread(()-> {
            while (true) {
                try {
                    int i = myBlockingQueue.take();
                    System.out.println("消费者: "+i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread2.start();
    }
}


运行后打印:

以上代码,我使用一个数组来模拟实现循环队列的这样更容易去理解。其他细节大家可以在代码中的注释进行理解。 队列已经循环队列不太熟悉朋友可以回头好好复习一下。

注意,一个队列不可能为空状态又为满状态,因此在上述代码中,notify 唤醒的都是对方的状态。这样一个阻塞队列生产者消费者模式就能很好的实现了。

另外,阻塞队列不存在线程安全问题,因为阻塞队列底层有加锁机制。因此,大家可以安心使用。

如果面试的时候,面试说:“请你写一个生产者消费者模型”。那么这个时候,你就可以利用上方代码进行拓展。


🧑‍💻作者:一只爱打拳的程序猿,Java领域新星创作者,阿里云社区优质创作者、专家博主。

📒博客主页:

这是博主的主页

🗃️文章收录于:

Java多线程编程

🗂️JavaSE的学习:

JavaSE

🗂️Java数据结构:

数据结构与算法

本篇博文到这里就结束了,感谢点赞、评论、收藏、关注~



版权声明:本文为weixin_64916311原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。