【转】1.4异步编程:轻量级线程同步基元对象
?
開始《異步編程:同步基元對象(下)》
示例:異步編程:輕量級線程同步基元對象.rar
?????????在《異步編程:線程同步基元對象》中我介紹了.NET4.0之前為我們提供的各種同步基元(包括Interlocked、Monitor\lock、EventWaitHandle、Mutex、Semaphore等),隨著.NET框架的進化,.NET4.0|.NET4.5又為我們帶來了更多優化的同步基元選擇。這當然不是告訴我們完全放棄.NET4.0之前所提供的同步基元,只是需要我們“因地制宜”。那我們如何判斷適合使用哪種同步基元結構呢,就需要我們對各種同步基元有個本質的理解和清楚.NET所做的優化本質是什么。
?
基元用戶模式構造、基元內核模式構造、混合構造
基元線程同步構造分為:基元用戶模式構造和基元內核模式構造。
1.?????????基元用戶模式構造
應盡量使用基元用戶模式構造,因為它們使用特殊的CPU指令來協調線程,這種協調發生硬件中,速度很快。但也因此Windows操作系統永遠檢測不到一個線程在一個用戶模式構造上阻塞了,這種檢測不到有利有弊:
1)?????????利:因為用戶模式構造上阻塞的一個線程,池線程永遠不認為已經阻塞,所以不會出現“線程池根據CPU使用情況誤判創建更多的線程以便執行其他任務,然而新創建的線程也可能因請求的共享資源而被阻塞,惡性循環,徒增線程上下文切換的次數”的問題。
2)?????????弊:當你想要取得一個資源但又短時間取不到時,一個線程會一直在用戶模式中運行,造成CPU資源的浪費,此時我們更希望像內核模式那樣停止一個線程的運行讓出CPU。
在《異步編程:線程同步基元對象》中包含的用戶模式構造有:volatile關鍵字、Interlocked靜態類、Thread的VolatileWrite()與VolatileRead()方法。
2.?????????基元內核模式構造
是Windows操作系統自身提供的。它們要求我們調用在操作系統內核中實現的函數,調用線程將從托管代碼轉換為本地用戶模式代碼,再轉換為本地內核模式代碼,然后還要朝相反的方向一路返回,會浪費大量CPU時間,同時還伴隨著線程上下文切換,因此盡量不要讓線程從用戶模式轉到內核模式。
內核模式的構造具有基元用戶模式構造所不具有的一些優點:
1)?????????一個內核模式的構造檢測到在一個資源上的競爭時,Windows會阻塞輸掉的線程,使它不占著一個CPU“自旋”,無謂地浪費處理器資源。
2)?????????內核模式的構造可實現本地和托管線程相互之間的同步。
3)?????????內核模式的構造可同步在一臺機器的不同進程中運行的線程。
4)?????????內核模式的構造可應用安全性設置,防止未經授權的帳戶訪問它們。
5)?????????一個線程可一直阻塞,直到一個集合中的所有內核模式的構造都可用,或者直到一個集合中的任何一個內核模式的構造可用。
6)?????????在內核模式的構造上阻塞的一個線程可以指定一個超時值;如果在指定的時間內訪問不到希望的資源,線程可以解除阻塞并執行其他任務。
在《異步編程:線程同步基元對象》中包含的內核模式構造有:EventWaitHandle(以及AutoResetEvent與ManualResetEvent)、Mutex、Semaphore。(另外:ReaderWriterLock)
3.?????????混合構造
對于在一個構造上等待的線程,如果擁有這個構造的線程一直不釋放它,則會出現:
1)?????????如果是用戶模式構造,則線程將一直占用CPU,我們稱之為“活鎖”。
2)?????????如果是內核模式構造,則線程將一直被阻塞,我們稱之為“死鎖”。
然后兩者之間,死鎖總是優于活鎖,因為活鎖既浪費CPU時間,又浪費內存。而死鎖只浪費內存。
混合構造正是為了解決這種場景。其通過合并用戶模式和內核模式實現:在沒有線程競爭的時候,混合構造提供了基元用戶模式構造所具有的性能優勢。多個線程同時競爭一個構造的時候,混合構造則使用基元內核模式的構造來提供不“自旋”的優勢。由于在大多數應用程序中,線程都很少同時競爭一個構造,所以在性能上的增強可以使你的應用程序表現得更出色。
混合結構優化的本質:兩階段等待操作
線程上下文切換需要花費幾千個周期(每當線程等待內核事件WaitHandle時都會發生)。我們暫且稱其為C。假如線程所等待的時間小于2C(1C用于等待自身,1C用于喚醒),則自旋等待可以降低等待所造成的系統開銷和滯后時間,從而提升算法的整體吞吐量和可伸縮性。
在多核計算機上,當預計資源不會保留很長一段時間時,如果讓等待線程以用戶模式旋轉數十或數百個周期,然后重新嘗試獲取資源,則效率會更高。如果在旋轉后資源變為可用的,則可以節省數千個周期。如果資源仍然不可用,則只花費了少量周期,并且仍然可以進行基于內核的等待。這一旋轉-等待的組合稱為“兩階段等待操作”。
?????????在《異步編程:線程同步基元對象》中包含的有:Monitor\lock;
本節將給大家介紹.NET4.0中加入的混合結構:ManualResetEventSlim、SemaphoreSlim、CountdownEvent、Barrier、ReaderWriterLockSlim。
?
??????? 另外:看到園友寫了篇《理解Windows內核模式與用戶模式》,講的是內核架構及用戶模式調用內核模式方式。
?
在介紹.NET4.0新同步結構前,我們需要:
1)?????????認識兩個協作對象:CancellationTokenSource和cancellationToken。因為它們常常被用于混合結構中。Eg:使一個線程強迫解除其構造上的等待阻塞。
2)?????????認識兩個自旋結構:SpinWait和SpinLock
?
協作式取消
?????????對于長時間運行的計算限制操作來說,支持取消是一件很“棒”的事情。.NET 4.0提供了一個標準的取消操作模式。即通過使用CancellationTokenSource創建一個或多個取消標記CancellationToken(cancellationToken可在線程池中線程或 Task 對象之間實現協作取消),然后將此取消標記傳遞給應接收取消通知的任意數量的線程或Task對象。當調用CancellationToken關聯的CancellationTokenSource對象的 Cancle()時,每個取消標記(CancellationToken)上的IsCancellationRequested屬性將返回true。異步操作中可以通過檢查此屬性做出任何適當響應。
1.?????????CancellationTokenSource相關API:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | // 通知System.Threading.CancellationToken,告知其應被取消。 public?class?CancellationTokenSource : IDisposable { ????// 構造一個CancellationTokenSource將在指定的時間跨度后取消。 ????public?CancellationTokenSource(int?millisecondsDelay); ? ????// 獲取是否已請求取消此CancellationTokenSource。 ????public?bool?IsCancellationRequested { get; } ????? ????// 獲取與此CancellationTokenSource關聯的CancellationToken。 ????public?CancellationToken Token { get; } ? ????// 傳達取消請求。參數throwOnFirstException:指定異常是否應立即傳播。 ????public?void?Cancel(); ????public?void?Cancel(bool?throwOnFirstException); ????// 在此CancellationTokenSource上等待指定時間后“取消”操作。 ????public?void?CancelAfter(int?millisecondsDelay); ? ????// 創建一組CancellationToken關聯的CancellationTokenSource。 ????public?static?CancellationTokenSource CreateLinkedTokenSource(paramsCancellationToken[] tokens); ? ????// 釋放由CancellationTokenSource類的當前實例占用的所有資源。 ????public?void?Dispose(); ????…… } |
分析:
1)?????????CancellationTokenSource.CreateLinkedTokenSource()方法
將一組CancellationToken連接起來并創建一個新的CancellationTokenSource。任何一個CancellationToken對應的舊CancellationTokenSource被取消,這個新的CancellationTokenSource對象也會被取消。
原理:創建一個新的CancellationTokenSource實例,并將該實例的Cancel()委托分別傳遞給這組CancellationToken實例的Register()方法,然后返回新創建的CancellationTokenSource實例。
2)?????????CancellationTokenSource實例Cancel()方法做了什么:
a)?????????將CancellationTokenSource實例的IsCancellationRequested屬性設置為true。CancellationToken實例的IsCancellationRequested屬性是調用CancellationTokenSource實例的IsCancellationRequested屬性。
b)?????????調用CancellationTokenSource實例的CreateLinkedTokenSource()注冊的Cancel()委托回調;
c)?????????調用CancellationToken實例的Register()注冊的回調;
d)?????????處理回調異常。(參數throwOnFirstException)
???????????????????????????????????????i.??????????????若為Cancel()傳遞true參數,那么拋出了未處理異常的第一個回調方法會阻止其他回調方法的執行,異常會立即從Cancel()中拋出;
??????????????????????????????????????ii.??????????????若為Cancel()傳遞false(默認為false),那么登記的所有回調方法都會調用。所有未處理的異常都會封裝到一個AggregateException對象中待回調都執行完后返回,其InnerExceptions屬性包含了所有異常的詳細信息。
? ? ? ? e) ? ?給CancellationToken對象的ManualResetEvent對象Set()信號。
2.?????????CancellationToken相關API
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | // 傳播有關應取消操作的通知。 public?struct?CancellationToken { ????public?CancellationToken(bool?canceled); ????public?static?CancellationToken None { get; } ? ????// 獲取此標記是否能處于已取消狀態。 ????public?bool?CanBeCanceled { get; } ????// 獲取是否已請求取消此標記。 ????public?bool?IsCancellationRequested { get; } ? ????// 獲取內部ManualResetEvent,在CancellationTokenSource執行Cancel()時收到set()通知。 ????public?WaitHandle WaitHandle{ get; } ? ????// 注冊一個將在取消此CancellationToken時調用的委托。 ????// 參數:useSynchronizationContext: ????//一個布爾值,該值指示是否捕獲當前SynchronizationContext并在調用 callback 時使用它。 ????public?CancellationTokenRegistration Register(Action<object> callback, object?state ????????????????????????????, bool?useSynchronizationContext); ? ????// 如果已請求取消此標記,則引發OperationCanceledException。 ????public?void?ThrowIfCancellationRequested(); ????…… } |
分析:
1)?????????CancellationToken是結構struct,值類型。
2)?????????CancellationTokenSource與CancellationToken關聯是“一一對應”的
a)?????????無論CancellationTokenSource是通過構造函數創建還是CreateLinkedTokenSource()方法創建,與之對應的CancellationToken只有一個。
b)?????????每個CancellationToken都會包含一個私有字段,保存唯一與之對應的CancellationTokenSource引用。
3)?????????CancellationToken實例的None屬性與參數不是true的CancellationToken構造函數
它們返回一個特殊的CancellationToken實例,該實例不與任何CancellationTokenSource實例關聯(即不可能調用Cancel()),其CanBeCanceled實例屬性為false。
4)?????????CancellationToken的Register()方法返回的CancellationTokenRegistration對象,可調用其Dispose()方法刪除一個Register()登記的回調方法。
5)?????????CancellationToken實例的WaitHandle屬性
會先判斷若沒有對應的CancellationTokenSource,則創建一個默認的CancellationTokenSource對象。然后再判斷若沒有內部事件等待句柄則new?ManualResetEvent(false),在CancellationTokenSource執行Cancel()時收到set()通知。;
6)?????????CancellationToken實例的ThrowIfCancellationRequested()方法如下:
| 1 2 3 4 5 6 7 8 | public?void?ThrowIfCancellationRequested() { ????if?(this.IsCancellationRequested) ????{ ????????throw?new?OperationCanceledException( ????????Environment.GetResourceString("OperationCanceled"), this); ????} } |
3.?????????示例
示例:一個線程池線程協作取消的例子:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | public?static?void?ThreadPool_Cancel_test() { ????CancellationTokenSource cts = new?CancellationTokenSource(); ? ????ThreadPool.QueueUserWorkItem( ????????token => ????????{ ????????????CancellationToken curCancelToken = (CancellationToken)token; ????????????while?(true) ????????????{ ????????????????// 耗時操作 ????????????????Thread.Sleep(400); ????????????????if?(curCancelToken.IsCancellationRequested) ????????????????{ ????????????????????break;?? // 或者拋異常curCancelToken.ThrowIfCancellationRequested(); ????????????????} ????????????} ????????????Console.WriteLine(String.Format("線程{0}上,CancellationTokenSource操作已取消,退出循環" ????????????????, Thread.CurrentThread.ManagedThreadId)); ????????} ????????, cts.Token ?????); ? ????ThreadPool.QueueUserWorkItem( ????????token => ????????{ ????????????Console.WriteLine(String.Format("線程{0}上,調用CancellationToken實例的WaitHandle.WaitOne() " ?????????????????, Thread.CurrentThread.ManagedThreadId)); ????????????CancellationToken curCancelToken = (CancellationToken)token; ????????????curCancelToken.WaitHandle.WaitOne(); ????????????Console.WriteLine(String.Format("線程{0}上,CancellationTokenSource操作已取消,WaitHandle獲得信號" ?????????????????, Thread.CurrentThread.ManagedThreadId)); ????????} ????????, cts.Token ?????); ? ????Thread.Sleep(2000); ????Console.WriteLine("執行CancellationTokenSource實例的Cancel()"); ????cts.Cancel();??????????? } |
結果:
?
SpinWait結構----自旋等待
一個輕量同步類型(結構體),提供對基于自旋的等待的支持。SpinWait只有在多核處理器下才具有使用意義。在單處理器下,自旋轉會占據CPU時間,卻做不了任何事。
SpinWait并沒有設計為讓多個任務或線程并發使用。因此,如果多個任務或者線程通過SpinWait的方法進行自旋,那么每一個任務或線程都應該使用自己的SpinWait實例。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | public?struct?SpinWait { ????// 獲取已對此實例調用SpinWait.SpinOnce() 的次數。 ????public?int?Count { get; } ????// 判斷對SpinWait.SpinOnce() 的下一次調用是否觸發上下文切換和內核轉換。 ????public?bool?NextSpinWillYield { get; } ? ????// 重置自旋計數器。 ????public?void?Reset(); ????// 執行單一自旋。 ????public?void?SpinOnce(); ????// 在指定條件得到滿足(Func<bool>委托返回true)之前自旋。 ????public?static?void?SpinUntil(Func<bool> condition); ????// 在指定條件得到滿足或指定超時過期之前自旋。參數condition為在返回 true 之前重復執行的委托。 ????// 返回結果: ????// 如果條件在超時時間內得到滿足,則為 true;否則為 false ????public?static?bool?SpinUntil(Func<bool> condition, int?millisecondsTimeout); ????public?static?bool?SpinUntil(Func<bool> condition, TimeSpan timeout); } |
分析SpinWait關鍵實現代碼:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | public?bool?NextSpinWillYield { ????get ????{ ????????if?(this.m_count<= 10)? // 自旋轉計數 ????????{ ????????????return?Environment.ProcessorCount == 1; ????????} ????????return?true; ????} } public?void?SpinOnce() { ????if?(this.NextSpinWillYield) ????{ ????????Int num = (this.m_count>= 10) ? (this.m_count - 10) :this.m_count; ????????if?((num % 20) == 0x13) ????????{ ????????????Thread.Sleep(1); ????????} ????????else?if?((num % 5) == 4) ????????{ ????????????Thread.Sleep(0); ????????} ????????else ????????{ ????????????Thread.Yield(); ????????} ????} ????else ????{ ????????Thread.SpinWait(((int) 4) <<this.m_count); ????} ????this.m_count = (this.m_count == 0x7fffffff) ?10 : (this.m_count + 1); } |
從代碼中我們可知:
1)?????????SpinWait自旋轉是調用Thread.SpinWait()。
2)?????????由NextSpinWillYield屬性代碼可知,若SpinWait運行在單核計算機上,它總是進行上下文切換(讓出處理器)。
3)?????????SpinWait不僅僅是一個空循環。它經過了精心實現,可以針對一般情況提供正確的旋轉行為以避免內核事件所需的高開銷的上下文切換和內核轉換;在旋轉時間足夠長的情況下自行啟動上下文切換,SpinWait甚至還會在多核計算機上產生線程的時間片(Thread.Yield())以防止等待線程阻塞高優先級的線程或垃圾回收器線程。
4)?????????SpinOnce()自旋一定次數后可能導致頻繁上下文切換。注意只有等待時間非常短時,SpinOnce()或SpinUntil()提供的智能行為才會獲得更好的效率,否則您應該在SpinWait自行啟動上下文切換之前調用自己的內核等待。
通常使用SpinWait來封裝自己“兩階段等待操作”,避免內核事件所需的高開銷的上下文切換和內核轉換。
實現自己的“兩階段等待操作”:
| 1 2 3 4 | if?(!spinner.NextSpinWillYield) {spinner.SpinOnce();} else {自己的事件等待句柄;} |
?
SpinLock結構----自旋鎖
一個輕量同步類型,提供一個相互排斥鎖基元,在該基元中,嘗試獲取鎖的線程將在重復檢查的循環中等待,直至該鎖變為可用為止。SpinLock是結構體,如果您希望兩個副本都引用同一個鎖,則必須通過引用顯式傳遞該鎖。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | ????public?struct?SpinLock ????{ ????????// 初始化SpinLock結構的新實例,參數標識是否啟動線程所有權跟蹤以助于調試。 ????????public?SpinLock(bool?enableThreadOwnerTracking); ? ????????// 獲取鎖當前是否已由任何線程占用。 ????????public?bool?IsHeld { get; } ????????// 獲取是否已為此實例啟用了線程所有權跟蹤。 ????????public?bool?IsThreadOwnerTrackingEnabled { get; } ????????// 若IsThreadOwnerTrackingEnabled=true,則可獲取鎖是否已由當前線程占用。 ????????public?bool?IsHeldByCurrentThread { get; } ? ????????// 采用可靠的方式獲取鎖,這樣,即使在方法調用中發生異常的情況下,都能采用可靠的方式檢查lockTaken以確定是否已獲取鎖。 ????????public?void?Enter(ref?boollockTaken); ????????public?void?TryEnter(ref?boollockTaken); ????????public?void?TryEnter(int?millisecondsTimeout, ref?bool?lockTaken); ????????public?void?TryEnter(TimeSpan timeout, ref?bool?lockTaken); ????????// Enter(ref boollockTaken)與TryEnter(ref bool lockTaken)效果一樣,TryEnter(ref boollockTaken)會跳轉更多方法降低的性能。 ? ????????// 釋放鎖。參數useMemoryBarrier:指示是否應發出內存屏障,以便將退出操作立即發布到其他線程(默認為true)。 ????????public?void?Exit(); ????????public?void?Exit(bool?useMemoryBarrier); } |
使用需注意:
1)?????????SpinLock支持線程跟蹤模式,可以在開發階段使用此模式來幫助跟蹤在特定時間持有鎖的線程。雖然線程跟蹤模式對于調試很有用,但此模式可能會導致性能降低。(構造函數:可接受一個bool值以指示是否啟用調試模式,跟蹤線程所有權)
2)?????????SpinLock不可重入。在線程進入鎖之后,它必須先正確地退出鎖,然后才能再次進入鎖。通常,任何重新進入鎖的嘗試都會導致死鎖。
如果在調用?Exit?前沒有調用?Enter,SpinLock的內部狀態可能被破壞。
3)?????????Enter與TryEnter的選擇
a)?????????Enter(ref boollockTaken)??????????在獲取不到鎖時會阻止等待鎖可用,自旋等待,相當于等待時間傳入-1(即無限期等待)。
b)?????????TryEnter(ref boollockTaken)??????????????在獲取不到鎖時立即返回而不行進任何自旋等待,相當于等待時間傳入0。
c)?????????TryEnter(時間參數, ref boollockTaken)???????????在獲取不到鎖時,會在指定時間內自旋等待。
d)??在指定時間內,若自旋等待足夠長時間,內部會自動切換上下文進行內核等待,切換邏輯類似SpinWait結構(即,并沒有使用等待事件,只是使用Thread.Sleep(0)、Thread.Sleep(1)以及Thread.Yield()),所以也可能導致頻繁上下文切換。
4)?????????在多核計算機上,當等待時間預計較短且極少出現爭用情況時,SpinLock的性能將高于其他類型的鎖(長時或預期有大量阻塞,由于旋轉過多,性能會下降)。但需注意的一點是,SpinLock比標準鎖更耗費資源。建議您僅在通過分析確定?Monitor方法或?Interlocked?方法顯著降低了程序的性能時使用SpinLock。
5)?????????在保持一個自旋鎖時,應避免任何這些操作:
a)?????????阻塞,
b)?????????調用本身可能阻塞的任何內容,
c)?????????一個SpinLock結構上保持過多自旋鎖,
d)?????????進行動態調度的調用(接口和虛方法)
e)?????????非托管代碼的調度,或分配內存。
6)?????????不要將SpinLock聲明為只讀字段,因為如果這樣做的話,會導致每次調用這個字段都返回SpinLock的一個新副本,而不是同一個SpinLock。這樣所有對Enter()的調用都能成功獲得鎖,因此受保護的臨界區不會按照預期進行串行化。
?
驚奇的Monitor\lock
?????????說到Monitor(監視器)相信大家早已銘記于心了,此結構在.NET早期版本就已經存在。但是大家可能對他是“混合構造”這一說法感到驚奇,分析下它的幾個步驟:
1)?????????執行Monitor.Enter()/lock的線程會首先測試Monitor的鎖定位。如果該位為OFF(解鎖),那么線程就會在該位上設置一下(加鎖),且不需要等待便繼續。這通常只需執行1~2個機器指令。
2)?????????如果Monitor被鎖定,線程就會進入一個旋轉等待持有鎖。而線程在旋轉期間會反復測試鎖定位。單處理器系統會立即放棄,而在多核處理器系統上則旋轉一段時間才會放棄。在此之前,線程都在用戶模式下運行。
3)?????????一旦線程放棄測試鎖定位(在單處理器上立即如此),線程使用信號量在內核進入等待狀態。
4)?????????執行Monitor.Exit()或代碼退出了lock塊。如果存在等待線程,則使用ReleaseSemaphore()通知內核。
在第二步中,提到的旋轉等待。正是:SpinWait。
?
ManualResetEventSlim
當等待時間預計非常短時,并且當事件不會跨越進程邊界時,可使用ManualResetEventSlim類以獲得更好的性能(ManualResetEvent的優化版本)。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | public?class?ManualResetEventSlim : IDisposable { ????// 初始化 ManualResetEventSlim 類的新實例。 ????//?? initialState:(默認為false) ????//???? 如果為 true,則IsSet屬性設置為true,此時為有信號狀態,不會阻止線程。 ????//?? spinCount: ????//???? 設置在回退到基于內核的等待操作之前需發生的自旋等待數量,默認為10。 ????public?ManualResetEventSlim(bool?initialState, int?spinCount); ? ????// 獲取是否設置了事件。Reset()將其設置為false;Set()將其設置為true ????public?bool?IsSet { get; } ????// 獲取在回退到基于內核的等待操作之前發生需的自旋等待數量,由構造函數設置。 ????public?int?SpinCount { get; } ????// 獲取此ManualResetEventSlim的基礎WaitHandle(ManualResetEvent) ????public?WaitHandle WaitHandle { get; } ? ????// 將事件狀態設置為非終止狀態,從而阻塞線程。 ????public?void?Reset(); ????// 將事件狀態設置為終止,從而允許一個或多個等待該事件的線程繼續。 ????public?void?Set(); ????// 阻止當前線程,直到Set()了當前ManualResetEventSlim為止。無限期等待。 ????public?void?Wait(); ????// 阻止當前線程,直到Set()了當前ManualResetEventSlim為止,并使用 32 位帶符號整數測量時間間隔, ????// 同時觀察.CancellationToken。在指定時間內收到信號,則返回true。 ????public?bool?Wait(int?millisecondsTimeout, CancellationToken cancellationToken); ? ????// 釋放由ManualResetEventSlim類的當前實例占用的所有資源。 ????public?void?Dispose(); ????…… } |
1.?????????分析
1)?????????首先要明確的是ManualResetEventSlim是ManualResetEvent的優化版本,但并不是說其混合構造就是基于自旋+ManualResetEvent完成。ManualResetEventSlim是基于自旋+Monitor完成。
2)?????????可在ManualResetEventSlim的構造函數中指定切換為內核模式之前需發生的自旋等待數量(只讀的SpinCount屬性),默認為10。
3)?????????訪問WaitHandle屬性會延遲創建一個ManualResetEvent(false)對象。在調用ManualResetEventSlim的set()方法時通知WaitHandle.WaitOne()獲得信號。
2.?????????示例
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | public?static?void?Test() { ????ManualResetEventSlim manualSlim = new?ManualResetEventSlim(false); ????Console.WriteLine("ManualResetEventSlim示例開始"); ????Thread thread1 = new?Thread(o => ????{ ????????Thread.Sleep(500); ????????Console.WriteLine("調用ManualResetEventSlim的Set()"); ????????manualSlim.Set(); ????}); ????thread1.Start(); ????Console.WriteLine("調用ManualResetEventSlim的Wait()"); ????manualSlim.Wait(); ????Console.WriteLine("調用ManualResetEventSlim的Reset()"); ????manualSlim.Reset();? // 重置為非終止狀態,以便下一次Wait()等待 ????CancellationTokenSource cts = new?CancellationTokenSource(); ????Thread thread2 = new?Thread(obj => ????{ ????????Thread.Sleep(500); ????????CancellationTokenSource curCTS = obj as?CancellationTokenSource; ????????Console.WriteLine("調用CancellationTokenSource的Cancel()"); ????????curCTS.Cancel(); ????}); ????thread2.Start(cts); ????try ????{ ????????Console.WriteLine("調用ManualResetEventSlim的Wait()"); ????????manualSlim.Wait(cts.Token); ????????Console.WriteLine("調用CancellationTokenSource后的輸出"); ????} ????catch?(OperationCanceledException) ????{ ????????Console.WriteLine("異常:OperationCanceledException"); ????} } |
?????????結果:
?????????
?
SemaphoreSlim
?????????SemaphoreSlim是Semaphore的優化版本。限制可同時訪問某一資源或資源池的線程數。
?????????SemaphoreSlim利用SpinWait結構+Monitor可重入的特性+引用計數實現,并且提供的異步API:返回Task的WaitAsync();重載方法。
?????????注意CurrentCount屬性的使用,此屬性能夠獲取進入信號量的任務或線程的數目。因為這個值總是在變化,所以當信號量在執行并發的Release和Wait方法時,某一時刻CurrentCount等于某個值并不能說明任務或線程執行下一條指令的時候也一樣。因此,一定要通過Wait方法和Release方法進入和退出由信號量所保護的資源。
使用很簡單,請參考ManualResetEventSlim小節示例。
?
CountdownEvent
這個構造阻塞一個線程,直到它的內部計數器變成0(與信號量相反,信號量是在計數位0時阻塞線程)。CountdownEvent是對ManualResetEventSlim的一個封裝。
CountdownEvent簡化了fork/join模式。盡管基于新的任務編程模型通過Task實例、延續和Parallel.Invoke可以更方便的表達fork-join并行。然而,CountdownEvent對于任務而言依然有用。使用Task.WaitAll()或TaskFactory.ContinueWhenAll()方法要求有一組等待的Task實例構成的數組。CountdownEvent不要求對象的引用,而且可以用于最終隨著時間變化的動態數目的任務。
使用方式:
1)?????????CurrentCount屬性標識剩余信號數(和InitialCount屬性一起由構造函數初始化);
2)?????????Wait()阻止當前線程,直到CurrentCount計數為0(即所有的參與者都完成了);
3)?????????Signal()向CountdownEvent注冊一個或指定數量信號,通知任務完成并且將CurrentCount的值減少一或指定數量。注意不能將事件的計數遞減為小于零;
4)?????????允許使用AddCount()\TryAddCount()增加CurrentCount一個或指定數量信號(且只能增加)。一旦一個CountdownEvent的CurrentCount變成0,就不允許再更改了。
5)?????????Reset()將CurrentCount重新設置為初始值或指定值,并且允許大于InitialCount屬性,此方法為非線程安全方法。
示例:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | public?static?void?Test() { ????Console.WriteLine("初始化CountdownEvent計數為1000"); ????CountdownEvent cde = new?CountdownEvent(1000);??????????? ????// CurrentCount(當前值)1200允許大于InitialCount(初始值)1000 ????cde.AddCount(200); ????Console.WriteLine("增加CountdownEvent計數200至1200"); ????Thread thread = new?Thread(o => ????????{ ????????????int?i = 1200;?????????????????? ????????????for?(int?j = 1; j <= i; j++) ????????????{ ????????????????if(j==i) ????????????????????Console.WriteLine("CurrentCount為1200,所以必須調用Signal()1200次"); ????????????????cde.Signal();?????????????????????? ????????????}?????????????????? ????????} ????); ????thread.Start(); ????Console.WriteLine("調用CountdownEvent的Wait()方法"); ????cde.Wait(); ????Console.WriteLine("CountdownEvent計數為0,完成等待"); } |
結果:
?????????
?
ReaderWriterLockSlim(多讀少寫鎖)
?????????為了保證線程同步構造介紹的完整性,我這邊提下這個對象,因為此對象相對復雜且自身沒有接觸類似對象,所以不展開講,后續單獨開貼分享。
?????????ReaderWriterLockSlim是.NET3.5引入了,是對.NET1.0中的ReaderWriterLock構造的改進,ReaderWriterLockSlim的性能明顯優于ReaderWriterLock,建議在所有新的開發工作中使用ReaderWriterLockSlim。它們目的都是用于多讀少寫的場景,都是線程關聯對象。
ReaderWriterLockSlim是通過封裝“自旋+AutoResetEvent+ManualResetEvent”實現。
?
Barrier(關卡)
?????????Barrier適用于并行操作是分階段執行的,并且每一階段要求各任務之間進行同步。使用Barrier可以在并行操作中的所有任務都達到相應的關卡之前,阻止各個任務繼續執行。
?????????情景:當你需要一組任務并行地運行一連串的階段,但是每一個階段都要等待所有其他任務都完成前一階段之后才能開始。
?????????Barrier構造由SpinWait結構+ManualResetEventSlim實現。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | public?class?Barrier : IDisposable { ????// 指定參與線程數與后期階段操作的委托來初始化 Barrier 類的新實例。 ????public?Barrier(int?participantCount, Action<Barrier> postPhaseAction); ? ????// 獲取屏障的當前階段的編號。 ????public?long?CurrentPhaseNumber { get; internal?set; } ????// 獲取屏障中參與者的總數。 ????public?int?ParticipantCount { get; } ????// 獲取屏障中尚未在當前階段發出信號的參與者的數量。 ????public?int?ParticipantsRemaining { get; } ? ????// 增加一個或指定數量參與者。返回新參與者開始參與關卡的階段編號。 ????public?long?AddParticipant(); ????public?long?AddParticipants(int?participantCount); ????//? 減少一個或指定數量參與者。 ????public?void?RemoveParticipant(); ????public?void?RemoveParticipants(int?participantCount); ? ????// 發出參與者已達到關卡的信號,并等待所有其他參與者也達到關卡, ????// 使用 System.TimeSpan 對象測量時間間隔,同時觀察取消標記。 ????// 返回結果:如果所有其他參與者已達到屏障,則為 true;否則為 false。 ????public?bool?SignalAndWait(TimeSpan timeout, CancellationToken cancellationToken); ? ????// 釋放由 Barrier 類的當前實例占用的所有資源。 ????public?void?Dispose(); ????…… } |
?????????使用方式:
1)?????????構造一個Barrier時,要告訴它有多少線程準備參與工作(0<=x<=32767),還可以傳遞一個Action<Barrier>委托來引用所有參與者完成一個簡短的工作后要執行的后期階段操作(此委托內部會傳入當前Barrier實例,如果后期階段委托引發異常,則在 BarrierPostPhaseException 對象中包裝它,然后將其傳播到所有參與者,需要用try-catch塊包裹SignalAndWait()方法)。
2)?????????可以調用AddParticipant和RemoveParticipant方法在Barrier中動態添加和刪除參與線程。如果關卡當前正在執行后期階段(即Action<Barrier>委托)操作,此調用將被阻止,直到后期階段操作完成且該關卡已轉至下一階段。
3)?????????每個線程完成它的階段性工作后,應調用SignalAndWait(),告訴Barrier線程已經完成一個階段的工作,并阻塞當前線程。待所有參與者都調用了SignalAndWait()后,由最后一個調用SignalAndWait()的線程調用Barrier構造函數指定的Action<Barrier>委托,然后解除正在等待的所有線程的阻塞,使它們開始下一個階段。
如果有一個參與者未能到達關卡,則會發生死鎖。若要避免這些死鎖,可使用SignalAndWait方法的重載來指定超時期限和取消標記。(SignalAndWait() 內部由SpinWait結構實現)
4)?????????每當Barrier完成一個階段時ParticipantsRemaining屬性(獲取屏障中尚未在當前階段發出信號的參與者的數量)會重置,在Barrier調用Action<Barrier>委托之前就已被重置。
5)?????????當執行階段后操作的委托時,屏障的CurrentPhaseNumber屬性的值會等于已經完成的階段的數值,而不是新的階段數。
示例:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | private?static?int?m_count = 3; private?static?int?m_curCount = 0; private?static?Barrier pauseBarr = new?Barrier(2); public?static?void?Test() { ????Thread.VolatileWrite(ref?m_curCount, 0); ????Barrier barr = new?Barrier(m_count, new?Action<Barrier>(Write_PhaseNumber)); ????Console.WriteLine("Barrier開始第一階段"); ????AsyncSignalAndWait(barr, m_count); ? ????// 暫停等待 barr 第一階段執行完畢 ????pauseBarr.SignalAndWait(); ? ????Console.WriteLine("Barrier開始第二階段"); ????Thread.VolatileWrite(ref?m_curCount, 0); ????AsyncSignalAndWait(barr, m_count); ? ????// 暫停等待 barr 第二階段執行完畢 ????pauseBarr.SignalAndWait(); ????pauseBarr.Dispose(); ????barr.Dispose(); ????Console.WriteLine("Barrier兩個階段執行完畢"); } // 執行 SignalAndWait 方法 private?static?void?AsyncSignalAndWait(Barrier barr, int?count) { ????for?(int?i = 1; i <= count; i++) ????{ ????????ThreadPool.QueueUserWorkItem(o => ????????????{ ????????????????Thread.Sleep(200); ????????????????Interlocked.Increment(ref?m_curCount); ????????????????barr.SignalAndWait(); ????????????} ????????); ????} } // 輸出當前Barrier的當前階段 private?static?void?Write_PhaseNumber(Barrier b) { ????Console.WriteLine(String.Format("Barrier調用完{0}次SignalAndWait()", m_curCount)); ????Console.WriteLine("階段編號為:"?+ b.CurrentPhaseNumber); ????Console.WriteLine("ParticipantsRemaining屬性值為:"?+ b.ParticipantsRemaining); ????pauseBarr.SignalAndWait(); } |
結果:
?
?
Dispose()的好習慣
?????????使用完資源后釋放是個好習慣。同步基元WaitHandle、ManualResetEventSlim、SemaphoreSlim、CountdownEvent、Barrier、ReaderWriterLockSlim都實現了IDisposable接口,即我們使用完都應該進行釋放。
1)?????????WaitHandle的Dispose()方法是關閉SafeWaitHandle引用的Win32內核對象句柄。
2)?????????ManualResetEventSlim、SemaphoreSlim、CountdownEvent、Barrier、ReaderWriterLockSlim由于都提供了WaitHanle屬性,以延遲創建內核等待事件,所以調用Dispose實質上是間接的調用WaitHandle的Dispose()方法。
?
同步構造的最佳實踐
線程同步構造選擇可以遵循下面規則:????
1.?????????代碼中盡量不要阻塞任何線程。因為創建線程不僅耗費內存資源也影響性能,如果創建出來的線程因阻塞而不做任何事太浪費。
2.?????????對于簡單操作,盡量使用Thread類的VolatileRead()方法、VolatileWrite()方法和Interlocked靜態類方法。
3.?????????對于復雜操作:
1)?????????如果一定要阻塞線程,為了同步不在AppDomain或者進程中運行的線程,請使用內核對象構造。
2)?????????否則,使用混合構造Monitor鎖定一個靜態私有的引用對象方式(ManualResetEventSlim、SemaphoreSlim、CountdownEvent構造都是對Monitor進行封裝)。
3)?????????另外,還可以使用一個reader-writer鎖來代替Monitor。reader-writer鎖通常比Monitor慢一些,但它允許多個線程并發的以只讀方式訪問數據,這提升了總體性能,并將阻塞線程的幾率降至最低。
4.?????????避免不必要地使用可變字段。大多數的時間、鎖或并發集合?(System.Collections.Concurrent.*)?更適合于在線程之間交換數據。在一些情況下,可以使用可變字段來優化并發代碼,但您應該使用性能度量來驗證所得到的利益勝過復雜性的增加。
5.?????????應該使用?System.Lazy<T>?和?System.Threading.LazyInitializer?類型,而不是使用可變字段自己實現遲緩初始化模式。
6.?????????避免輪詢循環。通常,您可以使用?BlockingCollection<T>、Monitor.Wait/Pulse、事件或異步編程,而不是輪詢循環。
7.?????????盡可能使用標準?.NET?并發基元,而不是自己實現等效的功能。
8.?????????在使用任何同步機制的時候,提供超時和取消是一件非常重要的事情。因為代碼中的錯誤或不可預知的情形都可能導致任務或線程永遠等待。
?
?
?
? ? 本博文主要介紹用戶模式\內核模式,如何實現協作式取消,.NET4.0中新同步基元對象:ManualResetSlim\SemaphoreSlim\CountdownEvent\Barrier(關卡)\ReaderWriterLockSlim,自旋等待SpinWait和自旋鎖SpinLock……
目前,我已經為大家介紹了線程、線程池、線程同步知識,接下來我將給大家介紹.NET4.X給我們帶來的新異步編程方式Task。(想先了解Task相關知識的,可以查看《關于Async與Await的FAQ》)
本節到此結束,謝謝大家的觀看,贊的還請多推薦。
?
推薦閱讀:
???????????????《理論與實踐中的 C# 內存模型》
?
參考資料:
?????????《CLR via C#(第三版)》
?????????MSDN
總結
以上是生活随笔為你收集整理的【转】1.4异步编程:轻量级线程同步基元对象的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【转】阿里技术专家详解DDD系列 第二讲
- 下一篇: 传雷帝嘎嘎出演小丑女