C++11:综合案例:线程池的实现

  • Post author:
  • Post category:其他

//threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <future>
#include <functional>
#include <iostream>
#include <queue>
#include <mutex>
#include <memory>
#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endif
using namespace std;

void getNow(timeval *tv);
int64_t getNowMs();

#define TNOW    getNow()
#define TNOWMS  getNowMs()

/
/**
 * @file thread_pool.h
 * @brief c++11线程池类
 * 使用说明:
 * ThreadPool tpool; // 封装线程池
 * tpool.init(5); // 初始化线程池线程数
 * tpool.start(); // 启动线程方式
 * tpool.exec(testFunction, 10); // 将任务丢到线程池中,返回future异步获取结果
 * tpool.waitForAllDone(1000); // 等待线程池结束,参数<0时, 表示无限等待
 * tpool.stop(); //外部结束线程池
 */

class ThreadPool {
protected:
	// 任务结构体 
	struct TaskFunc {
		TaskFunc(uint64_t expireTime): _expireTime(expireTime){}
		std::function<void()> _func; // 要执行的任务函数
		int64_t _expireTime = 0; //超时的绝对时间
	};
	// 指向任务结构体的指针
	typedef shared_ptr<TaskFunc> TaskFuncPtr;

public:
	/** @brief 构造函数 */
	ThreadPool();

	/** @brief 析构函数*/
	virtual ~ThreadPool();

	/** @brief 初始化 @param num 工作线程个数 */
	bool init(size_t num);

	/** @brief 获取线程个数 @return size_t 线程个数 */
	size_t getThreadNum() {
		std::unique_lock<std::mutex> lock(_mutex);
		return _threads.size();
	}

	/** @brief 获取当前线程池的任务数 @return size_t 线程池的任务数 */
	size_t getJobNum() {
		std::unique_lock<std::mutex> lock(_mutex);
		return _tasks.size();
	}

	/** @brief 停止所有线程, 会等待所有线程结束 */
	void stop();

	/** @brief 创建并启动线程 */
	bool start(); 

	/**
	* @brief 用线程池启用任务(F是function, Args是参数)
	* @param bind function
	* @return 返回任务的future对象, 可以通过这个对象来获取返回值
	*/
	template <class F, class... Args>
	auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))> 
    {
		return exec(0, f, args...);
	}

	/**
	* @brief 用线程池启用任务(F是function, Args是参数)
	* @param 超时时间,单位ms (为0时不做超时控制);若任务超时,此任务将被丢弃
	* @param bind function
	* @return 返回任务的future对象, 可以通过这个对象来获取返回值
	*/
	template <class F, class... Args>
	auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>
	{
		// 获取现在时间
        int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs);  
		//定义返回值类型,推导返回值
		using RetType = decltype(f(args...));
		// 封装任务,绑定函数和函数的参数
		auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        // 封装任务指针,设置过期时间
		TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);  
		// 封装具体的执行函数
        fPtr->_func = [task]() { 
			(*task)();
		};

		std::unique_lock<std::mutex> lock(_mutex);
        // 插入任务
		_tasks.push(fPtr);
        // 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notify              
		_condition.notify_one();
        // 返回绑定任务的 future
		return task->get_future();;
	}

	/**
	* @brief 等待当前任务队列中, 所有工作全部结束(队列无任务).
	* @param millsecond 等待的时间(ms), -1:永远等待
	* @return true, 所有工作都处理完毕;false,超时退出
	*/
	bool waitForAllDone(int millsecond = -1);

protected:
	/** @brief 获取任务  @return TaskFuncPtr */
	bool get(TaskFuncPtr&task);

	/** @brief 线程池是否退出 */
	bool isTerminate() { return _bTerminate; }

	/** @brief 线程运行态 */
	void run();

protected:
	queue<TaskFuncPtr> _tasks; //任务队列
	std::vector<std::thread*> _threads; // 工作线程
	std::mutex _mutex;
	std::condition_variable _condition;
	size_t _threadNum; // 线程的数量 
	bool _bTerminate; // 线程池终止标志位
	std::atomic<int> _atomic{ 0 }; // 原子操作
};

