使用C++实现一个简单的线程池

  • Post author:
  • Post category:其他


线程池类似于内存池,连接池等,在一开始创建好一定数量的线程,当有任务需要处理时,就将该任务丢进线程池的某个线程中处理,而不是每个任务都启动一个线程。使用线程池的好处在于,不需要频繁的创建线程,因为创建线程是一个较耗资源的操作。因此,相对而言一开始创建好线程,要比动态创建线程对性能影响更小。其次,线程池可以更好的将线程管理起来,对外提供简单的接口,内部完成对线程的调度,这也是高内聚的一个思想,将复杂封装起来,对外在满足业务场景的前提下尽可能简单。线程池作为一个独立的非业务模块,任何需要处理多任务的场景都可以使用该组件,这样更有利于代码复用。



需求分析:

  1. 线程池可以根据自定义参数实现线程数量控制,比如,可以设置初始线程数量,可以设置最大线程数量,可以设置线程空闲时间。
  • 初始线程数量为线程池初始化时,创建线程的数量。此数量是线程池最小线程数,任何时候线程池都应该有这么多线程。
  • 最大线程数量为线程池支持的最大线程数量,线程数量不是越多越好,线程数过多会导致大量的线程切换,这本身又会造成性能问题,因此使用最大线程数量来限制线程数量。 一般情况下,初始线程数量小于最大线程数量,当任务数量较多时,线程池能够动态扩展线程数量。同样,线程比较空闲时,为了不占用过多的线程资源,可以将空闲线程退出。
  • 线程空闲时间,当线程池在一段时间内都没有事情可干,那么就将此线程退出。
  1. 任务调度设计,任务进入线程池,由哪个线程处理,如何平均的将任务分配给各个线程?这里需要一个队列,用来缓存任务,外部通过接口将任务推到队列,线程池内部需要快速将任务分配给某个线程。这里设计为让线程内部去竞争获得任务,这样就不需要设计任务分配相关算法,简化了设计。线程池中的各个线程,当有任务的时候从队列里面拿任务,当没有任务的时候希望线程能够挂起。使用条件变量,当没有任务的时候,将线程挂起来,当有任务时,由外部通知条件变量,唤醒线程,获取任务,执行任务。这样的设计,在同一时刻只能有一个线程拿到一个任务,这也是对任务队列的保护。在任务运行时也可以达到并行的效果,这样线程池的设计目的也就达到了。



代码实现

// ThreadPool.h
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__
#include <functional>
#include <queue>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>
namespace threadpool
{

class ThreadPool
{
    using TaskType = std::function<void (void*)>;

    static const int INIT_NUM = 1;
    static const int MAX_NUM = 4;
    static const int MAX_IDLE_TIME_SECOND = 6;
public:
    ThreadPool(int initNum, int maxNum, int idleSec);
    ThreadPool();
    ~ThreadPool();

public:
    void AddTask(const TaskType& task, void* arg);

private:
    void Init();
    void ThreadRoutine(int index);
    void PoolGrow();
    
private:
    int _initNum;
    int _maxNum;
    int _idleSec;
    bool _stop;
    std::queue<std::pair<TaskType, void*>> _taskQueue;
    std::vector<std::thread*> _pool;
    std::mutex _mutex;
    std::condition_variable _cond;
    std::atomic<int> _busyCount;
    std::atomic<int> _threadCount;
};


} // namespace threadpool

#endif // __THREAD_POOL_H__
// ThreadPool.cpp
#include <assert.h>
#include <stdio.h>
#include "ThreadPool.h"

using namespace threadpool;

const int ThreadPool::INIT_NUM;
const int ThreadPool::MAX_NUM;
const int ThreadPool::MAX_IDLE_TIME_SECOND;

ThreadPool::ThreadPool(int initNum, int maxNum, int idleSec)
    : _initNum(initNum)
    , _maxNum(maxNum)
    , _idleSec(idleSec)
    , _stop(false)
    , _busyCount(0)
    , _threadCount(0)
{
    Init();
}

ThreadPool::ThreadPool()
    : _initNum(ThreadPool::INIT_NUM)
    , _maxNum(ThreadPool::MAX_NUM)
    , _idleSec(ThreadPool::MAX_IDLE_TIME_SECOND)
    , _stop(false)
    , _busyCount(0)
    , _threadCount(0)
{
    Init();
}


void ThreadPool::Init()
{
    if(_idleSec <= 0){
        _idleSec = ThreadPool::MAX_IDLE_TIME_SECOND;
    }
    _pool.reserve(_maxNum);
    for(int i = 0; i < _maxNum; ++i){
        if(i < _initNum){
            _pool.push_back(new std::thread(&ThreadPool::ThreadRoutine, this, i));
        }else{
            _pool.push_back(nullptr);
        }
    }

    _threadCount.store(_initNum);
}

ThreadPool::~ThreadPool()
{
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _stop = true;
    }
    _cond.notify_all();
    for(int i = 0; i < _maxNum; ++i){
        auto thread = _pool[i];
        if(thread && thread->joinable()){
            thread->join();
            delete thread;
        }
    }
}


void ThreadPool::AddTask(const TaskType& task, void* arg)
{
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _taskQueue.emplace(task, arg);
    }
   
    _cond.notify_one();

    PoolGrow();
}

void ThreadPool::PoolGrow()
{
    int busy = _busyCount.load();
    int threadCount = _threadCount.load();
    printf("count:%d, busy:%d\n", threadCount, busy);
    if(threadCount == busy){
        if(threadCount < _maxNum){
            for(int i = 0; i < _maxNum; ++i){
                if(_pool[i] == nullptr){
                    _pool[i] = new std::thread(&ThreadPool::ThreadRoutine, this, i);
                    printf("add thread[%d]\n", i);
                    ++_threadCount;
                    break;
                }
            }
        }
    }
}



void ThreadPool::ThreadRoutine(int index)
{
    while(1){
        std::pair<TaskType, void*> task(nullptr, 0);
        {
            std::cv_status waitStatus = std::cv_status::no_timeout;
            std::unique_lock<std::mutex> lock(_mutex);
            while(_taskQueue.empty() && waitStatus != std::cv_status::timeout && !_stop){
                int idleTime = MAX_IDLE_TIME_SECOND;
                waitStatus =  _cond.wait_for(lock, std::chrono::seconds(_idleSec));
            }
            if(!_taskQueue.empty()){
                task = _taskQueue.front();
                _taskQueue.pop();
            }else if(_stop){
                break;
            }else if(waitStatus == std::cv_status::timeout){
                if(_threadCount > _initNum){
                    _pool[index]->detach();
                    delete _pool[index];
                    _pool[index] = nullptr;
                    --_threadCount;
                    printf("thread[%d] exit\n", index);
                    break;
                }
            }
        }

        if(task.first != nullptr){
            ++_busyCount;
            task.first(task.second);
            --_busyCount;
        }
    }
}



使用示例

#include <stdio.h>
#include "ThreadPool.h"
int main()
{
	int initThreadCount = 1;    // 初始线程个数
	int maxThreadCount = 4;     // 最大线程个数
	int idleSec = 6;            // 当某个线程空闲6秒时,将退出
	threadpool::ThreadPool pool(initThreadCount, maxThreadCount, idleSec);
	
	// 向线程池中添加任务
	pool.AddTask([](void* arg){
	  printf("this is task1.\n");
	});
	
	pool.AddTask([](void* arg){
	  printf("this is task2.\n");
	});
	
	pool.AddTask([](void* arg){
	  printf("this is task3.\n");
	});
}



版权声明:本文为Huoon原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。