用户模式下线程同步
原子訪問、調整緩存行、高級線程同步、關鍵段、Slim讀/寫鎖、條件變量
AcquireSRWLockExclusive
AcquireSRWLockShared
SleepConditionVariableSRW
?
/****************************************************************************** Module: Queue.cpp Notices: Copyright (c) 2008 Jeffrey Richter & Christophe Nasarre ******************************************************************************/#include <Windows.h> #include <process.h> #include <windowsx.h> #include <tchar.h> #include <StrSafe.h> #include <iostream>typedef unsigned (__stdcall *PTHREAD_START) (void *); #define chBEGINTHREADEX(psa, cbStackSize, pfnStartAddr, \pvParam, dwCreateFlags, pdwThreadId) \((HANDLE)_beginthreadex( \(void *) (psa), \(unsigned) (cbStackSize), \(PTHREAD_START) (pfnStartAddr), \(void *) (pvParam), \(unsigned) (dwCreateFlags), \(unsigned *) (pdwThreadId)))///class CQueue { public:struct ELEMENT {int m_nThreadNum;int m_nRequestNum;// Other element data should go here};typedef ELEMENT* PELEMENT;private:struct INNER_ELEMENT {int m_nStamp; // 0 means emptyELEMENT m_element;};typedef INNER_ELEMENT* PINNER_ELEMENT;private:PINNER_ELEMENT m_pElements; // Array of elements to be processedint m_nMaxElements; // Maximum # of elements in the arrayint m_nCurrentStamp; // Keep track of the # of added elementsprivate:int GetFreeSlot();int GetNextSlot(int nThreadNum);public:CQueue(int nMaxElements);~CQueue();BOOL IsFull();BOOL IsEmpty(int nThreadNum);void AddElement(ELEMENT e);BOOL GetNewElement(int nThreadNum, ELEMENT& e); };///CQueue::CQueue(int nMaxElements) {// Allocate and initialize the elementsm_pElements = (PINNER_ELEMENT) HeapAlloc(GetProcessHeap(), 0, sizeof(INNER_ELEMENT) * nMaxElements);ZeroMemory(m_pElements, sizeof(INNER_ELEMENT) * nMaxElements);// Initialize the element counterm_nCurrentStamp = 0;// Remember the max number of elementsm_nMaxElements = nMaxElements; }CQueue::~CQueue() {HeapFree(GetProcessHeap(), 0, m_pElements); }BOOL CQueue::IsFull() {return(GetFreeSlot() == -1); }BOOL CQueue::IsEmpty(int nThreadNum) {return(GetNextSlot(nThreadNum) == -1); }int CQueue::GetFreeSlot() {// Look for the first element with a 0 stampfor(int current = 0; current < m_nMaxElements; current++) {if (m_pElements[current].m_nStamp == 0)return(current);}// No free slot was foundreturn(-1); }int CQueue::GetNextSlot(int nThreadNum) {// By default, there is no slot for this threadint firstSlot = -1;// The element can't have a stamp higher than the last addedint firstStamp = m_nCurrentStamp+1;// Look for the even (thread 0) / odd (thread 1) element that is not free for(int current = 0; current < m_nMaxElements; current++) {// Keep track of the first added (lowest stamp) in the queue// --> so that "first in first out" behavior is ensuredif ((m_pElements[current].m_nStamp != 0) && // free element((m_pElements[current].m_element.m_nRequestNum % 2) == nThreadNum) &&(m_pElements[current].m_nStamp < firstStamp)) {firstStamp = m_pElements[current].m_nStamp;firstSlot = current;}}return(firstSlot); }void CQueue::AddElement(ELEMENT e) {// Do nothing if the queue is fullint nFreeSlot = GetFreeSlot();if (nFreeSlot == -1)return;// Copy the content of the elementm_pElements[nFreeSlot].m_element = e;// Mark the element with the new stampm_pElements[nFreeSlot].m_nStamp = ++m_nCurrentStamp; }BOOL CQueue::GetNewElement(int nThreadNum, ELEMENT& e) {int nNewSlot = GetNextSlot(nThreadNum);if (nNewSlot == -1)return(FALSE);// Copy the content of the elemente = m_pElements[nNewSlot].m_element;// Mark the element as readm_pElements[nNewSlot].m_nStamp = 0;return(TRUE); }///CQueue g_q(10); // The shared queue volatile LONG g_fShutdown;// Signals client/server threads to die HWND g_hWnd; // How client/server threads give status SRWLOCK g_srwLock; // Reader/writer lock to protect the queue CONDITION_VARIABLE g_cvReadyToConsume; // Signaled by writers CONDITION_VARIABLE g_cvReadyToProduce; // Signaled by readers// Handles to all reader/writer threads HANDLE g_hThreads[MAXIMUM_WAIT_OBJECTS];// Number of reader/writer threads int g_nNumThreads = 0;///void AddText(PCTSTR pszFormat, ...) {va_list argList;va_start(argList, pszFormat);TCHAR sz[20 * 1024];_vstprintf_s(sz, _countof(sz), pszFormat, argList);std::wcout<<sz<<std::endl;va_end(argList); }BOOL ConsumeElement(int nThreadNum, int nRequestNum) {// Get access to the queue to consume a new elementAcquireSRWLockShared(&g_srwLock); // Fall asleep until there is something to read.// Check if, while it was asleep, // it was not decided that the thread should stopwhile (g_q.IsEmpty(nThreadNum) && !g_fShutdown) {// There was not a readable elementAddText(TEXT("Consume thread[%d] Nothing to process"), nThreadNum);// The queue is empty// --> Wait until a writer adds a new element to read// and come back with the lock acquired in shared modeSleepConditionVariableSRW(&g_cvReadyToConsume, &g_srwLock, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED);}// When thread is exiting, the lock should be released for writer// and readers should be signaled through the condition variableif (g_fShutdown) {// Show that the current thread is exitingAddText(TEXT("Consume thread[%d] bye bye"), nThreadNum);// Another writer thread might still be blocked on the lock// --> release it before exitingReleaseSRWLockShared(&g_srwLock);// Notify other readers that it is time to exit// --> release readersWakeConditionVariable(&g_cvReadyToConsume);return(FALSE);}// Get the first new elementCQueue::ELEMENT e;// Note: No need to test the return value since IsEmpty// returned FALSEg_q.GetNewElement(nThreadNum, e);// No need to keep the lock any longerReleaseSRWLockShared(&g_srwLock);// Show result of consuming the elementAddText(TEXT("Consume thread[%d] Processing Generate thread[%d]:%d"), nThreadNum, e.m_nThreadNum, e.m_nRequestNum);// A free slot is now available for writer threads to produce// --> wake up a writer threadWakeConditionVariable(&g_cvReadyToProduce);return(TRUE); }DWORD WINAPI ReaderThread(PVOID pvParam) {int nThreadNum = PtrToUlong(pvParam);for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) {if (!ConsumeElement(nThreadNum, nRequestNum))return(0);Sleep(2500); // Wait before reading another element}// g_fShutdown has been set during Sleep// --> Show that the current thread is exitingAddText(TEXT("Generate thread[%d] bye bye"), nThreadNum);return(0); }///DWORD WINAPI WriterThread(PVOID pvParam) {int nThreadNum = PtrToUlong(pvParam);for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) {CQueue::ELEMENT e = { nThreadNum, nRequestNum };// Require access for writingAcquireSRWLockExclusive(&g_srwLock);// If the queue is full, fall asleep as long as the condition variable // is not signaled// Note: During the wait for acquiring the lock, // a stop might have been receivedif (g_q.IsFull() & !g_fShutdown) {// No more room in the queueAddText(TEXT("Generate thread[%d] Queue is full: impossible to add %d"), nThreadNum, nRequestNum);// --> Need to wait for a reader to empty a slot before acquiring // the lock again SleepConditionVariableSRW(&g_cvReadyToProduce, &g_srwLock, INFINITE, 0);}// Other writer threads might still be blocked on the lock// --> Release the lock and notify the remaining writer threads to quitif (g_fShutdown) {// Show that the current thread is exitingAddText(TEXT("Generate thread[%d] bye bye"), nThreadNum);// No need to keep the lock any longerReleaseSRWLockExclusive(&g_srwLock);// Signal other blocked writers threads that it is time to exitWakeAllConditionVariable(&g_cvReadyToProduce);// Bye byereturn(0);} else {// Add the new ELEMENT into the queueg_q.AddElement(e);Sleep(1500); // Show result of processing elementAddText(TEXT("Generate thread[%d] Adding %d"), nThreadNum, nRequestNum);// No need to keep the lock any longerReleaseSRWLockExclusive(&g_srwLock);// Signal reader threads that there is an element to consumeWakeAllConditionVariable(&g_cvReadyToConsume);// Wait before adding a new elementSleep(1500);}}// Show that the current thread is exitingAddText(TEXT("Generate thread[%d] bye bye"), nThreadNum);return(0); }///void init() {// Prepare the SRWLock to be usedInitializeSRWLock(&g_srwLock);// Prepare the condition variables to be usedInitializeConditionVariable(&g_cvReadyToConsume);InitializeConditionVariable(&g_cvReadyToProduce);// Will be set to TRUE in order to end threadsg_fShutdown = FALSE;// Create the writer threadsDWORD dwThreadID;for (int x = 0; x < 4; x++)g_hThreads[g_nNumThreads++] = chBEGINTHREADEX(NULL, 0, WriterThread, (PVOID) (INT_PTR) x, 0, &dwThreadID);// Create the reader threadsfor (int x = 0; x < 2; x++)g_hThreads[g_nNumThreads++] = chBEGINTHREADEX(NULL, 0, ReaderThread, (PVOID) (INT_PTR) x, 0, &dwThreadID);WaitForMultipleObjects(g_nNumThreads, g_hThreads, TRUE, INFINITE);}///void uninit() {if (!g_fShutdown) {// Ask all threads to endInterlockedExchange(&g_fShutdown, TRUE);// Free all threads waiting on condition variablesWakeAllConditionVariable(&g_cvReadyToConsume);WakeAllConditionVariable(&g_cvReadyToProduce);// Wait for all the threads to terminate & then clean upWaitForMultipleObjects(g_nNumThreads, g_hThreads, TRUE, INFINITE);// Don't forget to clean up kernel resources// Note: This is not really mandatory since the process is exitingwhile (g_nNumThreads--)CloseHandle(g_hThreads[g_nNumThreads]);} }int main() {init();uninit();return(0); }End of File //
總結
- 上一篇: 进程权限
- 下一篇: 内核对象用于线程同步