线程池类似于内存池,连接池等,在一开始创建好一定数量的线程,当有任务需要处理时,就将该任务丢进线程池的某个线程中处理,而不是每个任务都启动一个线程。使用线程池的好处在于,不需要频繁的创建线程,因为创建线程是一个较耗资源的操作。因此,相对而言一开始创建好线程,要比动态创建线程对性能影响更小。其次,线程池可以更好的将线程管理起来,对外提供简单的接口,内部完成对线程的调度,这也是高内聚的一个思想,将复杂封装起来,对外在满足业务场景的前提下尽可能简单。线程池作为一个独立的非业务模块,任何需要处理多任务的场景都可以使用该组件,这样更有利于代码复用。
需求分析:
- 线程池可以根据自定义参数实现线程数量控制,比如,可以设置初始线程数量,可以设置最大线程数量,可以设置线程空闲时间。
- 初始线程数量为线程池初始化时,创建线程的数量。此数量是线程池最小线程数,任何时候线程池都应该有这么多线程。
- 最大线程数量为线程池支持的最大线程数量,线程数量不是越多越好,线程数过多会导致大量的线程切换,这本身又会造成性能问题,因此使用最大线程数量来限制线程数量。 一般情况下,初始线程数量小于最大线程数量,当任务数量较多时,线程池能够动态扩展线程数量。同样,线程比较空闲时,为了不占用过多的线程资源,可以将空闲线程退出。
- 线程空闲时间,当线程池在一段时间内都没有事情可干,那么就将此线程退出。
- 任务调度设计,任务进入线程池,由哪个线程处理,如何平均的将任务分配给各个线程?这里需要一个队列,用来缓存任务,外部通过接口将任务推到队列,线程池内部需要快速将任务分配给某个线程。这里设计为让线程内部去竞争获得任务,这样就不需要设计任务分配相关算法,简化了设计。线程池中的各个线程,当有任务的时候从队列里面拿任务,当没有任务的时候希望线程能够挂起。使用条件变量,当没有任务的时候,将线程挂起来,当有任务时,由外部通知条件变量,唤醒线程,获取任务,执行任务。这样的设计,在同一时刻只能有一个线程拿到一个任务,这也是对任务队列的保护。在任务运行时也可以达到并行的效果,这样线程池的设计目的也就达到了。
代码实现
// ThreadPool.h
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__
#include <functional>
#include <queue>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>
namespace threadpool
{
class ThreadPool
{
using TaskType = std::function<void (void*)>;
static const int INIT_NUM = 1;
static const int MAX_NUM = 4;
static const int MAX_IDLE_TIME_SECOND = 6;
public:
ThreadPool(int initNum, int maxNum, int idleSec);
ThreadPool();
~ThreadPool();
public:
void AddTask(const TaskType& task, void* arg);
private:
void Init();
void ThreadRoutine(int index);
void PoolGrow();
private:
int _initNum;
int _maxNum;
int _idleSec;
bool _stop;
std::queue<std::pair<TaskType, void*>> _taskQueue;
std::vector<std::thread*> _pool;
std::mutex _mutex;
std::condition_variable _cond;
std::atomic<int> _busyCount;
std::atomic<int> _threadCount;
};
} // namespace threadpool
#endif // __THREAD_POOL_H__
// ThreadPool.cpp
#include <assert.h>
#include <stdio.h>
#include "ThreadPool.h"
using namespace threadpool;
const int ThreadPool::INIT_NUM;
const int ThreadPool::MAX_NUM;
const int ThreadPool::MAX_IDLE_TIME_SECOND;
ThreadPool::ThreadPool(int initNum, int maxNum, int idleSec)
: _initNum(initNum)
, _maxNum(maxNum)
, _idleSec(idleSec)
, _stop(false)
, _busyCount(0)
, _threadCount(0)
{
Init();
}
ThreadPool::ThreadPool()
: _initNum(ThreadPool::INIT_NUM)
, _maxNum(ThreadPool::MAX_NUM)
, _idleSec(ThreadPool::MAX_IDLE_TIME_SECOND)
, _stop(false)
, _busyCount(0)
, _threadCount(0)
{
Init();
}
void ThreadPool::Init()
{
if(_idleSec <= 0){
_idleSec = ThreadPool::MAX_IDLE_TIME_SECOND;
}
_pool.reserve(_maxNum);
for(int i = 0; i < _maxNum; ++i){
if(i < _initNum){
_pool.push_back(new std::thread(&ThreadPool::ThreadRoutine, this, i));
}else{
_pool.push_back(nullptr);
}
}
_threadCount.store(_initNum);
}
ThreadPool::~ThreadPool()
{
{
std::lock_guard<std::mutex> lock(_mutex);
_stop = true;
}
_cond.notify_all();
for(int i = 0; i < _maxNum; ++i){
auto thread = _pool[i];
if(thread && thread->joinable()){
thread->join();
delete thread;
}
}
}
void ThreadPool::AddTask(const TaskType& task, void* arg)
{
{
std::lock_guard<std::mutex> lock(_mutex);
_taskQueue.emplace(task, arg);
}
_cond.notify_one();
PoolGrow();
}
void ThreadPool::PoolGrow()
{
int busy = _busyCount.load();
int threadCount = _threadCount.load();
printf("count:%d, busy:%d\n", threadCount, busy);
if(threadCount == busy){
if(threadCount < _maxNum){
for(int i = 0; i < _maxNum; ++i){
if(_pool[i] == nullptr){
_pool[i] = new std::thread(&ThreadPool::ThreadRoutine, this, i);
printf("add thread[%d]\n", i);
++_threadCount;
break;
}
}
}
}
}
void ThreadPool::ThreadRoutine(int index)
{
while(1){
std::pair<TaskType, void*> task(nullptr, 0);
{
std::cv_status waitStatus = std::cv_status::no_timeout;
std::unique_lock<std::mutex> lock(_mutex);
while(_taskQueue.empty() && waitStatus != std::cv_status::timeout && !_stop){
int idleTime = MAX_IDLE_TIME_SECOND;
waitStatus = _cond.wait_for(lock, std::chrono::seconds(_idleSec));
}
if(!_taskQueue.empty()){
task = _taskQueue.front();
_taskQueue.pop();
}else if(_stop){
break;
}else if(waitStatus == std::cv_status::timeout){
if(_threadCount > _initNum){
_pool[index]->detach();
delete _pool[index];
_pool[index] = nullptr;
--_threadCount;
printf("thread[%d] exit\n", index);
break;
}
}
}
if(task.first != nullptr){
++_busyCount;
task.first(task.second);
--_busyCount;
}
}
}
使用示例
#include <stdio.h>
#include "ThreadPool.h"
int main()
{
int initThreadCount = 1; // 初始线程个数
int maxThreadCount = 4; // 最大线程个数
int idleSec = 6; // 当某个线程空闲6秒时,将退出
threadpool::ThreadPool pool(initThreadCount, maxThreadCount, idleSec);
// 向线程池中添加任务
pool.AddTask([](void* arg){
printf("this is task1.\n");
});
pool.AddTask([](void* arg){
printf("this is task2.\n");
});
pool.AddTask([](void* arg){
printf("this is task3.\n");
});
}
版权声明:本文为Huoon原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。