UE4异步编程专题 - 线程池FQueuedThreadPool
1. FQueuedThreadPool & IQueuedWork
FQueuedThreadPool是UE4中抽象出的線程池。線程池由若干個Worker線程,和一個同步隊列構成。UE4把同步隊列執行的任務抽象為IQueuedWork. 線程池的同步隊列,就是一個IQueuedWork的隊列了。借用wiki上線程池的圖, UE4的FQueuedThreadPool也是如圖中所示的結構:
Thread pool
生產者生產IQueuedWork的實例對象。線程池會向生產者提供入隊的接口。線程池中的Worker線程都是消費者,會不停地從隊列中取出IQueuedWork,并執行work.
下面的代碼就是FQueuedThreadPool給用戶使用的接口:
class CORE_API FQueuedThreadPool { public:// 創建線程池,指定線程數,還有每個worker棧大小及優先級virtual bool Create( uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority) = 0;// 銷毀線程池,對Task Queue和每個worker線程執行清理操作virtual void Destroy() = 0;// 生產者使用這個接口,向同步隊列添加IQueuedWorkvirtual void AddQueuedWork(IQueuedWork* InQueuedWork) = 0;// 生產者使用這個接口,嘗試刪除一個IQueuedWorkvirtual bool RetractQueuedWork(IQueuedWork* InQueuedWork) = 0;// 獲取線程池中worker線程的數目virtual int32 GetNumThreads() const = 0; };需要提及的是,RetractQueuedWork接口只能嘗試去刪除或取消一個work對象。如果work不在隊列當中,或者請求刪除時已經在執行和執行完成,都無法取消。
IQueuedWork是同步隊列中,任務對象的抽象。代碼如下:
class IQueuedWork { public:virtual void DoThreadedWork() = 0;virtual void Abandon() = 0; };IQueuedWork的接口很簡單,我們只需要實現代碼中的兩個接口,分別是任務的執行流程和廢棄當前任務的接口。
2. FQueuedThread
FQueuedThread就是線程池worker線程的實現了。它是一個FRunnable的實現類,并內聚了一個FRunnableThread的實例對象。
class FQueuedThread : public FRunnable { protected:FRunnableThread* Thread;virtual uint32 Run() override; };FQueuedThread實現的Run函數,就是類似上一篇我們實現的MyRunnable的空閑等待的流程。我們回顧一下,實現所需的部件:
按照上面的思路,我們繼續補完代碼:
class FQueuedThread : public FRunnable { protected:FEvent* DoWorkEvent;TAtomic<bool> TimeToDie;FRunnableThread* Thread;virtual uint32 Run() override{while(TimeToDie.Load(EMemoryOrder::Relaxed)){DoWorkEvent->Wait();// TODO ... do work}} };這樣的實現有很嚴重的缺陷。無窮時間的等待,線程被掛起后,UE4無法獲取這些線程的狀態了。因此,UE4采用的是等待10ms,再check是否繼續等待。
while(TimeToDie.Load(EMemoryOrder::Relaxed)) {bool bContinueWaiting = true; while(bContinueWaiting){DECLARE_SCOPE_CYCLE_COUNTER(...); // record statusbContinueWaiting = !DoWorkEvent->Wait( 10 );}// TODO ... do work }被喚醒后意味著兩種情況:
把執行Work的代碼加入,如下所示:
class FQueuedThread : public FRunnable { protected:FEvent* DoWorkEvent;TAtomic<bool> TimeToDie;FRunnableThread* Thread;IQueuedWork* volatile QueuedWork;virtual uint32 Run() override{while(TimeToDie.Load(EMemoryOrder::Relaxed)){bool bContinueWaiting = true; while(bContinueWaiting){DECLARE_SCOPE_CYCLE_COUNTER(...); // record statusbContinueWaiting = !DoWorkEvent->Wait( 10 );}IQueuedWork* LocalQueuedWork = QueuedWork;QueuedWork = nullptr;FPlatformMisc::MemoryBarrier();check(LocalQueuedWork || TimeToDie.Load(EMemoryOrder::Relaxed));while (LocalQueuedWork){LocalQueuedWork->DoThreadedWork();LocalQueuedWork = OwningThreadPool->ReturnToPoolOrGetNextJob(this);} }return 0;} };QueuedWork就是需要執行的work對象的指針,它被volatile修飾,說明還有其他的線程會修改這個指針,防止編譯器生成直接從緩存中讀取代碼的優化。check方法,明顯地指明了被喚醒時只有前面提及的兩種情況。如果Work不為空,則調用IQueuedWork的DoThreadedWork接口。任務完成后的下一行代碼,就是向所屬線程池的同步隊列再申請一個任務。如果隊列中有任務,則繼續執行新的任務。若隊列已經為空,則將線程歸還到線程池。線程池有一個QueuedThreads成員,記錄線程池中的空閑的線程。
個人覺得UE4在check之后的實現略有不妥。在同時有Work要執行和TimeToDie為true時,UE4選擇了繼續執行完Work再退出。筆者認為TimeToDie為true時,應該放棄執行當前的work,直接退出。當然,這里不同的策略差別也不大,也不重要。
還有一個重要的函數,就是FQueuedThread::DoWork. 它是由生產者調用線程池的AddQueuedWork,線程池對象在進行調度的時候調用的。DoWork函數代碼如下:
void FQueuedThread::DoWork(IQueuedWork* InQueuedWork) {// ...QueuedWork = InQueuedWork;FPlatformMisc::MemoryBarrier();// Tell the thread to wake up and do its jobDoWorkEvent->Trigger(); }值得提及的是兩個函數中的內存屏障代碼,FPlatformMisc::MemoryBarrier(). DoWork中會對QueuedWork進行寫操作,而在Run函數中會對QueuedWork進行讀操作,而且DoWork與Run發生在不同的線程,這樣就產生了競爭條件(race condition). 一般的情況是上一個mutex lock,而UE4卻沒有,只使用了內存屏障。原因是這個競爭條件發生的時候,有且僅有一個線程寫,有且僅有一個線程讀;并且DoWork中的DoWorkEvent->Trigger(),發出一個事件告知已經準備好一個IQueuedWork,一定發生在Run函數中讀取IQueuedWork之前。所以UE4使用內存屏障來保證順序一致性,讓Run函數從另外一個線程讀取IQueuedWork時,能夠讀取到已經同步過后的值。關于無鎖編程,大家感興趣可以上purecpp的相關專題一起討論。
3. FQueuedThreadPoolBase
再來看看線程池的實現類。FQueuedThreadPool的實現類只有一個,就是FQueuedThreadPoolBase類。我們從它的數據成員,可以很清晰地可以看出,該線程池的結構與第一節的所示的線程池的結構圖是基本吻合的:
class FQueuedThreadPoolBase : public FQueuedThreadPool { protected:/** The work queue to pull from. */TArray<IQueuedWork*> QueuedWork;/** The thread pool to dole work out to. */TArray<FQueuedThread*> QueuedThreads;/** All threads in the pool. */TArray<FQueuedThread*> AllThreads;/** The synchronization object used to protect access to the queued work. */FCriticalSection* SynchQueue;/** If true, indicates the destruction process has taken place. */bool TimeToDie;// .... }數組QueuedWork和互斥鎖SynchQueue,組成了一個線程安全的同步隊列。AllThreads管理著全部的worker線程。TimeToDie是標識線程池生命狀態,如果置為true,線程池的清理工作正在進行,或者已經進行完畢了。還有一個QueuedThreads成員,它管理著空閑的線程,也就是上一節FQueuedThread歸還自己到線程池的空閑隊列。
線程池的創建,會依次創建每個worker線程。線程池銷毀的時候,會依次向每個worker線程發出銷毀的命令,并等待線程退出。線程池的銷毀會放棄還未執行的work. 創建和銷毀的流程較為簡單,就不詳細展開了。后文著重討論生產者向線程池添加work的流程。
生產者創建了一個IQueuedWork實現對象后,會調用第一節提及的AddQueuedWork接口,向線程池添加要執行的work. UE4控制線程池添加work的流程,實現的較為精細。它將線程池的狀態分成了兩類,來分別處理。這兩種狀態分別為: 1. 線程池中還有空閑線程,即QueuedThreads不為空,并且QueuedWork一定為空; 2. 線程池中已經沒有空閑的線程,即QueuedThreads為空;
第一個情景的處理策略是從空閑線程數組中,取一個線程,并直接喚醒該線程執行由生產者當前傳遞進來的work. 第二個情景,較為簡單,由于沒有空閑線程可用,就直接將work入隊即可。
void FQueuedThreadPoolBase::AddQueuedWork(IQueuedWork* InQueuedWork) /*override*/ {// ....FQueuedThread* Thread = nullptr;{FScopeLock sl(SynchQueue);const int32 AvailableThreadCount = QueuedThreads.Num();if (AvailableThreadCount == 0){// situation 2:QueuedWork.Add(InQueuedWork);return;}// situation 1:const int32 ThreadIndex = AvailableThreadCount - 1;Thread = QueuedThreads[ThreadIndex];QueuedThreads.RemoveAt(ThreadIndex, 1, false);}Thread->DoWork(InQueuedWork); }UE4處理情景一的實現,有兩個優點。
第一,UE4并不是簡單地讓每個線程搶占任務隊列中work. 而是在當有空閑線程的時候,小心地獲取一個空閑線程,指定work并喚醒這一個線程。這樣做的好處,是不會出現驚群效應,而讓CPU浪費時間做無用的線程調度。
第二,從代碼中可以看出,UE4每次獲取空閑線程都是取數組的最末尾的空閑線程,也就是最近歸還的work線程。這樣做的好處是,最近歸還的線程意味著它相比其他空閑線程是更近期使用過的。它有更大的概率,操作系統還未對它進行context切換,或者它的context數據還留存在緩存當中。優先使用該線程,就有更大的概率獲取較為低廉的線程切換開銷。
最后,線程池為worker線程提供的,從線程池獲取下一個可用的work和歸還空閑線程的接口,ReturnToPoolOrGetNextJob函數:
IQueuedWork* FQueuedThreadPoolBase::ReturnToPoolOrGetNextJob(FQueuedThread* InQueuedThread) /*override*/ {// ... omitted codesIQueuedWork* Work = nullptr;FScopeLock sl(SynchQueue);// ... omitted codesif (QueuedWork.Num() > 0){Work = QueuedWork[0];QueuedWork.RemoveAt(0, 1, false);}if (!Work)QueuedThreads.Add(InQueuedThread);return Work; }當任務隊列中還有work時,就從隊列頭部取出一個,是一個FIFO的同步隊列。當任務隊列為空,無法取出新的任務時,線程就將自己歸還給到線程池中,標記為空閑隊列。UE4這里實現的不太妥當的就是QueuedWork是一個TArray<IQueuedWork*>數組。數組對非尾部元素的Remove操作,是會對數組元素進行移動的。雖然移動指針并不是很昂貴,而且UE4也禁止了Remove導致的shrink操作,但開銷依然是存在的。這里最好的方案是使用一個可以擴容的環狀隊列。
4. 小結
本文討論了UE4中線程池的實現細節。線程池FQueuedThreadPool的實現是由一個元素為IQueuedWork*的同步隊列,及若干個worker線程所組成。UE4中的線程池,將IQueuedWork隊列化,并用FIFO的調度策略。線程池為IQueuedWork的生產者提供了入隊接口,并為worker線程(消費者)提供了獲取出隊接口。UE4對線程池的性能優化也做了不少的工作。例如避免線程池搶占IQueuedWork時可能會發生的驚群現象,以及取最近使用的線程,還有無鎖編程等。
專題的下一篇,我們將討論UE4中的AsyncTask. 這也是UE4邁向現代C++設計的有力步伐。
總結
以上是生活随笔為你收集整理的UE4异步编程专题 - 线程池FQueuedThreadPool的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: landscape 1
- 下一篇: 2016广发信用卡申请条件及提交材料