Windows线程同步API
本文主要總結(jié)創(chuàng)建、結(jié)束線程和WIN32 API提供的一些線程同步方法。同步方法包括用戶態(tài)同步方式:InterLock、CriticalSection、SRWLock和內(nèi)核態(tài)同步方式:Event、Semaphore、Mutex等。本文通過簡(jiǎn)單的例子演示API的使用,沒有包含原理的說明,假定讀者具有其他語言或者平臺(tái)的并發(fā)編程經(jīng)驗(yàn)。
創(chuàng)建、結(jié)束線程
WIN32 API雖然提供了CreateThead和ExitThread方法,但是在C++中,永遠(yuǎn)不應(yīng)該使用這兩個(gè)方法創(chuàng)建或結(jié)束線程。而應(yīng)該使用VC++提供的_beginthread、_beginthreadex方法,相應(yīng)的結(jié)束線程方法_endthread、_endthreadex。后者除了在內(nèi)部調(diào)用CreateThread或ExitThread方法外,還負(fù)責(zé)CRT的初始化或銷毀。雖然有直接結(jié)束線程的方法,但在C++最好通過線程方法正常返回來結(jié)束線程。直接結(jié)束線程時(shí)C++對(duì)象的析構(gòu)函數(shù)不會(huì)被調(diào)用。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | #include "stdafx.h" ?? usingnamespacestd; ?? classObj { public: ????Obj() { cout <<"Obj() called"<< endl; } ????~Obj() { cout <<"~Obj() called"<< endl; } }; ?? unsigned intWINAPI ThreadFunc(void* pvParam){ ????cout <<static_cast<char*>(pvParam) << endl; ????Obj obj; ????_endthreadex(2); ????return1; } ?? int_tmain(intargc, _TCHAR* argv[]) { ????unsignedintthreadId; ????char*param = "param"; ????HANDLEthread = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, param, 0, &threadId); ?? ????Sleep(100); ????DWORDexitCode; ????GetExitCodeThread(thread, &exitCode); ????cout <<"ExitCode:"<< exitCode << endl; ????system("pause"); ????return0; } |
這段代碼的輸出為:
param
Obj() called
ExitCode:2
請(qǐng)按任意鍵繼續(xù). . .
_beginthreadex的第一個(gè)參數(shù)為SECURITY_ATTRIBUTES結(jié)構(gòu)指針,可以指定NULL使用默認(rèn)的安全配置。第二參數(shù)為cbStackSize,線程棧大小;設(shè)置為0使用默認(rèn)值,可以通過鏈接參數(shù)/STACK:[reserve][,commit]控制。第三個(gè)參數(shù)為線程入口方法地址,方法簽名如ThreadFunc所示。第四個(gè)三處為傳遞給入口方法的參數(shù)(值傳遞),具體意義由程序自己解釋。最后一個(gè)參數(shù)是返回的線程ID。返回值為新創(chuàng)建線程的句柄。__endthreadex方法唯一的參數(shù)指定線程的ExitCode。可以通過GetExitCodeThread方法獲得線程退出碼。
InterLocked系列原子方法
InterLocked系列方法可視為原子的。完成其功能時(shí),保證其他線程不會(huì)訪問同一個(gè)資源。例如最簡(jiǎn)單的InterLockedIncrement方法原子自增一個(gè)共享變量。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | longg_sum(0); ?? unsigned intWINAPI ThreadFunc(void* pvParam){ ????for(inti = 0; i < 100000; ++i) ????{ ????????InterlockedIncrement(&g_sum); ????} ????return0; } ?? int_tmain(intargc, _TCHAR* argv[]) { ????unsignedintthreadId; ????HANDLEthread1 = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, NULL, 0, &threadId); ????HANDLEthread2 = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, NULL, 0, &threadId); ?? ????Sleep(1000); ????cout <<"Sum:" << g_sum << endl; ????system("pause"); ????return0; } |
其他方法包括:
InterlockedIncrement64:自增一個(gè)64位整數(shù)。
InterlockedExchangeAdd、InterlockedExchangeAdd64:加和兩個(gè)數(shù)并賦值給第一個(gè)值。
InterlockedCompareExchange:比較并交換一個(gè)數(shù)。
還有很多InterLocked方法,具體參考MSDN文檔。
CriticalSection
通過EnterCriticalSection和LeaveCriticalSection方法,可以控制同步一段代碼的訪問。使用前需要使用InitializeCriticalSection初始化CRITICAL_SECTION。使用方法也很簡(jiǎn)單。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | CRITICAL_SECTION g_cs; longg_sum(0); ?? unsigned intWINAPI ThreadFunc(void* pvParam){ ????for(inti = 0; i < 100000; ++i) ????{ ????????EnterCriticalSection(&g_cs); ????????g_sum += 2; ????????LeaveCriticalSection(&g_cs); ????} ????return0; } ?? int_tmain(intargc, _TCHAR* argv[]) { ????InitializeCriticalSection(&g_cs); ????unsignedintthreadId; ????HANDLEthread1 = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, NULL, 0, &threadId); ????HANDLEthread2 = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, NULL, 0, &threadId); ?? ????Sleep(1000); ????cout <<"Sum:" << g_sum << endl; ????DeleteCriticalSection(&g_cs); ????system("pause"); ????return0; } |
這里有一個(gè)問題是,如果同步的代碼塊不是簡(jiǎn)單g_sum += 2,而是可能拋出異常的復(fù)雜代碼。就需要確保LeaveCriticalSection一定被調(diào)用。不再使用后使用DeleteCriticalSection方法刪除之。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | classCSManager { public: ????CSManager(CRITICAL_SECTION *cs) : m_cs(cs) ????{ ????????EnterCriticalSection(m_cs); ????} ????~CSManager() ????{ ????????LeaveCriticalSection(m_cs); ????} private: ????CRITICAL_SECTION *m_cs; }; ?? //... ????for(inti = 0; i < 100000; ++i) ????{ ????????CSManager CSMgr(&g_cs); ????????g_sum += 2; ????} //... |
CSManager在構(gòu)造函數(shù)中調(diào)用EnterCriticalSection,析構(gòu)函數(shù)中調(diào)用LeaveCriticalSection。保證在代碼塊結(jié)束時(shí)調(diào)用Leave方法。
另外除了使用阻塞的Enter方法,還有一個(gè)TryEnterCriticalSection,該方法嘗試進(jìn)去CriticalSetion,如果失敗,不會(huì)阻塞,而是立即返回FALSE。
SRWLOCK
SRWLOCK具有和CriticalSection類似的功能。另外還具有讀寫鎖分離的分離的功能。可以使用AcquireSRWLockShared獲取共享的讀鎖。使用AcquireSRWLockExclusive獲取獨(dú)占的寫鎖。使用對(duì)應(yīng)的ReleaseSRWLockShared/Exclusive方法施放鎖。同樣地,使用前需要使用InitializeSRWLock初始化。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | SRWLOCK g_lock; longg_sum(0); ?? unsigned intWINAPI ReadThreadFunc(void* pvParam){ ????for(inti = 0; i < 10; ++i) ????{ ????????AcquireSRWLockShared(&g_lock); ????????cout << g_sum << endl; ????????ReleaseSRWLockShared(&g_lock); ????????Sleep(1); ????} ????return0; } ?? unsigned intWINAPI WriteThreadFunc(void* pvParam){ ????for(inti = 0; i < 100000; ++i) ????{ ????????AcquireSRWLockExclusive(&g_lock); ????????g_sum += 2; ????????ReleaseSRWLockExclusive(&g_lock); ????} ????return0; } ?? int_tmain(intargc, _TCHAR* argv[]) { ????InitializeSRWLock(&g_lock); ?? ????unsignedintthreadId; ????HANDLEthread1 = (HANDLE)_beginthreadex(NULL, 0, ReadThreadFunc, NULL, 0, &threadId); ????HANDLEthread2 = (HANDLE)_beginthreadex(NULL, 0, ReadThreadFunc, NULL, 0, &threadId); ????HANDLEthread3 = (HANDLE)_beginthreadex(NULL, 0, WriteThreadFunc, NULL, 0, &threadId); ?? ????Sleep(1000); ????cout <<"Sum:" << g_sum << endl; ????system("pause"); ????return0; } |
SRWLOCK不具備類似于TryEnterCriticalSection的非阻塞方法。大多數(shù)情況下,SRWLOCK比CRITICAL_SECTION有更好的性能。
Condition Variable
為實(shí)現(xiàn)近點(diǎn)的生產(chǎn)者消費(fèi)者問題。我們可以使用兩個(gè)CONDITION_VARIABLE:g_full,g_empty來實(shí)現(xiàn)。在緩沖區(qū)滿的時(shí)候,生產(chǎn)者線程調(diào)用SleepConditionVariableSRW(&g_full, &g_lock, INFINITE, 0)施放獲得的鎖并等待g_full。緩沖區(qū)空的時(shí)候,消費(fèi)者可以調(diào)用leepConditionVariableSRW(&g_empty, &g_lock, INFINITE, 0)施放獲得的鎖并等待g_empty。掉進(jìn)滿足后,可是使用WakeAllConditionVariable喚醒所有等待的線程或者使用WakeConditionVariable喚醒一個(gè)等待的線程。
和Condition Variable配置使用的可以使CrticalSection也可以使SRWLock。
| 1 2 3 4 5 6 7 8 9 10 | BOOLSleepConditionVariableCS( ???PCONDITION_VARIABLE pConditionVariable, ???PCRITICAL_SECTION pCriticalSection, ???DWORDdwMilliseconds); ?? BOOLSleepConditionVariableSRW( ???PCONDITION_VARIABLE pConditionVariable, ???PSRWLOCK pSRWLock, ???DWORDdwMilliseconds, ???ULONGFlags); |
參數(shù)dwMilliseconds指定等待超時(shí)的時(shí)間,如果超時(shí)方法返回FASLE;INFINITE指定等待不超時(shí)。參數(shù)Flags指定被喚醒時(shí)嘗試獲得的鎖的類型。CONDITION_VARIABLE_LOCKMODE_ SHARED指定獲得共享鎖或者0指定獲得獨(dú)占鎖。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | constintMAX_SIZE = 10; ?? CONDITION_VARIABLE g_full; CONDITION_VARIABLE g_empty; SRWLOCK g_lock; ?? list<Product> products; ?? unsigned intWINAPI ProduceThreadFunc(void* pvParam) { ????inti(0); ????while(true) ????{ ????????Sleep(rand() % 100); ????????AcquireSRWLockExclusive(&g_lock); ?? ????????if(products.size() >= MAX_SIZE) ????????{ ????????????SleepConditionVariableSRW(&g_full, &g_lock, INFINITE, 0); ????????} ????????else ????????{ ????????????cout <<"Produce Product:"<< i << " by thread "<< GetThreadId(GetCurrentThread()) << endl; ????????????products.push_back(Product(i++)); ????????} ?? ????????WakeAllConditionVariable(&g_empty); ????????ReleaseSRWLockExclusive(&g_lock); ?????????? ????} ????return0; } ?? unsigned intWINAPI ConsumeThreadFunc(void* pvParam) { ????while(true) ????{ ????????Sleep(rand() % 100); ?? ????????AcquireSRWLockExclusive(&g_lock); ????????if(products.size() == 0) ????????{ ????????????SleepConditionVariableSRW(&g_empty, &g_lock, INFINITE, 0); ????????} ????????else ????????{ ????????????Product p = products.front(); ????????????products.pop_front(); ????????????cout <<"Consume Product:"<< p.m_no << " by thread "<< GetThreadId(GetCurrentThread()) << endl; ????????} ?? ????????WakeAllConditionVariable(&g_full); ????????ReleaseSRWLockExclusive(&g_lock); ?????????? ?? ????} ????return0; } ?? int_tmain(intargc, _TCHAR* argv[]) { ????srand((unsigned)time(NULL)); ????InitializeSRWLock(&g_lock); ????unsignedintthreadId; ????HANDLEthread1 = (HANDLE)_beginthreadex(NULL, 0, ProduceThreadFunc, NULL, 0, &threadId); ????HANDLEthread2 = (HANDLE)_beginthreadex(NULL, 0, ConsumeThreadFunc, NULL, 0, &threadId); ????HANDLEthread3 = (HANDLE)_beginthreadex(NULL, 0, ConsumeThreadFunc, NULL, 0, &threadId); ?? ????WaitForSingleObject(thread1, INFINITE); ????WaitForSingleObject(thread2, INFINITE); ????WaitForSingleObject(thread3, INFINITE); ????system("pause"); ????return0; } |
內(nèi)核態(tài)線程同步方法
除了上面介紹的用戶態(tài)的線程同步方法。本文繼續(xù)通過幾個(gè)簡(jiǎn)單例子演示內(nèi)核態(tài)的線程同步方法的使用。內(nèi)核態(tài)線程同步方法在性能上肯定比用戶態(tài)同步方法要差很多。但可以在多個(gè)進(jìn)程間共享。
創(chuàng)建所有的內(nèi)核態(tài)同步對(duì)象都范圍一個(gè)內(nèi)核對(duì)象句柄HANDLE。通過WaitForSingleObject或者WaitForMultipleObjects等待內(nèi)核同步對(duì)象轉(zhuǎn)換為已傳信狀態(tài)(signaled)。如果等待的是線程或者進(jìn)程對(duì)象,那么對(duì)應(yīng)線程或進(jìn)程結(jié)束后即轉(zhuǎn)換為已傳信狀態(tài)。同時(shí)還可以指定一個(gè)超時(shí)時(shí)間。WaitForSingleObject包括WAIT_OBJECT_0,WAIT_TIMEOUT和WAIT_FAILED。不再使用后調(diào)用CloseHandle釋放引用。
| 1 2 3 4 5 6 7 8 9 10 11 12 | DWORDdw = WaitForSingleObject(hProcess, 5000); switch(dw) { ???caseWAIT_OBJECT_0: ?????// The process terminated. ?????break; ???caseWAIT_TIMEOUT: ??????// The process did not terminate within 5000 milliseconds. ??????break; ???caseWAIT_FAILED: ??????// Bad call to function (invalid handle?) ??????break; } |
WaitForMultipleObjects如果指定參數(shù)bWaitAll為TRUE,則等待所有對(duì)象都轉(zhuǎn)換為已傳信狀態(tài)后才返回,如果為指定bWaitAll為FALSE,則任意對(duì)象轉(zhuǎn)換為已傳信狀態(tài)即返回。可以通過以下方法來判斷是那個(gè)內(nèi)核同步對(duì)象。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | h[0] = hProcess1; h[1] = hProcess2; h[2] = hProcess3; DWORDdw = WaitForMultipleObjects(3, h, FALSE, 5000); switch(dw) { ???caseWAIT_FAILED: ??????// Bad call to function (invalid handle?) ??????break; ?? ???caseWAIT_TIMEOUT: ??????// None of the objects became signaled within 5000 milliseconds. ??????break; ?? ???caseWAIT_OBJECT_0 + 0: ??????// The process identified by h[0] (hProcess1) terminated. ??????break; ?? ???caseWAIT_OBJECT_0 + 1: ??????// The process identified by h[1] (hProcess2) terminated. ??????break; ?? ???caseWAIT_OBJECT_0 + 2: ??????// The process identified by h[2] (hProcess3) terminated. ??????break; } |
Event
Event語義上可以理解為一個(gè)事件是否發(fā)生。SetEvent方法設(shè)置Event為Signaled狀態(tài)。Event有兩種類型。第一種是自動(dòng)重置的事件,調(diào)用SetEvent方法后,喚醒一個(gè)等待的線程后即自動(dòng)轉(zhuǎn)換為未傳信狀態(tài)。第二種是手動(dòng)重置事件,調(diào)用SetEvent方法后,需要調(diào)用ResetEvent方法設(shè)置事件為未傳信狀態(tài)。PulseEvent相當(dāng)于調(diào)用SetEvent后立即調(diào)用ResetEvent。對(duì)于手動(dòng)重置時(shí)間,PulseEvent會(huì)喚醒所有等待的線程。而對(duì)于自動(dòng)重置的事件PulseEvent只會(huì)喚醒一個(gè)等待的線程。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | HANDLEg_taskEvent; ?? unsigned intWINAPI ComputationTask(void* pvParam) { ????WaitForSingleObject(g_taskEvent, INFINITE); ????for(inti = 0; i < 10; ++i) ????{ ????????cout <<"comput "<< i << endl; ????} ????return0; } ?? int_tmain(intargc, _TCHAR* argv[]) { ????g_taskEvent = CreateEvent(NULL, FALSE, FALSE, NULL); ????unsignedintthreadId; ????HANDLEthread1 = (HANDLE)_beginthreadex(NULL, 0, ComputationTask, NULL, 0, &threadId); ?? ????system("pause"); ????SetEvent(g_taskEvent); ????ResetEvent(g_taskEvent); ?? ????WaitForSingleObject(thread1, INFINITE); ????system("pause"); ????return0; } |
上面是一個(gè)簡(jiǎn)單的例子,ComputationTask線程等待用戶輸入后才開始計(jì)算任務(wù)。
Semaphore
Semaphore維護(hù)一個(gè)資源計(jì)數(shù)count和一個(gè)最大計(jì)數(shù)maxCount。
當(dāng)count大于0時(shí),semaphore處于已傳信狀態(tài)。
當(dāng)count等于0是,semaphore處于未傳信狀態(tài)。
通過ReleaseSemaphore增加count計(jì)數(shù),WaitForSingleObject減少cout計(jì)數(shù)。count不會(huì)小于0,也不能大于maxCount。
例如,可以使用semaphore控制能夠同時(shí)處理的最大任務(wù)線程數(shù)。當(dāng)有超過最大數(shù)的更多任務(wù)線程開啟時(shí)只能等待其他任務(wù)完成并調(diào)用ReleaseSemaphore方法施放資源引用計(jì)數(shù)。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | HANDLEg_semaphore; ?? unsigned intWINAPI RequstProcesor(void* pvParam) { ????WaitForSingleObject(g_semaphore, INFINITE); ????cout <<"Start process request "<< GetThreadId(GetCurrentThread()) << endl; ????Sleep(1000); ????ReleaseSemaphore(g_semaphore, 1, NULL); ????return0; } ?? int_tmain(intargc, _TCHAR* argv[]) { ?? ????g_semaphore = CreateSemaphore(NULL, 2, 2, NULL); ?????? ????HANDLEthreads[10]; ????for(inti = 0; i < 10; i++) ????{ ????????threads[i] = (HANDLE)_beginthreadex(NULL, 0, RequstProcesor, NULL, 0, NULL); ????} ?? ????WaitForMultipleObjects(10, threads, TRUE, INFINITE); ????system("pause"); ????return0; } |
上面的代碼,啟動(dòng)了10個(gè)線程,但只能有2個(gè)現(xiàn)場(chǎng)可以同時(shí)執(zhí)行,更多的線程只能等待。
Mutex
mutex的功能和CriticalSection功能很相似。都是控制一段臨界代碼的互斥訪問。通過WaitForSingleObject等待mutex。ReleaseMutex釋放mutex。
mutex維護(hù)一個(gè)threadId和一個(gè)使用計(jì)數(shù)count。如果CreateMutex的參數(shù)bInitialOwner為TRUE,這threadId為調(diào)用線程,cout為1。否則都初始為0。
如果threadId為0,mutex沒有被任何線程使用,處于已傳信狀態(tài)。如果threadId不為0,mutex處于未傳信狀態(tài)。mutex和其他內(nèi)核同步對(duì)象一個(gè)不同的特殊地方在于。即時(shí)mutex處于未傳信狀態(tài)。如果調(diào)用WaitForSingleObject的線程是mutex的threadId對(duì)應(yīng)的線程,WaitForSingleObject不會(huì)阻塞相當(dāng)于處于已傳信狀態(tài)。下面的例子演示了mutex的使用。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | HANDLEg_mutex; ?? voidProcessA() { ????WaitForSingleObject(g_mutex, INFINITE); ????cout <<"ProcessA"<< " by thread "<< GetThreadId(GetCurrentThread()) << endl; ????ReleaseMutex(g_mutex); } ?? voidProcessB() { ????WaitForSingleObject(g_mutex, INFINITE); ????ProcessA(); ????cout <<"ProcessB"<< " by thread "<< GetThreadId(GetCurrentThread()) << endl; ????ReleaseMutex(g_mutex); } ?? unsigned intWINAPI ThreadFunc(void* pvParam) { ????ProcessB(); ????return0; } ?? int_tmain(intargc, _TCHAR* argv[]) { ????g_mutex = CreateMutex(NULL, FALSE, NULL); ?????? ????HANDLEthreads[10]; ????for(inti = 0; i < 10; i++) ????{ ????????threads[i] = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, NULL, 0, NULL); ????} ?? ????WaitForMultipleObjects(10, threads, TRUE, INFINITE); ????system("pause"); ????return0; } |
總結(jié)
以上是生活随笔為你收集整理的Windows线程同步API的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Windows线程同步机制的区别与比较及
- 下一篇: 分布式系统设计原理与方案