#endif // THREADPOOL_H

#include "threadpool.h"

ThreadPool::ThreadPool()
: _threadNum(1), _bTerminate(false)
{}

ThreadPool::~ThreadPool() {
    stop();
}

bool ThreadPool::init(size_t num) {
    std::unique_lock<std::mutex> lock(_mutex);

    if (!_threads.empty()) {
        return false;
    }

    // 设置线程数量
    _threadNum = num; 
    return true;
}

void ThreadPool::stop() {
    {
        std::unique_lock<std::mutex> lock(_mutex);  //加锁
        _bTerminate = true;     // 触发退出
        _condition.notify_all(); // 唤醒其他线程
    }

    for (size_t i = 0; i < _threads.size(); i++) {
        if(_threads[i]->joinable()) { 
            _threads[i]->join(); // 等线程推出
        }
        delete _threads[i];
        _threads[i] = NULL;
    }

    std::unique_lock<std::mutex> lock(_mutex); // 加锁
    _threads.clear();
}

bool ThreadPool::start() {
    std::unique_lock<std::mutex> lock(_mutex);

    // 线程已经启动,不能再次启动
    if (!_threads.empty()) {
        return false;
    }

    for (size_t i = 0; i < _threadNum; i++) {
        _threads.push_back(new thread(&ThreadPool::run, this));
    }
    return true;
}

bool ThreadPool::get(TaskFuncPtr& task) {
    std::unique_lock<std::mutex> lock(_mutex); // 加锁

    // 没有任务了
    if (_tasks.empty()) {
        _condition.wait(lock, [this] { return _bTerminate  // 要么终止线程池 bTerminate_设置为true
                    || !_tasks.empty();  // 要么补充了新的任务,任务队列不为空
        }); // notify -> 1.退出线程池; 2.任务队列不为空(补充了新的任务)
    }

    // 线程池终止了
    if (_bTerminate) {
        return false;
    }

    // 有任务存在   
    if (!_tasks.empty()) {
        task = std::move(_tasks.front());  // 移动语义,获取一个任务
        _tasks.pop(); // 释放已被移动的任务
        return true; 
    }

    return false;
}

void ThreadPool::run() { // 执行任务的线程
    //调用处理部分
    while (!isTerminate()) { // 先判断是不是要停止
        TaskFuncPtr task;
        // 1、读取任务
        bool ok = get(task); 
        // 拿到了任务
        if (ok) {
            // 任务的执行是原子操作,一气呵成
            ++_atomic;
            try {
                if (task->_expireTime != 0 && task->_expireTime < TNOWMS) {
                    //超时任务,是否需要处理?
                }
                else {
                    // 2、执行任务
                    task->_func();  
                }
            }
            catch (...) 
            {}

            --_atomic;

            // 至此,任务全都执行完毕了
            std::unique_lock<std::mutex> lock(_mutex);
            
            // 3、检测是否所有任务都运行完毕
            if (_atomic == 0 && _tasks.empty())  {
                _condition.notify_all();  // 这里只是为了通知waitForAllDone
            }
        }
    }
}

bool ThreadPool::waitForAllDone(int millsecond) { // 指定超时的时间1000ms
    std::unique_lock<std::mutex> lock(_mutex);

    if (_tasks.empty()) {
        return true;
    }
    // 超时,任务队列已清空
    if (millsecond < 0) {
        _condition.wait(lock, [this] { return _tasks.empty(); });
        return true;
    }
    // 不超时,1.等待时间结束,2.任务队列已清空
    else {
        return _condition.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return _tasks.empty(); });
    }
}

int gettimeofday(struct timeval &tv) {
#if WIN32
    time_t clock;
    struct tm tm;
    SYSTEMTIME wtm;
    GetLocalTime(&wtm);
    tm.tm_year   = wtm.wYear - 1900;
    tm.tm_mon   = wtm.wMonth - 1;
    tm.tm_mday   = wtm.wDay;
    tm.tm_hour   = wtm.wHour;
    tm.tm_min   = wtm.wMinute;
    tm.tm_sec   = wtm.wSecond;
    tm. tm_isdst  = -1;
    clock = mktime(&tm);
    tv.tv_sec = clock;
    tv.tv_usec = wtm.wMilliseconds * 1000;

    return 0;
#else
    return ::gettimeofday(&tv, 0);
#endif
}

