如何运用并行编程Parallel提升任务执行效率
本文來自小易,【DoTNET技術圈】公眾號已獲得轉載授權。
《.NET并發變成實戰》讀后感:并行編程Parallel
手打目錄:
一、前言
二、任務并行庫(TPL)的介紹
三、Parallel.Invoke的使用
四、Parallel.For的使用
五、Parallel.ForEach+Partitioner的使用
六、指定最大并行度MaxDegreeOfParallelism
七、退出循環以及捕捉異常
八、參考的資料
一、前言
背景:在物聯網場景下,由于數據吞吐量較大,常規的Task異步執行存在明顯的性能瓶頸,后通過參考Riccardo Terrel(里卡爾多Dian·特雷爾)著,葉偉民老師翻譯的《.NET并發編程實戰》,使用了Parallel并行編程,以及分區器Partitioner,將兩者結合使用提高了設備數據綁定及數據更新速度,也做到了對CPU的性能比較極致使用。
萌新記錄,大佬多加斧正!
可跳過概念,直接抵達使用實例—>五、Parallel.ForEach+Partitioner的使用
l?并行編程的原理
在《.net并發編程實戰》(以下稱《實戰》)中這樣解釋并行編程——同時執行多個任務。
從開發人員的角度看,當我們考慮這些問題是,“我的程序可以同時執行多項操作嗎?”或“我的程序如何更快地解決一個問題”我們會想到并行。并行是指同時在不同的內核上執行多個任務,以提高應用程序的速度,這需要硬件支持(多核),且并行只能在多核設備中實現,是提高程序性能和吞吐量的手段。
l?并行與并發編程簡單區分
1、?并發編程一次處理多個操作,不需要硬件支持(使用一個或多個內核)。
2、?并行編程在多個CPU或多個內核上同時執行多個操作。所有并行程序都是并發的,同時運行的,但并非所有并發都是并行的。原因是并行只能在多核設備上實現。
3、?多任務同時執行來自不同進程的多個線程。多任務并不一定意味著并行執行,只有在使用多個CPU 或多個內核時才能實現并行執行。
?
l?為什么需要使用并行編程
《實戰》對不同程序CPU使用資源使用的程度做了一個對比:?
?
《實戰》中認為,在一臺多核計算機上運行一個沒有考慮到并發的應用程序,就是在浪費計算機的生產力,因為應用程序在順序處理過程中只能使用一部分可用的計算能力,在這種情況下任何CPU性能計數器會發現只有一個內核運行得很快,可能為100%,而其他內核未充分利用或空閑,在上圖的8內核的計算機中,運行的非并行程序意味著資源的總體使用率可能不到15%。
l?使用并行編程的兩種方式
1、?任務并行庫(TPL),本文中只使用了這種方式
2、?并行LINQ(PLINQ)—》官方文檔直達:https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-programming/introduction-to-plinq
二、任務并行庫(TPL)的并行介紹
.Net Framework4 引入了新的Task Parallel Library(任務并行庫,TPL),它支持數據并行、任務并行和流水線。
當并行循環運行時,TPL會將數據源按照內置的分區算法(或者你可以自定義一個分區算法)將數據劃分為多個不相交的子集,然后,從線程池中選擇線程并行地處理這些數據子集,每個線程只負責處理一個數據子集。在后臺,任務計劃程序將根據系統資源和工作負荷來對任務進行分區。如有可能,計劃程序會在工作負荷變得不平衡的情況下在多個線程和處理器之間重新分配工作。
在對任何代碼(包括循環)進行并行化時,一個重要的目標是利用盡可能多的處理器,但不要過度并行化到使行處理的開銷讓任何性能優勢消耗殆盡的程度。比如:對于嵌套循環,只會對外部循環進行并行化,原因是不會在內部循環中執行太多工作。少量工作和不良緩存影響的組合可能會導致嵌套并行循環的性能降低。
由于循環體是并行運行的,迭代范圍的分區是根據可用的邏輯內核數、分區大小以及其他因素動態變化的,因此無法保證迭代的執行順序。
? ? TPL引入了System.Threading.Tasks ,主類是Task,這個類表示一個異步的并發的操作,然而我們不一定要使用Task類的實例,可以使用Parallel靜態類。它提供了Parallel.Invoke,?Parallel.For,Parallel.Forecah?三個方法,以下分別介紹3個方法的簡單實例,每個方法都有多個重載,可自行查看源代碼
三、Parallel.Invoke的使用
????????
static void Main(){try{Parallel.Invoke(BasicAction,// Param #0 - 靜態方法() =>// Param #1 - lambda表達式{Console.WriteLine("干飯人干飯, Thread={0}", Thread.CurrentThread.ManagedThreadId);},delegate ()// Param #2 - 委托{Console.WriteLine("委托方法中, Thread={0}", Thread.CurrentThread.ManagedThreadId);});}// 在本例中不期望出現異常,但如果任務中仍然拋出異常,// 它將被包裝在AggregateException中,并傳播到主線程。catch (AggregateException e){Console.WriteLine("捕捉異常 \n{0}", e.InnerException.ToString());}}static void BasicAction(){Console.WriteLine("打工人打工, Thread={0}", Thread.CurrentThread.ManagedThreadId);}?
?
注解:
l?此方法可用于執行可能并行執行的一組操作。
l?不保證執行操作的順序,或是否并行執行操作。
l?此方法在每個提供的操作都已完成后才會返回,無論是由于正常終止還是異常終止而發生。
?
四、Parallel.For的使用
我們先用一個簡單的插入,來比較并行的for循環與串行for循環的速度。
?
這里因為Parallel.For在對處理器分配任務時候也有性能消耗,速度提升并不明顯。
?
接下來我們看一下Parallel.For的其中重載之一
?
??????????
var?list?=?new?List<int>()?{?10,?20,?30,?40?}; var options = new ParallelOptions(); var total = 0; var result = Parallel.For(0, list.Count, () =>{Console.WriteLine("------------ thead --------------");return 1;},(i, loop, j) =>{Console.WriteLine("------------ body --------------");Console.WriteLine("i=" + list[i] + " j=" + j);return list[i];},(b)?=>{Console.WriteLine("------------ tfoot --------------");Interlocked.Add(ref?total,?b);Console.WriteLine("total="?+?total); });Console.WriteLine("iscompleted:" + result.IsCompleted); Console.Read();注解:
l?因為并行任務當中不保證執行順序,且多任務可能會同時嘗試更新total變量,所以這里使用了?Interlocked.Add執行,來保證它是作為原子操作來執行。
五、Parallel.ForEach+Partitioner的結合使用
Partitioner分區器:
首先我們來看看分區器源代碼,看他是如何對數據源進行分區的:
?
Partitioner.Create?若只指定的數據源的起始于結束的索引位置,創建分區則主要是根據邏輯內核數(PlatformHelper.ProcessorCount)決定的。
?
?
?
大部分情況下,TPL在幕后使用的負載均衡機制都是非常高效的,比如我們不使用分區器,直接對數據源進行負載均衡的并行執行,案例請看—>六、指定最大并行度。
當然我們也可以自定義分區大小,以下我們進入到實際的開發環境中,當前實驗電腦為6核12線程處理器
注解:
dataList —>實時數據的數據源
Index —>數據源總數,此處假設1W條數據
rangesize—>區塊大小,由此可以計算 ?10000/12+1=834(+1是為了適應可能除不盡的情況)
Partitioner.Create(0,Index,rangesize) —>分區器將數據源0-1W條數據分成了12個數據塊,每一塊為834條,當然最后一塊沒有834條數據
?
?
打上斷點可以看到range.Item2-range.Item1=834,已經分好區塊了,然后就是并行處理業務代碼了。
這里貼上示例,粘貼可用:
????????????
int index = 10000; var rangesize = (int)(index / Environment.ProcessorCount) + 1; var?rangePartitioner?=?Partitioner.Create(1,?index,?rangesize); System.Threading.Tasks.Parallel.ForEach(rangePartitioner, range =>{#region 業務代碼#endregion });六、指定最大并行度MaxDegreeOfParallelism
參考博客文章:Parallel.ForEach 之 MaxDegreeOfParallelism
https://www.cnblogs.com/QinQouShui/p/12134232.html
System.Threading.Tasks.Parallel.ForEach(list, new ParallelOptions() { MaxDegreeOfParallelism = 12 }, range =>{#region 業務代碼#endregion});此Parallel.ForEach并沒有使用分區器,而是用TPL進行負載均衡的并行。
該重載的源代碼為:
?
七、退出循環以及捕捉異常
和串行運行中的break不同,ParallelLoopState 提供了兩個方法用于停止Parallel.For 和 Parallel.ForEach的執行。
?
public class ParallelLoopState {// 獲取循環的任何迭代是否已引發相應迭代未處理的異常。public bool IsExceptional { get; }// 獲取循環的任何迭代是否已調用 ParallelLoopState.Stop()。public bool IsStopped { get; }// 獲取在Parallel循環中調用 ParallelLoopState.Break() 的最低循環迭代。public long? LowestBreakIteration { get; }// 獲取循環的當前迭代是否應基于此迭代或其他迭代發出的請求退出。public bool ShouldExitCurrentIteration { get; }//通知Parallel循環當前迭代”之后”的其他迭代不需要運行。public void Break();//通知Parallel循環當前迭代“之外”的所有其他迭代不需要運行。public void Stop(); }?
l?Break:用于通知Parallel循環當前迭代“之后”的其他迭代不需要運行。例如,對于從 0 到 1000 并行迭代的 for 循環,如果在第 100 次迭代調用 Break(),則低于 100 的所有迭代仍會運行(即使還未開始處理),并在退出循環之前處理完。從 101 到 1000 中還未開啟的迭代則會被放棄。對于已經在執行的長時間運行迭代,Break()將為已運行還未結束的迭代對應ParallelLoopResult結構的LowestBreakIteration屬性設置為調用Bread()迭代項的索引。
l?Stop:Stop() 用于通知Parallel循環當前迭代“之外”的所有其他迭代不需要運行,無論它們是位于當前迭代的上方還是下方。對于已經在執行的長時間運行迭代,可以檢查 IsStopped屬性,在觀測到是 true 時提前退出。Stop 通常在基于搜索的算法中使用,在找到一個結果之后就不需要執行其他任何迭代。(比如在看視頻或漫畫時自動匹配響應最快的服務器)
var loopresult = System.Threading.Tasks.Parallel.ForEach(rangePartitioner, range => {#region 業務代碼loopState.Stop();#endregion });?
?
?
?當并行迭代中調用的委托拋出異常,這個異常沒有在委托中被捕獲到時,就會變成一組異常,新的System.AggregateException負責處理這一組異常。
try {System.Threading.Tasks.Parallel.ForEach(rangePartitioner, range =>{#region 業務代碼#endregion}); } Catch(AggregateException ex) {foreach (var innerEx in ex.InnerExceptions){Console.WriteLine(innerEx.ToString());} }八、參考的資料
l?《.net并發編程實戰》
l?官方文檔《.NET 中的并行編程》https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-programming/
l?博客園《.Net并行編程高級教程--Parallel》https://www.cnblogs.com/stoneniqiu/p/4857021.html
l?博客園《8天玩轉并發》
https://www.cnblogs.com/huangxincheng/category/368987.html
l?《異步編程:.NET4.X 數據并行》
https://www.cnblogs.com/heyuquan/archive/2013/03/13/parallel-for-foreach-invoke.html
l?博客園《Parallel.ForEach 之 MaxDegreeOfParallelism》
https://www.cnblogs.com/QinQouShui/p/12134232.html
?end
?
輸入優惠碼同樣也能享受到當當送的優惠。
RCKZEV(長按復制)10元優惠碼(滿99元可用)
7R99RD(長按復制)30元優惠碼(滿199元可用)
使用渠道:當當小程序或當當APP
有效期:3月23日至3月25日
總結
以上是生活随笔為你收集整理的如何运用并行编程Parallel提升任务执行效率的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: IOT必备之MQTT结构分析,不进来看看
- 下一篇: 通过 GitHub Actions 自动