Linux多线程实践(10) --使用 C++11 编写 Linux 多线程程序
在這個多核時代,如何充分利用每個?CPU?內核是一個繞不開的話題,從需要為成千上萬的用戶同時提供服務的服務端應用程序,到需要同時打開十幾個頁面,每個頁面都有幾十上百個鏈接的?web?瀏覽器應用程序,從保持著幾?t?甚或幾?p?的數據的數據庫系統,到手機上的一個有良好用戶響應能力的?app,為了充分利用每個?CPU?內核,都會想到是否可以使用多線程技術。這里所說的“充分利用”包含了兩個層面的意思,一個是使用到所有的內核,再一個是內核不空閑,不讓某個內核長時間處于空閑狀態。在?C++98?的時代,C++標準并沒有包含多線程的支持,人們只能直接調用操作系統提供的?SDK?API?來編寫多線程程序,不同的操作系統提供的?SDK?API?以及線程控制能力不盡相同,到了?C++11,終于在標準之中加入了正式的多線程的支持,從而我們可以使用標準形式的類來創建與執行線程,也使得我們可以使用標準形式的鎖、原子操作、線程本地存儲?(TLS)?等來進行復雜的各種模式的多線程編程,而且,C++11?還提供了一些高級概念,比如?promise/future,packaged_task,async?等以簡化某些模式的多線程編程。
多線程可以讓我們的應用程序擁有更加出色的性能,同時,如果沒有用好,多線程又是比較容易出錯的且難以查找錯誤所在,甚至可以讓人們覺得自己陷進了泥潭,希望本文能夠幫助您更好地使用?C++11?來進行?Linux?下的多線程編程。
?
認識多線程
首先我們應該正確地認識線程。維基百科對線程的定義是:線程是一個編排好的指令序列,這個指令序列(線程)可以和其它的指令序列(線程)并行執行,操作系統調度器將線程作為最小的?CPU?調度單元。在進行架構設計時,我們應該多從操作系統線程調度的角度去考慮應用程序的線程安排,而不僅僅是代碼。
當只有一個?CPU?內核可供調度時,多個線程的運行示意如下:
圖?1、單個?CPU?內核上的多個線程運行示意圖
我們可以看到,這時的多線程本質上是單個?CPU?的時間分片,一個時間片運行一個線程的代碼,它可以支持并發處理,但是不能說是真正的并行計算。
當有多個?CPU?或者多個內核可供調度時,可以做到真正的并行計算,多個線程的運行示意如下:
圖?2、雙核?CPU?上的多個線程運行示意圖
從上述兩圖,我們可以直接得到使用多線程的一些常見場景:
????進程中的某個線程執行了一個阻塞操作時,其它線程可以依然運行,比如,等待用戶輸入或者等待網絡數據包的時候處理啟動后臺線程處理業務,或者在一個游戲引擎中,一個線程等待用戶的交互動作輸入,另外一個線程在后臺合成下一幀要畫的圖像或者播放背景音樂等。
????將某個任務分解為小的可以并行進行的子任務,讓這些子任務在不同的?CPU?或者內核上同時進行計算,然后匯總結果,比如歸并排序,或者分段查找,這樣子來提高任務的執行速度。
需要注意一點,因為單個?CPU?內核下多個線程并不是真正的并行,有些問題,比如?CPU?緩存不一致問題,不一定能表現出來,一旦這些代碼被放到了多核或者多?CPU?的環境運行,就很可能會出現“在開發測試環境一切沒有問題,到了實施現場就莫名其妙”的情況,所以,在進行多線程開發時,開發與測試環境應該是多核或者多?CPU?的,以避免出現這類情況。
?
C++11?的線程類?std::thread
C++11?的標準類?std::thread?對線程進行了封裝,它的聲明放在頭文件?thread?中,其中聲明了線程類?thread,?線程標識符?id,以及名字空間?this_thread,按照?C++11?規范,這個頭文件至少應該兼容如下內容:
清單?1.例子?thread?頭文件主要內容
namespace std { struct thread {// native_handle_type 是連接 thread 類和操作系統 SDK API 之間的橋梁。// typedef pthread_t __gthread_t;typedef __gthread_t native_handle_type;native_handle_type native_handle();//struct id{id() noexcept;// 可以由==, < 兩個運算衍生出其它大小關系運算。bool operator==(thread::id x, thread::id y) noexcept;bool operator<(thread::id x, thread::id y) noexcept;template<class charT, class traits>basic_ostream<charT, traits>&operator<<(basic_ostream<charT, traits>&out, thread::id id);// 哈希函數template <class T> struct hash;template <> struct hash<thread::id>;};id get_id() const noexcept;// 構造與析構thread() noexcept;template<class F, class… Args> explicit thread(F&f, Args&&… args);~thread();thread(const thread&) = delete;thread(thread&&) noexcept;thread& operator=( const thread&) = delete;thread& operator=(thread&&) noexcept;//void swap(thread&) noexcept;bool joinable() const noexcept;void join();void detach();// 獲取物理線程數目static unsigned hardware_concurrency() noexcept; }namespace this_thead {thread::id get_id();void yield();template<class Clock, class Duration>void sleep_until(const chrono::time_point<Clock, Duration>& abs_time);template<class Rep, class Period>void sleep_for(const chromo::duration<Rep, Period>& rel_time); } }和有些語言中定義的線程不同,C++11?所定義的線程是和操作系的線程是一一對應的,也就是說我們生成的線程都是直接接受操作系統的調度的,通過操作系統的相關命令(比如?ps?-M?命令)是可以看到的,一個進程所能創建的線程數目以及一個操作系統所能創建的總的線程數目等都由運行時操作系統限定。
native_handle_type?是連接?thread?類和操作系統?SDK?API?之間的橋梁,在?g++(libstdc++)?for?Linux?里面,native_handle_type?其實就是?pthread?里面的?pthread_t?類型,當?thread?類的功能不能滿足我們的要求的時候(比如改變某個線程的優先級),可以通過?thread?類實例的?native_handle()?返回值作為參數來調用相關的?pthread?函數達到目的。thread::id?定義了在運行時操作系統內唯一能夠標識該線程的標識符,同時其值還能指示所標識的線程的狀態,其默認值?(thread::id())?表示不存在可控的正在執行的線程(即空線程,比如,調用?thread()?生成的沒有指定入口函數的線程類實例),當一個線程類實例的?get_id()?等于默認值的時候,即?get_id()?==?thread::id(),表示這個線程類實例處于下述狀態之一:
????尚未指定運行的任務
????線程運行完畢
????線程已經被轉移?(move)?到另外一個線程類實例
????線程已經被分離?(detached)
空線程?id?字符串表示形式依具體實現而定,有些編譯器為?0X0,有些為一句語義解釋。
有時候我們需要在線程執行代碼里面對當前調用者線程進行操作,針對這種情況,C++11?里面專門定義了一個名字空間?this_thread,其中包括?get_id()?函數可用來獲取當前調用者線程的?id,yield()?函數可以用來將調用者線程跳出運行狀態,重新交給操作系統進行調度,sleep_until?和?sleep_for?函數則可以讓調用者線程休眠若干時間。get_id()?函數實際上是通過調用?pthread_self()?函數獲得調用者線程的標識符,而?yield()?函數則是通過調用操作系統?API?sched_yield()?進行調度切換。
?
如何創建和結束一個線程
和?pthread_create?不同,使用?thread?類創建線程可以使用一個函數作為入口,也可以是其它的?Callable?對象,而且,可以給入口傳入任意個數任意類型的參數:
清單?2.例子?thread_run_func_var_args.cc
int funcReturnInt(const char* fmt, ...) {va_list ap;va_start(ap, fmt);vprintf( fmt, ap );va_end(ap);return 0xabcd; }void threadRunFunction(void) {thread* t = new thread(funcReturnInt, "%d%s\n", 100, "\%");t->join();delete t; }我們也可以傳入一個?Lambda?表達式作為入口,比如:
清單?3.例子?thread_run_lambda.cc
void threadRunLambda(void) {int a = 100, b = 200;thread* t = new thread( [](int ia, int ib){cout << (ia + ib) << endl;},a, b );t->join();delete t; }一個類的成員函數也可以作為線程入口:
清單?4.例子?thread_run_member_func.cc
struct God {void create(const char* anything){cout << "create " << anything << endl;} };void threadRunMemberFunction(void) {God god;thread* t = new thread( &God::create, god, "the world" );t->join();delete t; }雖然?thread?類的初始化可以提供這么豐富和方便的形式,其實現的底層依然是創建一個?pthread?線程并運行之,有些實現甚至是直接調用?pthread_create?來創建。
創建一個線程之后,我們還需要考慮一個問題:該如何處理這個線程的結束?一種方式是等待這個線程結束,在一個合適的地方調用?thread?實例的?join()?方法,調用者線程將會一直等待著目標線程的結束,當目標線程結束之后調用者線程繼續運行;另一個方式是將這個線程分離,由其自己結束,通過調用?thread?實例的?detach()?方法將目標線程置于分離模式。一個線程的?join()?方法與?detach()?方法只能調用一次,不能在調用了?join()?之后又調用?detach(),也不能在調用?detach()?之后又調用?join(),在調用了?join()?或者?detach()?之后,該線程的?id?即被置為默認值(空線程),表示不能繼續再對該線程作修改變化。如果沒有調用?join()?或者?detach(),那么,在析構的時候,該線程實例將會調用?std::terminate(),這會導致整個進程退出,所以,如果沒有特別需要,一般都建議在生成子線程后調用其?join()?方法等待其退出,這樣子最起碼知道這些子線程在什么時候已經確保結束。
在?C++11?里面沒有提供?kill?掉某個線程的能力,只能被動地等待某個線程的自然結束,如果我們要主動停止某個線程的話,可以通過調用?Linux?操作系統提供的?pthread_kill?函數給目標線程發送信號來實現,示例如下:
清單?5.例子?thread_kill.cc
int counter = 0; static void on_signal_term(int sig) {cout << "on SIGTERM:" << this_thread::get_id() << endl;cout << "Usage: pthread_self: " << pthread_self() << endl;cout << "counter = " << counter << endl;pthread_exit(NULL); } void threadPosixKill(void) {signal(SIGTERM, on_signal_term);thread* t = new thread( [](){while(true){++ counter;}});pthread_t tid = t->native_handle();cout << "tid=" << tid << endl;// 確保子線程已經在運行。this_thread::sleep_for( chrono::seconds(1) );pthread_kill(tid, SIGTERM);t->join();delete t;cout << "thread destroyed." << endl; }上述例子還可以用來給某個線程發送其它信號,具體的?pthread_exit?函數調用的約定依賴于具體的操作系統的實現,所以,這個方法是依賴于具體的操作系統的,而且,因為在?C++11?里面沒有這方面的具體約定,用這種方式也是依賴于?C++編譯器的具體實現的。
?
線程類?std::thread?的其它方法和特點
thread?類是一個特殊的類,它不能被拷貝,只能被轉移或者互換,這是符合線程的語義的,不要忘記這里所說的線程是直接被操作系統調度的。線程的轉移使用?move?函數,示例如下:
清單?6.例子?thread_move.cc
void threadMove(void) {int a = 1;thread t( [](int* pa){for(;;){*pa = (*pa * 33) % 0x7fffffff;if ( ( (*pa) >> 30) & 1) break;}}, &a);thread t2 = move(t); // 改為 t2 = t 將不能編譯。t2.join();cout << "a=" << a << endl; }在這個例子中,如果將?t2.join()?改為?t.join()?將會導致整個進程被結束,因為忘記了調用?t2?也就是被轉移的線程的?join()?方法,從而導致整個進程被結束,而?t?則因為已經被轉移,其?id?已被置空。
線程實例互換使用?swap?函數,示例如下:
清單?7.例子?thread_swap.cc
void threadSwap(void) {int a = 1;thread t( [](int* pa){for(;;){*pa = (*pa * 33) % 0x7fffffff;if ( ( (*pa) >> 30) & 1) break;}}, &a);thread t2;cout << "before swap: t=" << t.get_id()<< ", t2=" << t2.get_id() << endl;swap(t, t2);cout << "after swap : t=" << t.get_id()<< ", t2=" << t2.get_id() << endl;t2.join();cout << "a=" << a << endl; }互換和轉移很類似,但是互換僅僅進行實例(以?id?作標識)的互換,而轉移則在進行實例標識的互換之前,還進行了轉移目的實例(如下例的t2)的清理,如果?t2?是可聚合的(joinable()?方法返回?true),則調用?std::terminate(),這會導致整個進程退出,比如下面這個例子:
清單?8.例子?thread_move_term.cc
void threadMoveTerm(void) {int a = 1;thread t( [](int* pa){for(;;){*pa = (*pa * 33) % 0x7fffffff;if ( ( (*pa) >> 30) & 1) break;}}, &a);thread t2( [](){int i = 0;for(;;)i++;} );t2 = move(t); // 將會導致 std::terminate()cout << "should not reach here" << endl;t2.join(); }所以,在進行線程實例轉移的時候,要注意判斷目的實例的?id?是否為空值(即?id())。
如果我們繼承了?thread?類,則還需要禁止拷貝構造函數、拷貝賦值函數以及賦值操作符重載函數等,另外,thread?類的析構函數并不是虛析構函數。示例如下:
清單?9.例子?thread_inherit.cc
class MyThread : public thread { public:MyThread() noexcept : thread() {};template<typename Callable, typename... Args>explicitMyThread(Callable&& func, Args&&... args) :thread( std::forward<Callable>(func),std::forward<Args>(args)...){}~MyThread(){}// disable copy constructorsMyThread( MyThread& ) = delete;MyThread( const MyThread& ) = delete;MyThread& operator=(const MyThread&) = delete; };因為?thread?類的析構函數不是虛析構函數,在上例中,需要避免出現下面這種情況:
MyThread* tc = new MyThread(…); … thread* tp = tc; … delete tp;這種情況會導致?MyThread?的析構函數沒有被調用。
?
線程的調度
我們可以調用?this_thread::yield()?將當前調用者線程切換到重新等待調度(放棄當前所占用的CPU),但是不能對非調用者線程進行調度切換,也不能讓非調用者線程休眠(這是操作系統調度器干的活)。
清單?10.例子?thread_yield.cc
void threadYield(void) {unsigned int procs = thread::hardware_concurrency(), // 獲取物理線程數目i = 0;thread* ta = new thread( [](){struct timeval t1, t2;gettimeofday(&t1, NULL);for(int i = 0, m = 13; i < COUNT; i++, m *= 17){this_thread::yield();}gettimeofday(&t2, NULL);print_time(t1, t2, " with yield");} );thread** tb = new thread*[ procs ];for( i = 0; i < procs; i++){tb[i] = new thread( [](){struct timeval t1, t2;gettimeofday(&t1, NULL);for(int i = 0, m = 13; i < COUNT; i++, m *= 17){do_nothing();}gettimeofday(&t2, NULL);print_time(t1, t2, "without yield");});}ta->join();delete ta;for( i = 0; i < procs; i++){tb[i]->join();delete tb[i];};delete tb; }ta?線程因為需要經常切換去重新等待調度,它運行的時間要比?tb?要多,比如在作者的機器上運行得到如下結果:
without?yield?elapse?0.050199s
without?yield?elapse?0.051042s
without?yield?elapse?0.05139s
without?yield?elapse?0.048782s
with?yield?elapse?1.63366s
real????0m1.643s
user????0m1.175s
sys?0m0.611s
ta?線程即使扣除系統調用運行時間?0.611s?之后,它的運行時間也遠大于沒有進行切換的線程。
C++11?沒有提供調整線程的調度策略或者優先級的能力,如果需要,只能通過調用相關的?pthread?函數來進行,需要的時候,可以通過調用?thread?類實例的?native_handle()?方法或者操作系統?API?pthread_self()?來獲得?pthread?線程?id,作為?pthread?函數的參數。
?
線程間的數據交互和數據爭用?(Data?Racing)
同一個進程內的多個線程之間總是免不了要有數據互相來往的,隊列和共享數據是實現多個線程之間的數據交互的常用方式,封裝好的隊列使用起來相對來說不容易出錯一些,而共享數據則是最基本的也是較容易出錯的,因為它會產生數據爭用的情況,即有超過一個線程試圖同時搶占某個資源,比如對某塊內存進行讀寫等,如下例所示:
清單?11.例子?thread_data_race.cc
static void inc(int *p) {for(int i = 0; i < COUNT; i++)(*p)++; } void threadDataRacing(void) {int a = 0;thread ta( inc, &a);thread tb( inc, &a);ta.join();tb.join();cout << "a=" << a << endl; }這是簡化了的極端情況,我們可以一眼看出來這是兩個線程在同時對&a?這個內存地址進行寫操作,但是在實際工作中,在代碼的海洋中發現它并不一定容易。從表面看,兩個線程執行完之后,最后的?a?值應該是?COUNT?*?2,但是實際上并非如此,因為簡單如?(*p)++這樣的操作并不是一個原子動作,要解決這個問題,對于簡單的基本類型數據如字符、整型、指針等,C++提供了原子模版類?atomic(#include?<atomic>),而對于復雜的對象,則提供了最常用的鎖機制,比如互斥類?mutex,門鎖?lock_guard,唯一鎖?unique_lock,條件變量?condition_variable?等。
現在我們使用原子模版類?atomic?改造上述例子得到預期結果:
清單?12.例子?thread_atomic.cc
static void inc(atomic<int> *p ) {for(int i = 0; i < COUNT; i++)(*p)++; } void threadDataRacing(void) {atomic<int> a(0) ;thread ta( inc, &a);thread tb( inc, &a);ta.join();tb.join();cout << "a=" << a << endl; }我們也可以使用?lock_guard,lock_guard?是一個范圍鎖,本質是?RAII(Resource?Acquire?Is?Initialization),在構建的時候自動加鎖,在析構的時候自動解鎖,這保證了每一次加鎖都會得到解鎖。即使是調用函數發生了異常,在清理棧幀的時候也會調用它的析構函數得到解鎖,從而保證每次加鎖都會解鎖,但是我們不能手工調用加鎖方法或者解鎖方法來進行更加精細的資源占用管理,使用?lock_guard?示例如下:
清單?13.例子?thread_lock_guard.cc
static mutex g_mutex; static void inc(int *p ) {for(int i = 0; i < COUNT; i++){lock_guard<mutex> _(g_mutex);(*p)++;} } void threadLockGuard(void) {int a = 0;thread ta( inc, &a);thread tb( inc, &a);ta.join();tb.join();cout << "a=" << a << endl; }如果要支持手工加鎖,可以考慮使用?unique_lock?或者直接使用?mutex。unique_lock?也支持?RAII,它也可以一次性將多個鎖加鎖;如果使用?mutex(#include?<mutex>)?則直接調用?mutex?類的?lock,?unlock,?trylock?等方法進行更加精細的鎖管理:
清單?14.例子?thread_mutex.cc
static mutex g_mutex; static void inc(int *p ) {thread_local int i; // TLS 變量for(; i < COUNT; i++){g_mutex.lock();(*p)++;g_mutex.unlock();} } void threadMutex(void) {int a = 0;thread ta( inc, &a);thread tb( inc, &a);ta.join();tb.join();cout << "a=" << a << endl; }在上例中,我們還使用了線程本地存儲?(TLS,?其實現原理類似于線程特定數據,?關于線程特定數據的詳細說明請參考我的前面的一篇博客)?變量,我們只需要在變量前面聲明它是?thread_local?即可。TLS?變量在線程棧內分配,線程棧只有在線程創建之后才生效,在線程退出的時候銷毀,需要注意不同系統的線程棧的大小是不同的,如果?TLS?變量占用空間比較大,需要注意這個問題。TLS?變量一般不能跨線程,其初始化在調用線程第一次使用這個變量時進行,默認初始化為?0。
對于線程間的事件通知,C++11?提供了條件變量類?condition_variable(#include?<condition_variable>),可視為?pthread_cond_t?的封裝,使用條件變量可以讓一個線程等待其它線程的通知?(wait,wait_for,wait_until),也可以給其它線程發送通知?(notify_one,notify_all),條件變量必須和鎖配合使用(與pthread_cond_t類似),在等待時因為有解鎖和重新加鎖,所以,在等待時必須使用可以手工解鎖和加鎖的鎖,比如?unique_lock,而不能使用?lock_guard,示例如下:
清單?15.例子?thread_cond_var.cc
# define THREAD_COUNT 10 mutex m; condition_variable cv;void threadCondVar(void) {thread** t = new thread*[THREAD_COUNT];int i;for(i = 0; i < THREAD_COUNT; i++){t[i] = new thread( [](int index){unique_lock<mutex> lck(m);cv.wait_for(lck, chrono::hours(1000));cout << index << endl;}, i );this_thread::sleep_for( chrono::milliseconds(50));}for(i = 0; i < THREAD_COUNT; i++){lock_guard<mutex> _(m);cv.notify_one();}for(i = 0; i < THREAD_COUNT; i++){t[i]->join();delete t[i];}delete t; }從上例的運行結果也可以看到,條件變量是不保證次序的,即首先調用?wait?的不一定首先被喚醒。
?
幾個高級概念
C++11?提供了若干多線程編程的高級概念:promise/future(#include?<future>),?packaged_task,?async,來簡化多線程編程,尤其是線程之間的數據交互比較簡單的情況下,讓我們可以將注意力更多地放在業務處理上。
promise/future?可以用來在線程之間進行簡單的數據交互,而不需要考慮鎖的問題,線程?A?將數據保存在一個?promise?變量中,另外一個線程?B?可以通過這個?promise?變量的?get_future()?獲取其值,當線程?A?尚未在?promise?變量中賦值時,線程?B?也可以等待這個?promise?變量的賦值:
清單?16.例子?thread_promise_future.cc
promise<string> val; static void threadPromiseFuture() {thread ta([](){future<string> fu = val.get_future();cout << "waiting promise->future" << endl;cout << fu.get() << endl;});thread tb([](){this_thread::sleep_for( chrono::milliseconds(100) );val.set_value("promise is set");});ta.join();tb.join(); }一個?future?變量只能調用一次?get(),如果需要多次調用?get(),可以使用?shared_future,通過?promise/future?還可以在線程之間傳遞異常。
如果將一個?callable?對象和一個?promise?組合,那就是?packaged_task,它可以進一步簡化操作:
清單?17.例子?thread_packaged_task.cc
static mutex g_mutex; static void threadPackagedTask() {auto run = [=](int index){{lock_guard<mutex> _(g_mutex);cout << "tasklet " << index << endl;}this_thread::sleep_for( chrono::seconds(10) );return index * 1000;};packaged_task<int(int)> pt1(run);packaged_task<int(int)> pt2(run);thread t1([&](){pt1(2);} );thread t2([&](){pt2(3);} );int f1 = pt1.get_future().get();int f2 = pt2.get_future().get();cout << "task result=" << f1 << endl;cout << "task result=" << f2 << endl;t1.join();t2.join(); }我們還可以試圖將一個?packaged_task?和一個線程組合,那就是?async()?函數。使用?async()?函數啟動執行代碼,返回一個?future?對象來保存代碼返回值,不需要我們顯式地創建和銷毀線程等,而是由?C++11?庫的實現決定何時創建和銷毀線程,以及創建幾個線程等,示例如下:
清單?18.例子?thread_async.cc
static long do_sum(vector<long> *arr, size_t start, size_t count) {static mutex _m;long sum = 0;for(size_t i = 0; i < count; i++){sum += (*arr)[start + i];}{lock_guard<mutex> _(_m);cout << "thread " << this_thread::get_id()<< ", count=" << count<< ", sum=" << sum << endl;}return sum; }static void threadAsync() { # define COUNT 1000000vector<long> data(COUNT);for(size_t i = 0; i < COUNT; i++){data[i] = random() & 0xff;} //vector< future<long> > result;size_t ptc = thread::hardware_concurrency() * 2;for(size_t batch = 0; batch < ptc; batch++){size_t batch_each = COUNT / ptc;if (batch == ptc - 1){batch_each = COUNT - (COUNT / ptc * batch);}result.push_back(async(do_sum, &data, batch * batch_each, batch_each));}long total = 0;for(size_t batch = 0; batch < ptc; batch++){total += result[batch].get();}cout << "total=" << total << endl; }如果是在多核或者多?CPU?的環境上面運行上述例子,仔細觀察輸出結果,可能會發現有些線程?ID?是重復的,這說明重復使用了線程,也就是說,通過使用?async()?還可達到一些線程池的功能。
?
幾個需要注意的地方
thread?同時也是棉線、毛線、絲線等意思,我想大家都能體會面對一團亂麻不知從何處查找頭緒的感受,不要忘了,線程不是靜態的,它是不斷變化的,請想像一下面對一團會動態變化的亂麻的情景。所以,使用多線程技術的首要準則是我們自己要十分清楚我們的線程在哪里?線頭(線程入口和出口)在哪里?先安排好線程的運行,注意不同線程的交叉點(訪問或者修改同一個資源,包括內存、I/O?設備等),盡量減少線程的交叉點,要知道幾條線堆在一起最怕的是互相打結。
當我們的確需要不同線程訪問一個共同的資源時,一般都需要進行加鎖保護,否則很可能會出現數據不一致的情況,從而出現各種時現時不現的莫名其妙的問題,加鎖保護時有幾個問題需要特別注意:一是一個線程內連續多次調用非遞歸鎖?(non-recursive?lock)?的加鎖動作,這很可能會導致異常;二是加鎖的粒度;三是出現死鎖?(deadlock),多個線程互相等待對方釋放鎖導致這些線程全部處于罷工狀態。
第一個問題只要根據場景調用合適的鎖即可,當我們可能會在某個線程內重復調用某個鎖的加鎖動作時,我們應該使用遞歸鎖?(recursive?lock),在?C++11?中,可以根據需要來使用?recursive_mutex,或者?recursive_timed_mutex。
第二個問題,即鎖的粒度,原則上應該是粒度越小越好,那意味著阻塞的時間越少,效率更高,比如一個數據庫,給一個數據行?(data?row)?加鎖當然比給一個表?(table)?加鎖要高效,但是同時復雜度也會越大,越容易出錯,比如死鎖等。
對于第三個問題我們需要先看下出現死鎖的條件:
????資源互斥,某個資源在某一時刻只能被一個線程持有?(hold);
????請求和保持,持有一個以上的互斥資源的線程在等待被其它進程持有的互斥資源;
????不可搶占,只有在某互斥資源的持有線程釋放了該資源之后,其它線程才能去持有該資源;
????環形等待,有兩個或者兩個以上的線程各自持有某些互斥資源,并且各自在等待其它線程所持有的互斥資源。
我們只要不讓上述四個條件中的任意一個不成立即可。在設計的時候,非常有必要先分析一下會否出現滿足四個條件的情況,特別是檢查有無試圖去同時保持兩個或者兩個以上的鎖,當我們發現試圖去同時保持兩個或者兩個以上的鎖的時候,就需要特別警惕了。下面我們來看一個簡化了的死鎖的例子:
清單?19.例子?thread_deadlock.cc
static mutex g_mutex1, g_mutex2; static void inc1(int *p ) {for(int i = 0; i < COUNT; i++){g_mutex1.lock();(*p)++;g_mutex2.lock();// do something.g_mutex2.unlock();g_mutex1.unlock();} } static void inc2(int *p ) {for(int i = 0; i < COUNT; i++){g_mutex2.lock();g_mutex1.lock();(*p)++;g_mutex1.unlock();// do other thing.g_mutex2.unlock();} } void threadMutex(void) {int a = 0;thread ta( inc1, &a);thread tb( inc2, &a);ta.join();tb.join();cout << "a=" << a << endl; }在這個例子中,g_mutex1?和?g_mutex2?都是互斥的資源,任意時刻都只有一個線程可以持有(加鎖成功),而且只有持有線程調用?unlock?釋放鎖資源的時候其它線程才能去持有,滿足條件?1?和?3,線程?ta?持有了?g_mutex1?之后,在釋放?g_mutex1?之前試圖去持有?g_mutex2,而線程?tb?持有了?g_mutex2?之后,在釋放?g_mutex2?之前試圖去持有?g_mutex1,滿足條件?2?和?4,這種情況之下,當線程?ta?試圖去持有?g_mutex2?的時候,如果?tb?正持有?g_mutex2?而試圖去持有?g_mutex1?時就發生了死鎖。在有些環境下,可能要多次運行這個例子才出現死鎖,實際工作中這種偶現特性讓查找問題變難。要破除這個死鎖,我們只要按如下代碼所示破除條件?3?和?4?即可:
清單?20.例子?thread_break_deadlock.cc
static mutex g_mutex1, g_mutex2; static void inc1(int *p ) {for(int i = 0; i < COUNT; i++){g_mutex1.lock();(*p)++;g_mutex1.unlock();g_mutex2.lock();// do something.g_mutex2.unlock();} } static void inc2(int *p ) {for(int i = 0; i < COUNT; i++){g_mutex2.lock();// do other thing.g_mutex2.unlock();g_mutex1.lock();(*p)++;g_mutex1.unlock();} } void threadMutex(void) {int a = 0;thread ta( inc1, &a);thread tb( inc2, &a);ta.join();tb.join();cout << "a=" << a << endl; }在一些復雜的并行編程場景,如何避免死鎖是一個很重要的話題,在實踐中,當我們看到有兩個鎖嵌套加鎖的時候就要特別提高警惕,它極有可能滿足了條件?2?或者?4。
?
結束語
上述例子在?CentOS?6.5,g++?4.8.1/g++4.9?以及?clang?3.5?下面編譯通過,在編譯的時候,請注意下述幾點:
????設置?-std=c++11;
????鏈接的時候設置?-pthread;
????使用?g++編譯鏈接時設置?-Wl,–no-as-needed?傳給鏈接器,有些版本的?g++需要這個設置;
????設置宏定義?-D_REENTRANT,有些庫函數是依賴于這個宏定義來確定是否使用多線程版本的。
在用?gdb?調試多線程程序的時候,可以輸入命令?info?threads?查看當前的線程列表,通過命令?thread?n?切換到第?n?個線程的上下文,這里的?n?是?info?threads?命令輸出的線程索引數字,例如,如果要切換到第?2?個線程的上下文,則輸入命令?thread?2。
聰明地使用多線程,擁抱多線程吧。
注意:?本文只是簡單的概略性的簡單介紹了一下使用C++11進行線程并發編程,?如果讀者需要了解并掌握更加深入的內容,?請參考其他相關書籍或資料,?在此,?我推薦一個博客系列,?作者是中科院計算所碩士,?博客網址:http://www.cnblogs.com/haippy/p/3284540.html
總結
以上是生活随笔為你收集整理的Linux多线程实践(10) --使用 C++11 编写 Linux 多线程程序的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第四步:【核心】工艺及BOM中心
- 下一篇: VIM 快捷键(转)