void getNow(timeval *tv) {
#if TARGET_PLATFORM_IOS || TARGET_PLATFORM_LINUX

    int idx = _buf_idx;
    *tv = _t[idx];
    if(fabs(_cpu_cycle - 0) < 0.0001 && _use_tsc) {
        addTimeOffset(*tv, idx);
    }
    else {
        TC_Common::gettimeofday(*tv);
    }
#else
    gettimeofday(*tv);
#endif
}

int64_t getNowMs() {
    struct timeval tv;
    getNow(&tv);

    return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000;
}

// main.cc
#include <iostream>
#include "threadpool.h"
using namespace std;

void func0() {
    cout << "func0()" << endl;
}

void func1(int a) {
    cout << "func1 int =" << a << endl;
}

void func2(int a, string b) {
    cout << "func2() a=" << a << ", b=" << b<< endl;
}

void test1() {// 简单测试线程池
    ThreadPool threadpool; // 封装一个线程池
    threadpool.init(1); // 设置线程的数量
    threadpool.start(); // 启动线程池,创建线程,开始调度
    // 装入要执行的任务
    threadpool.exec(1000,func0); 
    threadpool.exec(func1, 10);
    threadpool.exec(func2, 20, "hello"); // 插入任务
    threadpool.waitForAllDone(); 
    threadpool.stop();     
}

int func1_future(int a) {
    cout << "func1() a=" << a << endl;
    return a;
}

string func2_future(int a, string b) {
    cout << "func1() a=" << a << ", b=" << b<< endl;
    return b;
}

// 测试任务函数返回值
void test2() {
    ThreadPool threadpool;
    threadpool.init(1);
    threadpool.start(); // 启动线程池
    // 假如要执行的任务
    std::future<decltype (func1_future(0))> result1 = threadpool.exec(func1_future, 10);
    auto result2 = threadpool.exec(func2_future, 20, "hello");

    std::cout << "result1: " << result1.get() << std::endl;
    std::cout << "result2: " << result2.get() << std::endl;
    threadpool.waitForAllDone();
    threadpool.stop();
}

class Test {
public:
    int test(int i){
        cout << _name << ", i = " << i << endl;
        return i;
    }

    void setName(string name){
        _name = name;
    }
    string _name;
};

// 测试类对象函数的绑定
void test3() { 
    ThreadPool threadpool;
    threadpool.init(1);
    threadpool.start(); // 启动线程池
    Test t1;
    Test t2;
    t1.setName("Test1");
    t2.setName("Test2");
    auto f1 = threadpool.exec(std::bind(&Test::test, &t1, std::placeholders::_1), 10);
    auto f2 = threadpool.exec(std::bind(&Test::test, &t2, std::placeholders::_1), 20);
    threadpool.waitForAllDone();
    cout << "t1 " << f1.get() << endl;
    cout << "t2 " << f2.get() << endl;
}

void func2_1(int a, int b) {
    cout << "func2_1 a + b = " << a+b << endl;
}

int func2_1(string a, string b) {
    cout << "func2_1 a + b = " << a << b<< endl;
    return 0;
}

// 简单测试线程池 threadpool;
void test4() {    
    ThreadPool threadpool; 
    threadpool.init(1);             
    threadpool.start();              
    // 假如要执行的任务
    threadpool.exec((void(*)(int, int))func2_1, 10, 20); // 插入任务
    threadpool.exec((int(*)(string, string))func2_1, "Tom", " and Jerry");
    threadpool.waitForAllDone(); // 等待都执行完退出
    threadpool.stop(); // 这里才是真正执行退出
}

int main() {
    // test1(); // 简单测试线程池
    // test2(); // 测试任务函数返回值
    // test3(); // 测试类对象函数的绑定
    test4();
    cout << "main finish!" << endl;
    return 0;
}

版权声明:本文为you_fathe原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。