C++11中的std::future是一个模板类。std::future提供了一种用于访问异步操作结果的机制。std::future所引用的共享状态不能与任何其它异步返回的对象共享(与std::shared_future相反)( std::future references shared state that is not shared with any other asynchronous return objects (as opposed to std::shared_future))。一个future是一个对象,它可以从某个提供者的对象或函数中检索值,如果在不同的线程中,则它可以正确地同步此访问(A future is an object that can retrieve a value from some provider object or function, properly synchronizing this access if in different threads)。
有效的future是与共享状态(shared state)关联的future对象,可以通过调用以下函数(provider)来构造future对象:std::async、std::promise::get_future、std::packaged_task::get_future。future对象仅在它们是有效时才有用。
std::aysnc介绍参考:https://blog.csdn.net/fengbingchun/article/details/104133494
std::promise介绍参考:https://blog.csdn.net/fengbingchun/article/details/104124174
std::packaged_task介绍参考:https://blog.csdn.net/fengbingchun/article/details/104127352
模板类std::future成员函数包括:
1. 构造函数:(1).不带参数的默认构造函数,此对象没有共享状态,因此它是无效的,但是可以通过移动赋值的方式将一个有效的future值赋值给它;(2).禁用拷贝构造;(3).支持移动构造。
2. 析构函数:销毁future对象,它是异常安全的。
3. get函数:(1).当共享状态就绪时,返回存储在共享状态中的值(或抛出异常)。(2).如果共享状态尚未就绪(即提供者尚未设置其值或异常),则该函数将阻塞调用的线程直到就绪。(3).当共享状态就绪后,则该函数将取消阻塞并返回(或抛出)释放其共享状态,这使得future对象不再有效,因此对于每一个future共享状态,该函数最多应被调用一次。(4).std::future<void>::get()不返回任何值,但仍等待共享状态就绪并释放它。(5).共享状态是作为原子操作(atomic operation)被访问。
4. operator=:(1).禁用拷贝赋值。(2).支持移动赋值:如果在调用之前,此对象是有效的(即它已经访问共享状态),则将其与先前已关联的共享状态解除关联。如果它是与先前共享状态关联的唯一对象,则先前的共享状态也会被销毁。
5. share函数:获取共享的future,返回一个std::shared_future对象,该对象获取future对象的共享状态。future对象将不再有效。
6. valid函数:检查共享状态的有效性,返回当前的future对象是否与共享状态关联。一旦调用了std::future::get()函数,再调用此函数将返回false。
7. wait函数:(1).等待共享状态就绪。(2).如果共享状态尚未就绪(即提供者尚未设置其值或异常),则该函数将阻塞调用的线程直到就绪。(3).当共享状态就绪后,则该函数将取消阻塞并void返回。
8. wait_for函数:(1).等待共享状态在指定的时间内(time span)准备就绪。(2). 如果共享状态尚未就绪(即提供者尚未设置其值或异常),则该函数将阻塞调用的线程直到就绪或已达到设置的时间。(3).此函数的返回值类型为枚举类future_status。此枚举类有三种label:ready:共享状态已就绪;timeout:在指定的时间内未就绪;deferred:共享状态包含了一个延迟函数(deferred function)。
9. wait_until函数:(1). 等待共享状态在指定的时间点(time point)准备就绪。(2). 如果共享状态尚未就绪(即提供者尚未设置其值或异常),则该函数将阻塞调用的线程直到就绪或已达到指定的时间点。(3).此函数的返回值类型为枚举类future_status。
详细用法见下面的测试代码,下面是从其他文章中copy的测试代码,部分作了调整,详细内容介绍可以参考对应的reference:
C++11中的std::future是一个模板类。std::future提供了一种用于访问异步操作结果的机制。std::future所引用的共享状态不能与任何其它异步返回的对象共享(与std::shared_future相反)( std::future references shared state that is not shared with any other asynchronous return objects (as opposed to std::shared_future))。一个future是一个对象,它可以从某个提供者的对象或函数中检索值,如果在不同的线程中,则它可以正确地同步此访问(A future is an object that can retrieve a value from some provider object or function, properly synchronizing this access if in different threads)。
有效的future是与共享状态(shared state)关联的future对象,可以通过调用以下函数(provider)来构造future对象:std::async、std::promise::get_future、std::packaged_task::get_future。future对象仅在它们是有效时才有用。
std::async,std::future创建后台任务
希望线程返回一个结果std::async启动一个异步任务,返回一个std::future对象。什么叫启动一个异步任务,创建一个线程并开始执行对应的线程入口函数,它返回一个std::future对象。通过std::future成员函数get。有人称呼std::future提供了一种访问异步操作结果的机制,就是说这个结果你可能没有办法马上拿到结果。在线程执行过程完毕的时候,你就能够拿到结果了。
std::future<int> result = std::async(mythread)
cout << result.get() << endl;
std::package_task:打包任务,把任务包装起来
是一个类模板,他的模板参数是各种调用对象,通过std::packaged_task来把各种调用对象包装起来,方便将来作为线程入口
// project4.cpp: 定义控制台应用程序的入口点。
//
#include "stdafx.h"
#include <iostream>
#include <vector>
#include <map>
#include <string>
#include <thread>
#include <list>
#include <mutex>
#include <future>
using namespace std;
//int mythread(int mypar)
//{
// cout << "mythread() start" << " threadid = " << std::this_thread::get_id() << endl;
// std::chrono::milliseconds dura(5000); //1秒 = 1000毫秒,所以5000毫秒 = 5秒
// std::this_thread::sleep_for(dura); //休息一定的时长
// return 5;
//}
//
void mythread2(std::future<int> &tmpf) //注意参数
//void mythread2(std::shared_future<int> &tmpf) //注意参数
//{
// cout << "mythread2() start" << " threadid = " << std::this_thread::get_id() << endl;
// auto result = tmpf.get(); //获取值,只能get一次否则会报异常
// //为什么第二次get这个future我们会得到一个异常;主要是因为get函数的设计,是一个移动语义
// cout << "mythread2 result = " << result << endl;
// return;
//}
//int g_mycout = 0; //定义一个全局量
//std::atomic<int> g_mycout = 0; //我们封装了一个类型为int的 对象(值);我们可以象操作一个int类型变量一样来操作这个g_mycout
//std::mutex g_my_mutex; //互斥量
//
//void mythread() //线程入口函数
//{
//
// for (int i = 0; i < 10000000; i++)
// {
// //g_my_mutex.lock(); //7秒钟实现了2000万次加锁和解锁;
// //g_mycout++;
// //...
// //...
// //g_my_mutex.unlock();
// g_mycout++; //对应的的操作是原子操作(不会被打断)
// }
//
// return;
//}
std::atomic<bool> g_ifend = false; //线程退出标记,这里是原子操作,防止读和写乱套;
void mythread()
{
std::chrono::milliseconds dura(1000); //1秒钟
while (g_ifend == false)
{
//系统没要求线程退出,所以本线程可以干自己想干的事情
cout << "thread id = " << std::this_thread::get_id() << " 运行中...." << endl;
std::this_thread::sleep_for(dura);
}
cout << "thread id = " << std::this_thread::get_id() << " 运行结束...." << endl;
return;
}
int main()
{
//一:std::future的其他成员函数,get()函数转移数据
//二:std::shared_future:也是个类模板,get()函数复制数据
//cout << "main" << " threadid = " << std::this_thread::get_id() << endl;
//std::packaged_task<int(int)> mypt(mythread); //我们把函数mythread通过packaged_task包装起来;
//std::thread t1(std::ref(mypt), 1); //线程直接开始执行,第二个参数作为线程入口函数的参数;
//t1.join(); //我们可以调用这个等待线程执行完毕,不调用这个不太行,会崩溃
std::future<int> result = mypt.get_future(); //std::future大家不陌生了,这个对象里含有线程入口函
std::shared_future<int> result_s(std::move(result));
bool ifcanget = result.valid();
std::shared_future<int> result_s(result.share()); //执行完毕后result_s里有值,而result里空了
ifcanget = result.valid();
//std::shared_future<int> result_s(mypt.get_future()); //通过get_future返回值直接构造了一个shared_future对象。
//auto mythreadresult = result_s.get();
//mythreadresult = result_s.get();
//
std::thread t2(mythread2, std::ref(result));
//std::thread t2(mythread2, std::ref(result_s));
//t2.join(); //等线程执行完毕
//cout << "I Love China!" << endl;
//三:原子操作std::atomic
//(3.1)原子操作概念引出范例
//互斥量:多线程编程中 保护共享数据: 锁,操作共享数据,开锁
//有两个线程,对一个变量进行操作,这个线程读该变量值,另一个线程往这个变量中写值。
//int atomvalue = 5;
读线程A
//int tmpvalue = atomvalue; //这里这个atoomvalue代表的是多个线程之间要共享的变量;
写线程B
//atomvalue = 6; //汇编代码的话;
//大家可以把原子操作理解成一种:不需要用到互斥量加锁(无锁)技术的多线程并发编程方式
//原子操作: 是在多线程中 不会被打断的 程序执行片段;原子操作,比互斥量效率上更胜一筹。
//互斥量的加锁一般是针对一个代码段(几行代码),而原子操作针对的一般都是一个变量,而不是一个代码段;
//原子操作,一般都是指“不可分割的操作”;也就是说这种操作状态要么是完成的,要么是没完成的,不可能出现半完成状态;
//std::atomic来代表原子操作 ,std::atomic是个类模板。其实std::atomic这个东西是用来封装某个类型的值的;
//(3.2)基本的std::atomic用法范例
//(3.3)老师的心得,一般用于计数或者统计(累计发送出去了多少个数据包,累计接收到了多少个数据包;)
/*thread mytobj1(mythread);
thread mytobj2(mythread);
mytobj1.join();
mytobj2.join();
cout << "两个线程执行完毕,最终的g_mycount的结果是:" << g_mycout << endl;
*/
thread mytobj1(mythread);
thread mytobj2(mythread);
std::chrono::milliseconds dura(5000); //5秒钟
std::this_thread::sleep_for(dura);
g_ifend = true; //对原子对象的写操作,让线程自行运行结束;
mytobj1.join();
mytobj2.join();
cout << "程序执行完毕,退出" << endl;
return 0;
}
2. sync
C++11中的std::async是个模板函数。std::async异步调用函数,在某个时候以Args作为参数(可变长参数)调用Fn,无需等待Fn执行完成就可返回,返回结果是个std::future对象。Fn返回的值可通过std::future对象的get成员函数获取。一旦完成Fn的执行,共享状态将包含Fn返回的值并ready。
std::async有两个版本:
1.无需显示指定启动策略,自动选择,因此启动策略是不确定的,可能是std::launch::async,也可能是std::launch::deferred,或者是两者的任意组合,取决于它们的系统和特定库实现。
2.允许调用者选择特定的启动策略。
std::async的启动策略类型是个枚举类enum class launch,包括:
1. std::launch::async:异步,启动一个新的线程调用Fn,该函数由新线程异步调用,并且将其返回值与共享状态的访问点同步。
2. std::launch::deferred:延迟,在访问共享状态时该函数才被调用。对Fn的调用将推迟到返回的std::future的共享状态被访问时(使用std::future的wait或get函数)。
参数Fn:可以为函数指针、成员指针、任何类型的可移动构造的函数对象(即类定义了operator()的对象)。Fn的返回值或异常存储在共享状态中以供异步的std::future对象检索。
参数Args:传递给Fn调用的参数,它们的类型应是可移动构造的。
返回值:当Fn执行结束时,共享状态的std::future对象准备就绪。std::future的成员函数get检索的值是Fn返回的值。当启动策略采用std::launch::async时,即使从不访问其共享状态,返回的std::future也会链接到被创建线程的末尾。在这种情况下,std::future的析构函数与Fn的返回同步。
// project4.cpp: 定义控制台应用程序的入口点。
#include "stdafx.h"
#include <iostream>
#include <vector>
#include <map>
#include <string>
#include <thread>
#include <list>
#include <mutex>
#include <future>
using namespace std;
int mythread()
{
cout << "mythread() start" << " threadid = " << std::this_thread::get_id() << endl;
std::chrono::milliseconds dura(5000); //1秒 = 1000毫秒,所以5000毫秒 = 5秒
std::this_thread::sleep_for(dura); //休息一定的时长
return 1;
}
int main()
{
/*thread mytobj1(mythread);
thread mytobj2(mythread);
mytobj1.join();
mytobj2.join();
cout << "两个线程都执行完毕,最终的g_mycout的结果是" << g_mycout << endl;*/
//二:std::async深入谈
//(2.1)std::async参数详述 ,async用来创建 一个异步任务;
//cout << "main start" << " threadid = " << std::this_thread::get_id() << endl;
std::future<int> result = std::async(mythread);
std::future<int> result = std::async(std::launch::deferred,mythread); //deferred延迟调用,并且不创建新线程,延迟到future对象调用.get()或者.wait的时候才执行mythread()
std::future<int> result = std::async(std::launch::async, mythread);
std::future<int> result = std::async(std::launch::async| std::launch::deferred, mythread);
//std::future<int> result = std::async(mythread);
//cout << result.get() << endl;
//老师讲解过:参数 std::launch::deferred【延迟调用】 ,以及std::launch::async【强制创建一个线程】
//std::thread() 如果系统资源紧张,那么可能创建线程就会失败,那么执行std::thread()时整个程序可能崩溃。
//std::async()我们一般不叫创建线程(解释async能够创建线程),我们一般叫它创建 一个异步任务。
//std::async和std::thread最明显的不同,就是async有时候并不创建新线程。
//a)如果你用std::launch::deferred来调用async会怎么 样?
//std::launch::deferred延迟调用,并且不创建新线程,延迟到future对象调用.get()或者.wait的时候才执行mythread(),如果没有调用get或者wait,那么这个mythread()不会执行。
//b)std::launch::async:强制这个异步任务在新线程上执行,这 意味着,系统必须要给我创建出新线程来运行mythread();
//c)std::launch::async | std::launch::deferred
//这里这个 | :意味着调用async的行为可能是 “ 创建新线程并立即执行” 或者
//没有创建新线程并且延迟到调用 result.get()才开始执行任务入口函数, 两者居其一;
//d)我们不带额外参数;只给async函数一个 入口函数名;
//第九节老师长生了一点小错误, 其实,默认值应该是std::launch::async | std::launch::deferred;和c)效果完全一致。
//换句话说:系统会自行决定是异步(创建新线程)还是同步(不创建新线程)方式运行。
//自行决定是啥意思?系统如何决定是 异步(创建新线程)还是同步(不创建新线程)方式运行
//(2.2)std::async和std::thread的区别
//std::thread创建线程,如果系统资源紧张,创建线程失败,那么整个程序就会报异常崩溃(有脾气)
//int mythread(){return 1;}
//std::thread mytobj(mythread);
//mytobj.join();
//std::thread创建线程的方式,如果线程返回值,你想拿到这个值也不容易;
//std::async创建异步任务。可能创建也可能不创建线程。并且async调用方法很容易拿到线程入口函数的返回值;
//由于系统资源限制:
//(1)如果用std::thread创建的线程太多,则可能创建失败,系统报告异常,崩溃。
//(2)如果用std::async,一般就不会报异常不会崩溃,因为 如果系统资源紧张导致无法创建新线程的时候,
//std::async这种不加额外参数的调用 就不会创建新线程。而是后续谁调用了result.get()来请求结果,
//那么这个异步任务mythread就运行在执行这条get()语句所在的线程上。
//如果你强制std::async一定 要创建新线程,那么就必须使用 std::launch::async。承受的代价就是系统资源紧张时,程序崩溃。
//(3)经验:一个程序里,线程数量不宜超过100-200,时间片。
//(2.3)std::async不确定性问题的解决
//不加额外参数的std::async调用 ,让系统自行决定是否创建新线程。
//问题焦点在于 std::future<int> result = std::async(mythread); 写法
//这个异步任务到底有没有被推迟执行,(std::launch::async还是std::launch::deferred)
//std::future对象的wait_for函数,第10节讲过。
cout << "main start" << " threadid = " << std::this_thread::get_id() << endl;
std::future<int> result = std::async(mythread); //想判断async到底有没有创建新线程立即执行还是延迟(没创建新线程)执行。
std::future_status status = result.wait_for(0s); //(std::chrono::seconds(0));
if (status == std::future_status::deferred)
{
//线程被延迟执行了(系统资源紧张了,它给我采用std::launch::deferred策略了)
cout << result.get() << endl; //这个时侯才去调用了mythread();
}
else
{
//任务没有被推迟,已经开始运行了被,线程被创建了;
if (status == std::future_status::ready)
{
//线程成功返回
cout << "线程成功执行完毕并返回!" << endl;
cout << result.get() << endl;
}
else if (status == std::future_status::timeout)
{
//超时线程还没执行完
cout << "超时线程没执行完!" << endl;
cout << result.get() << endl;
}
}
return 0;
}
#include "future.hpp"
#include <iostream>
#include <future>
#include <chrono>
#include <utility>
#include <thread>
#include <functional>
#include <memory>
#include <exception>
#include <numeric>
#include <vector>
#include <cmath>
#include <string>
#include <mutex>
namespace future_ {
///
// reference: http://www.cplusplus.com/reference/future/async/
int test_async_1()
{
auto is_prime = [](int x) {
std::cout << "Calculating. Please, wait...\n";
for (int i = 2; i < x; ++i) if (x%i == 0) return false;
return true;
};
// call is_prime(313222313) asynchronously:
std::future<bool> fut = std::async(is_prime, 313222313);
std::cout << "Checking whether 313222313 is prime.\n";
// ...
bool ret = fut.get(); // waits for is_prime to return
if (ret) std::cout << "It is prime!\n";
else std::cout << "It is not prime.\n";
return 0;
}
///
// reference: http://www.cplusplus.com/reference/future/launch/
int test_async_2()
{
auto print_ten = [](char c, int ms) {
for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
std::cout << c;
}
};
std::cout << "with launch::async:\n";
std::future<void> foo = std::async(std::launch::async, print_ten, '*', 100);
std::future<void> bar = std::async(std::launch::async, print_ten, '@', 200);
// async "get" (wait for foo and bar to be ready):
foo.get(); // 注:注释掉此句,也会输出'*'
bar.get();
std::cout << "\n\n";
std::cout << "with launch::deferred:\n";
foo = std::async(std::launch::deferred, print_ten, '*', 100);
bar = std::async(std::launch::deferred, print_ten, '@', 200);
// deferred "get" (perform the actual calls):
foo.get(); // 注:注释掉此句,则不会输出'**********'
bar.get();
std::cout << '\n';
return 0;
}
///
// reference: https://en.cppreference.com/w/cpp/thread/async
std::mutex m;
struct X {
void foo(int i, const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << ' ' << i << '\n';
}
void bar(const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << '\n';
}
int operator()(int i) {
std::lock_guard<std::mutex> lk(m);
std::cout << i << '\n';
return i + 10;
}
};
template <typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
auto len = end - beg;
if (len < 1000)
return std::accumulate(beg, end, 0);
RandomIt mid = beg + len / 2;
auto handle = std::async(std::launch::async, parallel_sum<RandomIt>, mid, end);
int sum = parallel_sum(beg, mid);
return sum + handle.get();
}
int test_async_3()
{
std::vector<int> v(10000, 1);
std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
X x;
// Calls (&x)->foo(42, "Hello") with default policy:
// may print "Hello 42" concurrently or defer execution
auto a1 = std::async(&X::foo, &x, 42, "Hello");
// Calls x.bar("world!") with deferred policy
// prints "world!" when a2.get() or a2.wait() is called
auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
// Calls X()(43); with async policy
// prints "43" concurrently
auto a3 = std::async(std::launch::async, X(), 43);
a2.wait(); // prints "world!"
std::cout << a3.get() << '\n'; // prints "53"
return 0;
} // if a1 is not done at this point, destructor of a1 prints "Hello 42" here
///
// reference: https://thispointer.com/c11-multithreading-part-9-stdasync-tutorial-example/
int test_async_4()
{
using namespace std::chrono;
auto fetchDataFromDB = [](std::string recvdData) {
// Make sure that function takes 5 seconds to complete
std::this_thread::sleep_for(seconds(5));
//Do stuff like creating DB Connection and fetching Data
return "DB_" + recvdData;
};
auto fetchDataFromFile = [](std::string recvdData) {
// Make sure that function takes 5 seconds to complete
std::this_thread::sleep_for(seconds(5));
//Do stuff like fetching Data File
return "File_" + recvdData;
};
// Get Start Time
system_clock::time_point start = system_clock::now();
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");
//Fetch Data from File
std::string fileData = fetchDataFromFile("Data");
//Fetch Data from DB
// Will block till data is available in future<std::string> object.
std::string dbData = resultFromDB.get();
// Get End Time
auto end = system_clock::now();
auto diff = duration_cast <std::chrono::seconds> (end - start).count();
std::cout << "Total Time Taken = " << diff << " Seconds" << std::endl;
//Combine The Data
std::string data = dbData + " :: " + fileData;
//Printing the combined Data
std::cout << "Data = " << data << std::endl;
return 0;
}
} // namespace future_
3. std::
C++11中的std::promise是个模板类。一个std::promise对象可以存储由future对象(可能在另一个线程中)检索的T类型的值或派生自std::exception的异常,并提供一个同步点。
在构造std::promise对象时,该对象与新的共享状态(shared state)关联。通过调用std::promise的get_future函数,可以将该共享状态与std::future对象关联。调用之后,两个对象共享相同的共享状态:(1).std::promise对象是异步提供程序(asynchronous provider),应 在某个时刻为共享状态设置一个值。(2).std::future对象是个异步返回对象,可以检索共享状态的值,并在必要时等待其准备就绪。
std::promise used to communicate objects between thread. The class template std::promise provides a facility to store a value or an exception that is later acquired asynchronously via a std::future object created by the std::promise object. Note that the std::promise object is meant to be used only once.
共享状态的生存期至少要持续到与之关联的最后一个对象释放或销毁为止。
std::future介绍参考:https://blog.csdn.net/fengbingchun/article/details/104115489
模板类std::promise成员函数包括:
1. 构造函数:(1).默认构造函数:通过访问新的空共享状态来初始化对象(The object is initialized with access to a new empty shared state)。(2).带allocator的构造函数。(3).禁用拷贝构造。(4).支持移动构造。
2. 析构函数:(1).放弃(abandon)共享状态并销毁promise对象。(2).如果有其它future对象关联到同一共享状态,则共享状态本身不会被销毁。(3).如果promise对象在共享状态准备就绪前被销毁,则共享状态自动准备就绪并包含一个std::future_error类型的异常。
3. get_future函数:(1).返回一个与promise对象的共享状态关联的std::future对象。(2).一旦准备就绪,返回的std::future对象就可以访问promise对象在共享状态上设置的值或异常。(3).每个promise共享状态只能被一个std::future对象检索(Only one future object can be retrieved for each promise shared state)。(4).调用此函数后,promise应在某个时候使其共享状态准备就绪(通过设置值或异常),否则将在销毁时自动准备就绪并包含一个std::future_error类型的异常。
4. operator=:(1).禁用拷贝赋值。(2).支持移动赋值。
5. set_exception:将异常指针存储进共享状态即设置共享状态的异常指针,准备就绪。
6. set_exception_at_thread_exit:设置共享状态的异常指针,但并不将该共享状态的标志设置为ready,当线程退出时,该promise对象会自动设置为ready (Stores the exception pointer p in the shared state without making it ready immediately. Instead, it will be made ready automatically at thread exit, once all objects of thread storage duration have been destroyed)。
7. set_value:(1).将值存储进共享状态即设置共享状态的值,准备就绪。(2).set_value(void)只是简单使共享状态就绪而无须设置任何值。
8. set_value_at_thread_exit:设置共享状态的值,但并不将该共享状态的标志设置为ready,当线程退出时,该promise对象会自动设置为ready(Stores val as the value in the shared state without making it ready immediately. Instead, it will be made ready automatically at thread exit, once all objects of thread storage duration have been destroyed)。
9. swap/非成员模板函数swap:交换共享状态。
详细用法见下面的测试代码,下面是从其他文章中copy的测试代码,部分作了调整,详细内容介绍可以参考对应的reference:
#include "future.hpp"
#include <iostream>
#include <future>
#include <chrono>
#include <utility>
#include <thread>
#include <functional>
#include <memory>
#include <exception>
#include <numeric>
#include <vector>
namespace future_ {
///
// reference: http://www.cplusplus.com/reference/future/promise/
int test_promise_1()
{
{ // constructor/get_future/set_value
std::promise<int> foo; // create promise
std::promise<int> bar = std::promise<int>(std::allocator_arg, std::allocator<int>());
std::future<int> fut = bar.get_future(); // engagement with future
//std::future<int> fut2 = bar.get_future(); // crash, 每个promise共享状态只能被一个std::future对象检索或关联
//std::future<int> fut = foo.get_future();
auto print_int = [&fut]() { int x = fut.get(); fprintf(stdout, "value: %d\n", x); };
std::thread th1(print_int); // send future to new thread
bar.set_value(10); // fulfill promise(synchronizes with getting the future)
//bar.set_value(10); // crash, 每个promise的set_value仅能被调用一次
//foo.set_value(10);
th1.join();
}
{ // operator =
std::promise<int> prom;
auto print_promise = [&prom]() {
std::future<int> fut = prom.get_future();
int x = fut.get();
std::cout << "value: " << x << '\n';
};
std::thread th1(print_promise);
prom.set_value(10);
th1.join();
prom = std::promise<int>(); // reset, by move-assigning a new promise
std::thread th2(print_promise);
prom.set_value(20);
th2.join();
}
{ // set_exception
std::promise<int> prom;
std::future<int> fut = prom.get_future();
auto get_int = [&prom]() {
int x;
std::cout << "Please, enter an integer value: ";
std::cin.exceptions(std::ios::failbit); // throw on failbit
try {
std::cin >> x; // sets failbit if input is not int
prom.set_value(x);
} catch (std::exception&) {
prom.set_exception(std::current_exception());
}
};
auto print_int = [&fut]() {
try {
int x = fut.get();
std::cout << "value: " << x << '\n';
} catch (std::exception& e) {
std::cout << "[exception caught: " << e.what() << "]\n";
}
};
std::thread th1(print_int);
std::thread th2(get_int);
th1.join();
th2.join();
}
return 0;
}
///
// reference: https://en.cppreference.com/w/cpp/thread/promise
void accumulate(std::vector<int>::iterator first, std::vector<int>::iterator last, std::promise<int> accumulate_promise)
{
int sum = std::accumulate(first, last, 0);
accumulate_promise.set_value(sum); // Notify future
}
void do_work(std::promise<void> barrier)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
barrier.set_value();
}
int test_promise_2()
{
// Demonstrate using promise<int> to transmit a result between threads.
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
std::promise<int> accumulate_promise;
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(accumulate, numbers.begin(), numbers.end(), std::move(accumulate_promise));
// future::get() will wait until the future has a valid result and retrieves it.
// Calling wait() before get() is not needed
//accumulate_future.wait(); // wait for result
std::cout << "result=" << accumulate_future.get() << '\n';
work_thread.join(); // wait for thread completion
// Demonstrate using promise<void> to signal state between threads.
std::promise<void> barrier;
std::future<void> barrier_future = barrier.get_future();
std::thread new_work_thread(do_work, std::move(barrier));
barrier_future.wait();
new_work_thread.join();
return 0;
}
///
// reference: https://en.cppreference.com/w/cpp/thread/promise/set_value_at_thread_exit
int test_promise_3()
{
#ifdef _MSC_VER
// set_value_at_thread_exit
using namespace std::chrono_literals;
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread([&p] {
std::this_thread::sleep_for(1s);
p.set_value_at_thread_exit(9); // gcc 4.9 don't support this function
}).detach();
std::cout << "Waiting..." << std::flush;
f.wait();
std::cout << "Done!\nResult is: " << f.get() << '\n';
#endif
return 0;
}
} // namespace future_
3. std::packaged_task
C++11中的std::packaged_task是个模板类。std::packaged_task包装任何可调用目标(函数、lambda表达式、bind表达式、函数对象)以便它可以被异步调用。它的返回值或抛出的异常被存储于能通过std::future对象访问的共享状态中。
std::packaged_task类似于std::function,但是会自动将其结果传递给std::future对象。
std::packaged_task对象内部包含两个元素:(1).存储的任务(stored task)是一些可调用的对象(例如函数指针、成员或函数对象的指针)( A stored task, which is some callable object (such as a function pointer, pointer to member or function object))。(2).共享状态,它可以存储调用存储的任务(stored task)的结果,并可以通过std::future进行异步访问(A shared state, which is able to store the results of calling the stored task and be accessed asynchronously through a future)。
通过调用std::packaged_task的get_future成员将共享状态与std::future对象关联。调用之后,两个对象共享相同的共享状态:(1).std::packaged_task对象是异步提供程序(asynchronous provider),应通过调用存储的任务(stored task)在某个时刻将共享状态设置为就绪。(2).std::future对象是一个异步返回对象,可以检索共享状态的值,并在必要时等待其准备就绪。
共享状态的生存期至少要持续到与之关联的最后一个对象释放或销毁为止。
std::packaged_task不会自己启动,你必须调用它(A packaged_task won’t start on it’s own, you have to invoke it)。
std::future介绍参考:https://blog.csdn.net/fengbingchun/article/details/104115489
模板类std::packaged_task成员函数包括:
1. 构造函数:(1).默认构造函数:无共享状态无存储任务(no shared state and no stored task)情况下初始化对象。(2). initialization constructor:该对象具有共享状态,且其存储的任务由fn初始化。(3). initialization constructor with allocator。(4).禁用拷贝构造。(5).支持移动构造。
2. 析构函数:(1).放弃(abandon)共享状态并销毁packaged_task对象。(2). 如果有其它future对象关联到同一共享状态,则共享状态本身不会被销毁。(3). 如果packaged_task对象在共享状态准备就绪前被销毁,则共享状态自动准备就绪并包含一个std::future_error类型的异常。
3. get_future函数:(1).返回一个与packaged_task对象的共享状态关联的std::future对象。(2).一旦存储的任务被调用,返回的std::future对象就可以访问packaged_task对象在共享状态上设置的值或异常。(3).每个packaged_task共享状态只能被一个std::future对象检索(Only one future object can be retrieved for each packaged_task shared state)。(4).调用此函数后,packaged_task应在某个时候使其共享状态准备就绪(通过调用其存储的任务),否则将在销毁后自动准备就绪并包含一个std::future_error类型的异常。
4. make_ready_at_thread_exit函数:在线程退出时才使共享状态ready而不是在调用完成后就立即ready。
5. operator=:(1).禁用拷贝赋值。(2).支持移动赋值。
6. operator():(1).call stored task。(2).如果对存储任务的调用成功完成或抛出异常,则返回的值或捕获的异常存储在共享状态,共享状态准备就绪(解除阻塞当前等待它的所有线程)。
7. reset函数:(1).在保持相同存储的任务的同时,以新的共享状态重置对象。(2).允许再次调用存储的任务。(3).与对象关联的之前的共享状态被放弃(就像packaged_task被销毁了一样)。(4).在内部,该函数的行为就像是移动赋值了一个新构造的packaged_task一样(Internally, the function behaves as if move-assigned a newly constructed packaged_task (with its stored task as argument))。
8. swap函数/非成员模板函数swap:交换共享状态和存储的任务(stored task)。
9. valid函数:检查packaged_task对象是否具有共享状态。
#include "future.hpp"
#include <iostream>
#include <future>
#include <chrono>
#include <utility>
#include <thread>
#include <functional>
#include <memory>
#include <exception>
#include <numeric>
#include <vector>
#include <cmath>
#include <string>
namespace future_ {
///
// reference: http://www.cplusplus.com/reference/future/packaged_task/
int test_packaged_task_1()
{
{ // constructor/get_future/operator=/valid
std::packaged_task<int(int)> foo; // default-constructed
std::packaged_task<int(int)> bar([](int x) { return x * 2; }); // initialized
foo = std::move(bar); // move-assignment
std::cout << "valid: " << foo.valid() << "\n";
std::future<int> ret = foo.get_future(); // get future
std::thread(std::move(foo), 10).detach(); // spawn thread and call task
int value = ret.get(); // wait for the task to finish and get result
std::cout << "The double of 10 is " << value << ".\n";
}
{ // reset/operator()
std::packaged_task<int(int)> tsk([](int x) { return x * 3; }); // package task
std::future<int> fut = tsk.get_future();
tsk(33);
std::cout << "The triple of 33 is " << fut.get() << ".\n";
// re-use same task object:
tsk.reset();
fut = tsk.get_future();
std::thread(std::move(tsk), 99).detach();
std::cout << "Thre triple of 99 is " << fut.get() << ".\n";
}
{ // constructor/get_future
auto countdown = [](int from, int to) {
for (int i = from; i != to; --i) {
std::cout << i << '\n';
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Lift off!\n";
return from - to;
};
std::packaged_task<int(int, int)> tsk(countdown); // set up packaged_task
std::future<int> ret = tsk.get_future(); // get future
std::thread th(std::move(tsk), 5, 0); // spawn thread to count down from 5 to 0
int value = ret.get(); // wait for the task to finish and get result
std::cout << "The countdown lasted for " << value << " seconds.\n";
th.join();
}
return 0;
}
///
// reference: https://en.cppreference.com/w/cpp/thread/packaged_task
int test_packaged_task_2()
{
{ // lambda
std::packaged_task<int(int, int)> task([](int a, int b) { return std::pow(a, b);});
std::future<int> result = task.get_future();
task(2, 9);
std::cout << "task_lambda:\t" << result.get() << '\n';
}
{ // bind
std::packaged_task<int()> task(std::bind([](int x, int y) { return std::pow(x, y); }, 2, 11));
std::future<int> result = task.get_future();
task();
std::cout << "task_bind:\t" << result.get() << '\n';
}
{ // thread
std::packaged_task<int(int, int)> task([](int x, int y) { return std::pow(x, y); });
std::future<int> result = task.get_future();
std::thread task_td(std::move(task), 2, 10);
task_td.join();
std::cout << "task_thread:\t" << result.get() << '\n';
}
return 0;
}
///
// reference: https://thispointer.com/c11-multithreading-part-10-packaged_task-example-and-tutorial/
struct DBDataFetcher {
std::string operator()(std::string token)
{
// Do some stuff to fetch the data
std::string data = "Data From " + token;
return data;
}
};
int test_packaged_task_3()
{
// Create a packaged_task<> that encapsulated a Function Object
std::packaged_task<std::string(std::string)> task(std::move(DBDataFetcher()));
// Fetch the associated future<> from packaged_task<>
std::future<std::string> result = task.get_future();
// Pass the packaged_task to thread to run asynchronously
std::thread th(std::move(task), "Arg");
// Join the thread. Its blocking and returns when thread is finished.
th.join();
// Fetch the result of packaged_task<> i.e. value returned by getDataFromDB()
std::string data = result.get();
std::cout << data << std::endl;
return 0;
}
///
// reference: https://stackoverflow.com/questions/18143661/what-is-the-difference-between-packaged-task-and-async
int test_packaged_task_4()
{
// sleeps for one second and returns 1
auto sleep = []() {
std::this_thread::sleep_for(std::chrono::seconds(1));
return 1;
};
{ // std::packaged_task
// >>>>> A packaged_task won't start on it's own, you have to invoke it
std::packaged_task<int()> task(sleep);
auto f = task.get_future();
task(); // invoke the function
// You have to wait until task returns. Since task calls sleep
// you will have to wait at least 1 second.
std::cout << "You can see this after 1 second\n";
// However, f.get() will be available, since task has already finished.
std::cout << f.get() << std::endl;
}
{ // std::async
// >>>>> On the other hand, std::async with launch::async will try to run the task in a different thread :
auto f = std::async(std::launch::async, sleep);
std::cout << "You can see this immediately!\n";
// However, the value of the future will be available after sleep has finished
// so f.get() can block up to 1 second.
std::cout << f.get() << "This will be shown after a second!\n";
}
return 0;
}
} // namespace future_