前言
生产者和消费者问题是线程模型中的经典问题:
生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加资源,消费者从存储空间中取走资源
,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
一:模型实现
在多线程开发当中,同一线程中若
生产者处理速度与消费者处理速度相差很大,则有可能出现等待对方的情况
。为了解决这个问题于是引入了生产者和消费者模型。
模型框架
一个场所:
线程安全队列
两种角色:
生产者线程和消费者线程
三种关系:
生产者与生产者(互斥)、消费者与消费者(互斥)、生产者与消费者(同步+互斥)
模型优点
解耦合:
生产者模块和消费者模块不需要直接进行交互,只是共同操作线程安全队列。
支持并发:
支持多个生产者或消费者线程操作线程安全队列。
支持忙闲不均:
线程安全队列中有多个结点可进行缓冲。
代码实现
借助
阻塞队列作为交易场所,互斥锁实现互斥关系,条件变量实现同步关系
实现生产者与消费者模型
#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<queue>
using namespace std;
const size_t MAX_SIZE = 5;
class BlockQueue
{
public:
BlockQueue(size_t capacity = MAX_SIZE) : _capacity(capacity)
{
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_pro_cond, NULL);
pthread_cond_init(&_cus_cond, NULL);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pro_cond);
pthread_cond_destroy(&_cus_cond);
}
void Push(int data)
{
//加锁保证线程安全
pthread_mutex_lock(&_mutex);
//队列已满,阻塞生产者生产
//循环等待,防止有其他进程进入
while(_queue.size() == _capacity)
{
pthread_cond_wait(&_pro_cond, &_mutex);
}
//数据入队
_queue.push(data);
pthread_mutex_unlock(&_mutex);
//生产完毕,唤醒消费者消费
pthread_cond_signal(&_cus_cond);
}
void Pop(int& data)
{
pthread_mutex_lock(&_mutex);
while(_queue.empty())
{
//队列空,阻塞消费者消费
pthread_cond_wait(&_cus_cond, &_mutex);
}
//数据出队
data = _queue.front();
_queue.pop();
pthread_mutex_unlock(&_mutex);
//消费完毕,唤醒生产者生产
pthread_cond_signal(&_pro_cond);
}
private:
queue<int> _queue;
size_t _capacity;
//实现互斥关系的互斥锁
pthread_mutex_t _mutex;
//实现同步关系的条件变量
pthread_cond_t _pro_cond;
pthread_cond_t _cus_cond;
};
//因为入口函数的参数必须为void* ,所以要强转成BlockQueue类型
void *productor(void *arg)o
{
BlockQueue *queue = (BlockQueue*)arg;
//生产者不断生产数据
int data = 0;
while(1)
{
//生产数据
queue->Push(data);
cout << "生产者放入数据:" << data++ << endl;
}
return NULL;
}
void *customer(void *arg)
{
BlockQueue *queue = (BlockQueue*)arg;
//消费者不断取出数据
while(1)
{
//取出数据
int data;
queue->Pop(data);
cout << "消费者取出数据:" << data << endl;
}
return NULL;
}
int main()
{
BlockQueue queue;
pthread_t pro_tid[5], cus_tid[5];
//创建线程
for(size_t i = 0; i < 5; i++)
{
int ret = pthread_create(&pro_tid[i], NULL, productor, (void*)&queue);
if(ret)
{
cout << "生产者线程创建失败" << endl;
return -1;
}
ret = pthread_create(&cus_tid[i], NULL, customer, (void*)&queue);
if(ret)
{
cout << "消费者线程创建失败" << endl;
return -1;
}
}
//等待线程退出
for(size_t i = 0; i < 4; i++)
{
pthread_join(pro_tid[i], NULL);
pthread_join(cus_tid[i], NULL);
}
return 0;
}
版权声明:本文为Outtch_原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。