//threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <future>
#include <functional>
#include <iostream>
#include <queue>
#include <mutex>
#include <memory>
#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endif
using namespace std;
void getNow(timeval *tv);
int64_t getNowMs();
#define TNOW getNow()
#define TNOWMS getNowMs()
/
/**
* @file thread_pool.h
* @brief c++11线程池类
* 使用说明:
* ThreadPool tpool; // 封装线程池
* tpool.init(5); // 初始化线程池线程数
* tpool.start(); // 启动线程方式
* tpool.exec(testFunction, 10); // 将任务丢到线程池中,返回future异步获取结果
* tpool.waitForAllDone(1000); // 等待线程池结束,参数<0时, 表示无限等待
* tpool.stop(); //外部结束线程池
*/
class ThreadPool {
protected:
// 任务结构体
struct TaskFunc {
TaskFunc(uint64_t expireTime): _expireTime(expireTime){}
std::function<void()> _func; // 要执行的任务函数
int64_t _expireTime = 0; //超时的绝对时间
};
// 指向任务结构体的指针
typedef shared_ptr<TaskFunc> TaskFuncPtr;
public:
/** @brief 构造函数 */
ThreadPool();
/** @brief 析构函数*/
virtual ~ThreadPool();
/** @brief 初始化 @param num 工作线程个数 */
bool init(size_t num);
/** @brief 获取线程个数 @return size_t 线程个数 */
size_t getThreadNum() {
std::unique_lock<std::mutex> lock(_mutex);
return _threads.size();
}
/** @brief 获取当前线程池的任务数 @return size_t 线程池的任务数 */
size_t getJobNum() {
std::unique_lock<std::mutex> lock(_mutex);
return _tasks.size();
}
/** @brief 停止所有线程, 会等待所有线程结束 */
void stop();
/** @brief 创建并启动线程 */
bool start();
/**
* @brief 用线程池启用任务(F是function, Args是参数)
* @param bind function
* @return 返回任务的future对象, 可以通过这个对象来获取返回值
*/
template <class F, class... Args>
auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
{
return exec(0, f, args...);
}
/**
* @brief 用线程池启用任务(F是function, Args是参数)
* @param 超时时间,单位ms (为0时不做超时控制);若任务超时,此任务将被丢弃
* @param bind function
* @return 返回任务的future对象, 可以通过这个对象来获取返回值
*/
template <class F, class... Args>
auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>
{
// 获取现在时间
int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs);
//定义返回值类型,推导返回值
using RetType = decltype(f(args...));
// 封装任务,绑定函数和函数的参数
auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
// 封装任务指针,设置过期时间
TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);
// 封装具体的执行函数
fPtr->_func = [task]() {
(*task)();
};
std::unique_lock<std::mutex> lock(_mutex);
// 插入任务
_tasks.push(fPtr);
// 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notify
_condition.notify_one();
// 返回绑定任务的 future
return task->get_future();;
}
/**
* @brief 等待当前任务队列中, 所有工作全部结束(队列无任务).
* @param millsecond 等待的时间(ms), -1:永远等待
* @return true, 所有工作都处理完毕;false,超时退出
*/
bool waitForAllDone(int millsecond = -1);
protected:
/** @brief 获取任务 @return TaskFuncPtr */
bool get(TaskFuncPtr&task);
/** @brief 线程池是否退出 */
bool isTerminate() { return _bTerminate; }
/** @brief 线程运行态 */
void run();
protected:
queue<TaskFuncPtr> _tasks; //任务队列
std::vector<std::thread*> _threads; // 工作线程
std::mutex _mutex;
std::condition_variable _condition;
size_t _threadNum; // 线程的数量
bool _bTerminate; // 线程池终止标志位
std::atomic<int> _atomic{ 0 }; // 原子操作
};
#endif // THREADPOOL_H
#include "threadpool.h"
ThreadPool::ThreadPool()
: _threadNum(1), _bTerminate(false)
{}
ThreadPool::~ThreadPool() {
stop();
}
bool ThreadPool::init(size_t num) {
std::unique_lock<std::mutex> lock(_mutex);
if (!_threads.empty()) {
return false;
}
// 设置线程数量
_threadNum = num;
return true;
}
void ThreadPool::stop() {
{
std::unique_lock<std::mutex> lock(_mutex); //加锁
_bTerminate = true; // 触发退出
_condition.notify_all(); // 唤醒其他线程
}
for (size_t i = 0; i < _threads.size(); i++) {
if(_threads[i]->joinable()) {
_threads[i]->join(); // 等线程推出
}
delete _threads[i];
_threads[i] = NULL;
}
std::unique_lock<std::mutex> lock(_mutex); // 加锁
_threads.clear();
}
bool ThreadPool::start() {
std::unique_lock<std::mutex> lock(_mutex);
// 线程已经启动,不能再次启动
if (!_threads.empty()) {
return false;
}
for (size_t i = 0; i < _threadNum; i++) {
_threads.push_back(new thread(&ThreadPool::run, this));
}
return true;
}
bool ThreadPool::get(TaskFuncPtr& task) {
std::unique_lock<std::mutex> lock(_mutex); // 加锁
// 没有任务了
if (_tasks.empty()) {
_condition.wait(lock, [this] { return _bTerminate // 要么终止线程池 bTerminate_设置为true
|| !_tasks.empty(); // 要么补充了新的任务,任务队列不为空
}); // notify -> 1.退出线程池; 2.任务队列不为空(补充了新的任务)
}
// 线程池终止了
if (_bTerminate) {
return false;
}
// 有任务存在
if (!_tasks.empty()) {
task = std::move(_tasks.front()); // 移动语义,获取一个任务
_tasks.pop(); // 释放已被移动的任务
return true;
}
return false;
}
void ThreadPool::run() { // 执行任务的线程
//调用处理部分
while (!isTerminate()) { // 先判断是不是要停止
TaskFuncPtr task;
// 1、读取任务
bool ok = get(task);
// 拿到了任务
if (ok) {
// 任务的执行是原子操作,一气呵成
++_atomic;
try {
if (task->_expireTime != 0 && task->_expireTime < TNOWMS) {
//超时任务,是否需要处理?
}
else {
// 2、执行任务
task->_func();
}
}
catch (...)
{}
--_atomic;
// 至此,任务全都执行完毕了
std::unique_lock<std::mutex> lock(_mutex);
// 3、检测是否所有任务都运行完毕
if (_atomic == 0 && _tasks.empty()) {
_condition.notify_all(); // 这里只是为了通知waitForAllDone
}
}
}
}
bool ThreadPool::waitForAllDone(int millsecond) { // 指定超时的时间1000ms
std::unique_lock<std::mutex> lock(_mutex);
if (_tasks.empty()) {
return true;
}
// 超时,任务队列已清空
if (millsecond < 0) {
_condition.wait(lock, [this] { return _tasks.empty(); });
return true;
}
// 不超时,1.等待时间结束,2.任务队列已清空
else {
return _condition.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return _tasks.empty(); });
}
}
int gettimeofday(struct timeval &tv) {
#if WIN32
time_t clock;
struct tm tm;
SYSTEMTIME wtm;
GetLocalTime(&wtm);
tm.tm_year = wtm.wYear - 1900;
tm.tm_mon = wtm.wMonth - 1;
tm.tm_mday = wtm.wDay;
tm.tm_hour = wtm.wHour;
tm.tm_min = wtm.wMinute;
tm.tm_sec = wtm.wSecond;
tm. tm_isdst = -1;
clock = mktime(&tm);
tv.tv_sec = clock;
tv.tv_usec = wtm.wMilliseconds * 1000;
return 0;
#else
return ::gettimeofday(&tv, 0);
#endif
}
void getNow(timeval *tv) {
#if TARGET_PLATFORM_IOS || TARGET_PLATFORM_LINUX
int idx = _buf_idx;
*tv = _t[idx];
if(fabs(_cpu_cycle - 0) < 0.0001 && _use_tsc) {
addTimeOffset(*tv, idx);
}
else {
TC_Common::gettimeofday(*tv);
}
#else
gettimeofday(*tv);
#endif
}
int64_t getNowMs() {
struct timeval tv;
getNow(&tv);
return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000;
}
// main.cc
#include <iostream>
#include "threadpool.h"
using namespace std;
void func0() {
cout << "func0()" << endl;
}
void func1(int a) {
cout << "func1 int =" << a << endl;
}
void func2(int a, string b) {
cout << "func2() a=" << a << ", b=" << b<< endl;
}
void test1() {// 简单测试线程池
ThreadPool threadpool; // 封装一个线程池
threadpool.init(1); // 设置线程的数量
threadpool.start(); // 启动线程池,创建线程,开始调度
// 装入要执行的任务
threadpool.exec(1000,func0);
threadpool.exec(func1, 10);
threadpool.exec(func2, 20, "hello"); // 插入任务
threadpool.waitForAllDone();
threadpool.stop();
}
int func1_future(int a) {
cout << "func1() a=" << a << endl;
return a;
}
string func2_future(int a, string b) {
cout << "func1() a=" << a << ", b=" << b<< endl;
return b;
}
// 测试任务函数返回值
void test2() {
ThreadPool threadpool;
threadpool.init(1);
threadpool.start(); // 启动线程池
// 假如要执行的任务
std::future<decltype (func1_future(0))> result1 = threadpool.exec(func1_future, 10);
auto result2 = threadpool.exec(func2_future, 20, "hello");
std::cout << "result1: " << result1.get() << std::endl;
std::cout << "result2: " << result2.get() << std::endl;
threadpool.waitForAllDone();
threadpool.stop();
}
class Test {
public:
int test(int i){
cout << _name << ", i = " << i << endl;
return i;
}
void setName(string name){
_name = name;
}
string _name;
};
// 测试类对象函数的绑定
void test3() {
ThreadPool threadpool;
threadpool.init(1);
threadpool.start(); // 启动线程池
Test t1;
Test t2;
t1.setName("Test1");
t2.setName("Test2");
auto f1 = threadpool.exec(std::bind(&Test::test, &t1, std::placeholders::_1), 10);
auto f2 = threadpool.exec(std::bind(&Test::test, &t2, std::placeholders::_1), 20);
threadpool.waitForAllDone();
cout << "t1 " << f1.get() << endl;
cout << "t2 " << f2.get() << endl;
}
void func2_1(int a, int b) {
cout << "func2_1 a + b = " << a+b << endl;
}
int func2_1(string a, string b) {
cout << "func2_1 a + b = " << a << b<< endl;
return 0;
}
// 简单测试线程池 threadpool;
void test4() {
ThreadPool threadpool;
threadpool.init(1);
threadpool.start();
// 假如要执行的任务
threadpool.exec((void(*)(int, int))func2_1, 10, 20); // 插入任务
threadpool.exec((int(*)(string, string))func2_1, "Tom", " and Jerry");
threadpool.waitForAllDone(); // 等待都执行完退出
threadpool.stop(); // 这里才是真正执行退出
}
int main() {
// test1(); // 简单测试线程池
// test2(); // 测试任务函数返回值
// test3(); // 测试类对象函数的绑定
test4();
cout << "main finish!" << endl;
return 0;
}
版权声明:本文为you_fathe原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。