std::future和std::promise和std::packaged_task
std::future
其實future有兩個兄弟,一個是std::future, 一個是它大哥std::shared_future。他們的區別就是std::future只支持移動語義,它所引用的共享狀態不與另一異步返回對象共享。換成人話就是如果你想在多個線程共享一個future,那么你應該用std::shared_future,換成大白話就是你要是想多個線程等待一個future,那么你應該用std::shared_future。如果還是不明白,文章最后會給一個小例子說明。這篇文章主要講的是std::future。
同時講std::future離不開它的承載者和創建者, 也就是std::async?,?std::packaged_task,std::promise,會在今后的文章一一描述。
首先我們先看std::future是如何定義的(如果感覺枯燥可略過這節):
| 定義于頭文件?<future> | ? | ? |
| template<?class?T?>?class?future; | (1) | (C++11 起) |
| template<?class?T?>?class?future<T&>; | (2) | (C++11 起) |
| template<>?? ? ? ? ?class?future<void>; | (3) | (C++11 起) |
類模板 std::future 提供訪問異步操作結果的機制:
-
(通過 std::async 、 std::packaged_task 或 std::promise 創建的)異步操作能提供一個 std::future 對象給該異步操作的創建者(線程)。
-
然后,異步操作的創建者能用各種方法查詢、等待或從 std::future 提取值。若異步操作仍未提供值,則這些方法可能阻塞。
-
異步操作準備好發送結果給創建者時,它(異步操作)能通過修改鏈接到創建者的 std::future 的共享狀態(例如 std::promise::set_value )進行。?
成員函數
| (構造函數) | 構造 future 對象?(公開成員函數) |
| (析構函數) | 析構 future 對象?(公開成員函數) |
| operator= | 移動future對象?(公開成員函數) |
| share | 從?*this?轉移共享狀態給?shared_future?并返回它? (公開成員函數) |
| 獲取結果 | |
| get | 返回結果?(公開成員函數) |
| 狀態 | |
| valid | 檢查 future 是否擁有共享狀態?(公開成員函數) |
| wait | 等待結果變得可用?(公開成員函數) |
| wait_for | 等待結果,如果在指定的超時間隔后仍然無法得到結果,則返回。(公開成員函數) |
| wait_until | 等待結果,如果在已經到達指定的時間點時仍然無法得到結果,則返回。(公開成員函數) |
std::future?涉及到了并行編程的概念。為了更好的理解,我們先討論一個我們會經常遇到的場景。現在你要做三件事,一個是燉排骨一個是燒開水,同時你還在追最火的電視劇。那么作為一個正常人,你不會傻傻的先燒開水,等水燒完了燉排骨,等排骨好了再去看電視劇吧。一個正常人大概率會選擇燉上排骨燒上水后就去看電視劇,然后不斷的查看排骨和水是不是好了。
當然排骨和水的處理方式不一樣,燒熱水等開了會響,而排骨就要根據個人經驗還有排骨當前的狀態放入佐料并且控制時間。這就涉及到我們編程中的架構模型,std::future要討論的就是排骨這種情況,而不是燒水。
在編程的過程中,程序中往往會有一些功能或者很耗時或者很占計算資源,這樣的程序我們往往傾向于讓它在另一個thread中運行,我們的主thread可以先干其他事情,等干完了其他事情在來看看之前的工作干完了沒有,如果干完了,那么拿出結果,如果沒干完就等它干完或者只等一會。
下邊就舉一個例子:
#include <iostream> #include <future> #include <thread> #include <chrono>int main() {std::future<int> future = std::async(std::launch::async, [](){ std::this_thread::sleep_for(std::chrono::seconds(3));return 8; }); std::cout?<<?"開始燉排骨...\n";std::future_status status;do {// 干點別的活,比如看電視劇status = future.wait_for(std::chrono::seconds(1));if (status == std::future_status::deferred) {std::cout << "deferred\n";} else if (status == std::future_status::timeout) {std::cout?<<?"一分鐘以后排骨還沒好\n";} else if (status == std::future_status::ready) {std::cout << "排骨好了!\n";}} while (status != std::future_status::ready); std::cout << "result is " << future.get() << '\n'; }可能的輸出:
開始燉排骨...一分鐘以后排骨還沒好一分鐘以后排骨還沒好排骨好了!result is 8當然在實踐中我們不會死等,我們會隔一會看看排骨好沒好。
std::shared_future
類模板 std::shared_future 提供訪問異步操作結果的機制,類似 std::future ,除了允許多個線程等候同一共享狀態。不同于僅可移動的 std::future (故只有一個實例能指代任何特定的異步結果),std::shared_future 可復制而且多個 shared_future 對象能指代同一共享狀態。
若每個線程通過其自身的 shared_future 對象副本訪問,則從多個線程訪問同一共享狀態是安全的。?
shared_future?不是文章的重點,這里僅僅舉一個例子說明:
#include <iostream> #include <future> #include <chrono>int main() { std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;std::shared_future<void> ready_future(ready_promise.get_future());std::chrono::time_point<std::chrono::high_resolution_clock> start;auto fun1 = [&, ready_future]() -> std::chrono::duration<double, std::milli> {t1_ready_promise.set_value();ready_future.wait(); // 等待來自 main() 的信號return std::chrono::high_resolution_clock::now() - start;};auto fun2 = [&, ready_future]() -> std::chrono::duration<double, std::milli> {t2_ready_promise.set_value();ready_future.wait(); // 等待來自 main() 的信號return std::chrono::high_resolution_clock::now() - start;};auto result1 = std::async(std::launch::async, fun1);auto result2 = std::async(std::launch::async, fun2);// 等待線程變為就緒t1_ready_promise.get_future().wait();t2_ready_promise.get_future().wait();// 線程已就緒,開始時鐘start = std::chrono::high_resolution_clock::now();// 向線程發信 使之運行ready_promise.set_value();std::cout << "Thread 1 received the signal "<< result1.get().count() << " ms after start\n"<< "Thread 2 received the signal "<< result2.get().count() << " ms after start\n"; }可能的輸出:?
Thread 1 received the signal 0.072 ms after startThread 2 received the signal 0.041 ms after startstd::promise
promise 對象可以保存某一類型 T 的值,該值可被 future 對象讀取(可能在另外一個線程中),因此 promise 也提供了一種線程同步的手段。在 promise 對象構造時可以和一個共享狀態(通常是std::future)相關聯,并可以在相關聯的共享狀態(std::future)上保存一個類型為 T 的值。
可以通過 get_future 來獲取與該 promise 對象相關聯的 future 對象,調用該函數之后,兩個對象共享相同的共享狀態(shared state)
promise 對象是異步 Provider,它可以在某一時刻設置共享狀態的值。
future 對象可以異步返回共享狀態的值,或者在必要的情況下阻塞調用者并等待共享狀態標志變為 ready,然后才能獲取共享狀態的值。
下面我們來看看官方定義:
| 定義于頭文件 <future> | ? | ? |
| template< class R > class promise; | (1) | (C++11 起) |
| template< class R > class promise<R&>; | (2) | (C++11 起) |
| template<> ? ? ? ? ?class promise<void>; | (3) | (C++11 起) |
1) 空模板
2) 非 void 特化,用于在線程間交流對象
3) void 特化,用于交流無狀態事件
類模板 std::promise 提供存儲值或異常的設施,之后通過 std::promise 對象所創建的 std::future 對象異步獲得結果。注意 std::promise 只應當使用一次。
每個 promise 與共享狀態關聯,共享狀態含有一些狀態信息和可能仍未求值的結果,它求值為值(可能為 void )或求值為異常。promise 可以對共享狀態做三件事:
-
使就緒:promise 存儲結果或異常于共享狀態。標記共享狀態為就緒,并解除阻塞任何等待于與該共享狀態關聯的 future 上的線程。
-
釋放:promise 放棄其對共享狀態的引用。若這是最后一個這種引用,則銷毀共享狀態。除非這是 std::async 所創建的未就緒的共享狀態,否則此操作不阻塞。
-
拋棄:promise 存儲以 std::future_errc::broken_promise 為 error_code 的 std::future_error 類型異常,令共享狀態為就緒,然后釋放它。
promise 是 promise-future 交流通道的“推”端:存儲值于共享狀態的操作同步于(定義于 std::memory_order )任何在共享狀態上等待的函數(如 std::future::get )的成功返回。其他情況下對共享狀態的共時訪問可能沖突:例如, std::shared_future::get 的多個調用方必須全都是只讀,或提供外部同步。
成員函數
| (構造函數) | 構造std::promise對象? (公開成員函數) |
| (析構函數) | 析構std::promise對象? (公開成員函數) |
| operator= | 賦值共享狀態? (公開成員函數) |
| swap | 交換二個 promise 對象? (公開成員函數) |
| 獲取結果 | |
| get_future | 返回與承諾的結果關聯的 future? (公開成員函數) |
| 設置結果 | |
| set_value | 設置結果為指定值? (公開成員函數) |
| set_value_at_thread_exit | 設置結果為指定值,同時僅在線程退出時分發提醒? (公開成員函數) |
| set_exception | 設置結果為指示異常? (公開成員函數) |
| set_exception_at_thread_exit | 設置結果為指示異常,同時僅在線程退出時分發提醒? (公開成員函數) |
下面舉一個小例子,將promise<int> 用作線程間信號:
#include <vector>#include <thread>#include <future>#include <numeric>#include <iostream>#include <chrono> void accumulate(std::vector<int>::iterator first, std::vector<int>::iterator last, std::promise<int> accumulate_promise){ int sum = std::accumulate(first, last, 0); accumulate_promise.set_value(sum); // 提醒 future} void do_work(std::promise<void> barrier){ std::this_thread::sleep_for(std::chrono::seconds(1)); barrier.set_value();} int main(){ // 演示用 promise<int> 在線程間傳遞結果。 std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 }; std::promise<int> accumulate_promise; std::future<int> accumulate_future = accumulate_promise.get_future(); std::thread work_thread(accumulate, numbers.begin(), numbers.end(), std::move(accumulate_promise)); // future::get() 將等待直至該 future 擁有合法結果并取得它 // 無需在 get() 前調用 wait() //accumulate_future.wait(); // 等待結果 std::cout << "result=" << accumulate_future.get() << '\n'; work_thread.join(); // wait for thread completion // 演示用 promise<void> 在線程間對狀態發信號 std::promise<void> barrier; std::future<void> barrier_future = barrier.get_future(); std::thread new_work_thread(do_work, std::move(barrier)); barrier_future.wait(); new_work_thread.join();}輸出:
result=21?
?std::packaged_task
其實std::packaged_task并沒有引入新的概念, 它是一個包裝類,它包裝任何可調用 (Callable)?目標(函數、 lambda 表達式、 bind 表達式或其他函數對象),使得能異步調用它。其返回值或所拋異常被存儲于能通過?std::future?對象訪問的共享狀態中。?
正如 std::function , std::packaged_task 是多態、具分配器的容器:可在堆上或以提供的分配器分配存儲的可調用對象。?
std::packaged_task本身很簡單,但是卻給了我們很多想象空間,這篇文章就不展開講它的具體應用場景。
下面我們來看一下具體的定義:
| 定義于頭文件 <future> | ? | ? |
| template< class > class packaged_task; // 不定義 | (1) | (C++11 起) |
| template< class R, class ...Args >? | (2) | (C++11 起) |
下邊是一些比較枯燥的API解釋, 不喜歡看可以略過,直接看例子。
成員函數
| (構造函數) | 構造任務對象? (公開成員函數) |
| (析構函數) | 析構任務對象? (公開成員函數) |
| operator= | 移動任務對象? (公開成員函數) |
| valid | 檢查任務對象是否擁有合法函數? (公開成員函數) |
| swap | 交換二個任務對象? (公開成員函數) |
| 獲取結果 | |
| get_future | 返回與承諾的結果關聯的 std::future? (公開成員函數) |
| 執行 | |
| operator() | 執行函數? (公開成員函數) |
| make_ready_at_thread_exit | 執行函數,并確保結果僅在一旦當前線程退出時就緒 (公開成員函數) |
| reset | 重置狀態,拋棄任何先前執行的存儲結果? (公開成員函數) |
非成員函數
| std::swap(std::packaged_task) (C++11) | 特化 std::swap 算法? (函數模板) |
輔助類
| std::uses_allocator<std::packaged_task> (C++11)(C++17 前) | 特化 std::uses_allocator 類型特征 ? (類模板特化) |
例子:
#include <iostream>#include <future>#include <thread>int main(){// 將一個返回值為7的 lambda 表達式封裝到 task 中// std::packaged_task 的模板參數為要封裝函數的類型std::packaged_task<int()> task([](){return 7;});// 獲得 task 的 futurestd::future<int> result = task.get_future(); // 在一個線程中執行 taskstd::thread(std::move(task)).detach(); std::cout << "Waiting...";result.wait();// 輸出執行結果std::cout << "Done!" << std:: endl << "Result is " << result.get() << '\n';}例子2:
#include <iostream>#include <cmath>#include <thread>#include <future>#include <functional> // 避免對 std::pow 重載集消歧義的獨有函數int f(int x, int y) { return std::pow(x,y); } void task_lambda(){ std::packaged_task<int(int,int)> task([](int a, int b) { return std::pow(a, b); }); std::future<int> result = task.get_future(); task(2, 9); std::cout << "task_lambda:\t" << result.get() << '\n';} void task_bind(){ std::packaged_task<int()> task(std::bind(f, 2, 11)); std::future<int> result = task.get_future(); task(); std::cout << "task_bind:\t" << result.get() << '\n';} void task_thread(){ std::packaged_task<int(int,int)> task(f); std::future<int> result = task.get_future(); std::thread task_td(std::move(task), 2, 10); task_td.join(); std::cout << "task_thread:\t" << result.get() << '\n';} int main(){ task_lambda(); task_bind(); task_thread();}輸出:
task_lambda: 512task_bind: 2048task_thread: 1024總結
以上是生活随笔為你收集整理的std::future和std::promise和std::packaged_task的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: std::mutex
- 下一篇: 标准化条件变量 -- condition