Linux系统编程:生产者与消费者模型

  • Post author:
  • Post category:linux




前言

生产者和消费者问题是线程模型中的经典问题:

生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加资源,消费者从存储空间中取走资源

,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。



一:模型实现

在多线程开发当中,同一线程中若

生产者处理速度与消费者处理速度相差很大,则有可能出现等待对方的情况

。为了解决这个问题于是引入了生产者和消费者模型。



模型框架


一个场所:

线程安全队列


两种角色:

生产者线程和消费者线程


三种关系:

生产者与生产者(互斥)、消费者与消费者(互斥)、生产者与消费者(同步+互斥)



模型优点


解耦合:

生产者模块和消费者模块不需要直接进行交互,只是共同操作线程安全队列。


支持并发:

支持多个生产者或消费者线程操作线程安全队列。


支持忙闲不均:

线程安全队列中有多个结点可进行缓冲。



代码实现

借助

阻塞队列作为交易场所,互斥锁实现互斥关系,条件变量实现同步关系

实现生产者与消费者模型

#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<queue>

using namespace std;

const size_t MAX_SIZE = 5;

class BlockQueue
{
    public:
        BlockQueue(size_t capacity = MAX_SIZE) : _capacity(capacity)
        {
            pthread_mutex_init(&_mutex, NULL);
            pthread_cond_init(&_pro_cond, NULL);
            pthread_cond_init(&_cus_cond, NULL);
        }

        ~BlockQueue()
        {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_pro_cond);
            pthread_cond_destroy(&_cus_cond);
        }

        void Push(int data)
        {
            //加锁保证线程安全
            pthread_mutex_lock(&_mutex);

            //队列已满,阻塞生产者生产
            //循环等待,防止有其他进程进入
            while(_queue.size() == _capacity)
            {
                pthread_cond_wait(&_pro_cond, &_mutex);
            }

            //数据入队
            _queue.push(data);

            pthread_mutex_unlock(&_mutex);
            //生产完毕,唤醒消费者消费
            pthread_cond_signal(&_cus_cond);

        }

        void Pop(int& data)
        {
            pthread_mutex_lock(&_mutex);

            while(_queue.empty())
            {
                //队列空,阻塞消费者消费
                pthread_cond_wait(&_cus_cond, &_mutex);
            }

            //数据出队
            data = _queue.front();
            _queue.pop();
            
            pthread_mutex_unlock(&_mutex);

            //消费完毕,唤醒生产者生产
            pthread_cond_signal(&_pro_cond);
        }

    private:
        queue<int> _queue;
        size_t _capacity;
		
		//实现互斥关系的互斥锁
        pthread_mutex_t _mutex;
        //实现同步关系的条件变量
        pthread_cond_t _pro_cond;
        pthread_cond_t _cus_cond;
};

//因为入口函数的参数必须为void* ,所以要强转成BlockQueue类型
void *productor(void *arg)o
{
    BlockQueue *queue = (BlockQueue*)arg;
    //生产者不断生产数据
    
    int data = 0;
    while(1)
    {
        //生产数据
        queue->Push(data);

        cout << "生产者放入数据:" << data++ << endl;
    }

    return NULL;
}

void *customer(void *arg)
{
    BlockQueue *queue = (BlockQueue*)arg;
    //消费者不断取出数据
    while(1)
    {
        //取出数据
        int data;
        queue->Pop(data);

        cout << "消费者取出数据:" << data << endl;
    }

    return NULL;
}

int main()
{
    BlockQueue queue;
    pthread_t pro_tid[5], cus_tid[5];
	
    //创建线程
    for(size_t i = 0; i < 5; i++)
    {
        int ret = pthread_create(&pro_tid[i], NULL, productor, (void*)&queue);

        if(ret)
        {
            cout << "生产者线程创建失败" << endl;
            return -1;
        }

        ret = pthread_create(&cus_tid[i], NULL, customer, (void*)&queue);
        if(ret)
        {
            cout << "消费者线程创建失败" << endl;
            return -1;
        }
    }   
    
    //等待线程退出
    for(size_t i = 0; i < 4; i++)
    {
        pthread_join(pro_tid[i], NULL);
        pthread_join(cus_tid[i], NULL);
    }

    return 0;
}



版权声明:本文为Outtch_原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。