.NET 6 新特性 Parallel ForEachAsync
.NET 6 新特性 Parallel ForEachAsync
Intro
在 .NET 6 中有一個(gè) API Parallel.ForEachAsync 在官方的博客中一直被忽略,但是我覺得這個(gè) API 非常的實(shí)用,類似于同步版本的 Parallel.ForEach,可以比較高效地控制多個(gè)異步任務(wù)的并行度。之前的版本中我們會(huì)使用信號(hào)量來控制異步任務(wù)的并發(fā)度,使用這個(gè) API 之后就可以大大簡化我們的代碼,詳細(xì)可以看下面的示例代碼。
為什么需要這個(gè) API
API definition
在使用同步任務(wù)并行執(zhí)行的時(shí)候, 我們可以使用 Parallel.ForEach 來比較方便的控制多個(gè)任務(wù)的并行度,以便更好的利用系統(tǒng)資源,比如任務(wù)中如果有對(duì)受限的系統(tǒng)資源進(jìn)行訪問的時(shí)候,此時(shí)最好就能夠控制并行度, 避免系統(tǒng)資源爭用,效率反而不高。
Parallel.ForEachAsync 相關(guān)的 API 定義如下:
public?static?System.Threading.Tasks.Task?ForEachAsync<TSource>(System.Collections.Generic.IEnumerable<TSource>?source,?System.Func<TSource,?CancellationToken,?ValueTask>?body);public?static?System.Threading.Tasks.Task?ForEachAsync<TSource>(System.Collections.Generic.IEnumerable<TSource>?source,?CancellationToken?cancellationToken,?System.Func<TSource,?CancellationToken,?ValueTask>?body);public?static?System.Threading.Tasks.Task?ForEachAsync<TSource>(System.Collections.Generic.IEnumerable<TSource>?source,?System.Threading.Tasks.ParallelOptions?parallelOptions,?System.Func<TSource,?CancellationToken,?ValueTask>?body);public?static?System.Threading.Tasks.Task?ForEachAsync<TSource>(System.Collections.Generic.IAsyncEnumerable<TSource>?source,?System.Func<TSource,?CancellationToken,?ValueTask>?body);public?static?System.Threading.Tasks.Task?ForEachAsync<TSource>(System.Collections.Generic.IAsyncEnumerable<TSource>?source,?CancellationToken?cancellationToken,?System.Func<TSource,?CancellationToken,?ValueTask>?body);public?static?System.Threading.Tasks.Task?ForEachAsync<TSource>(System.Collections.Generic.IAsyncEnumerable<TSource>?source,?System.Threading.Tasks.ParallelOptions?parallelOptions,?System.Func<TSource,?CancellationToken,?ValueTask>?body);通過 ParallelOptions 我們可以限制最大并行度以及自定義 TaskScheduler 和取消令牌
public?class?ParallelOptions {private?TaskScheduler?_scheduler;private?int?_maxDegreeOfParallelism;private?CancellationToken?_cancellationToken;public?ParallelOptions(){this._scheduler?=?TaskScheduler.Default;this._maxDegreeOfParallelism?=?-1;this._cancellationToken?=?CancellationToken.None;}public?TaskScheduler??TaskScheduler{get?=>?this._scheduler;set?=>?this._scheduler?=?value;}public?int?MaxDegreeOfParallelism{get?=>?this._maxDegreeOfParallelism;set?=>?this._maxDegreeOfParallelism?=?value?!=?0?&&?value?>=?-1???value?:?throw?new?ArgumentOutOfRangeException(nameof?(MaxDegreeOfParallelism));}public?CancellationToken?CancellationToken{get?=>?this._cancellationToken;set?=>?this._cancellationToken?=?value;} }Sample
來看一個(gè)實(shí)際的示例吧,多個(gè)任務(wù)并行執(zhí)行,通常我們會(huì)使用 Task.WhenAll 來并行多個(gè) Task 的執(zhí)行,但是 Task.WhenAll 不能限制并發(fā)度,通常我們是會(huì)在異步 task 上封裝一層,使用信號(hào)量來限制并發(fā),示例如下:
using?var?semaphore?=?new?SemaphoreSlim(10,?10); await?Task.WhenAll(Enumerable.Range(1,?100).Select(async?_?=> {try{await?semaphore.WaitAsync();await?Task.Delay(1000);}finally{semaphore.Release();} }));使用 Parallel.ForEachAsync 之后,我們就可以大大簡化我們的代碼:
await?Parallel.ForEachAsync(Enumerable.Range(1,?100),?new?ParallelOptions() {MaxDegreeOfParallelism?=?10 },?async?(_,?_)?=>?await?Task.Delay(1000));這樣是不是簡單了很多。
再來看一個(gè)所有情況的對(duì)比,來看一下是不是符合我們的預(yù)期:
using?System; using?System.Diagnostics; using?System.Linq; using?System.Threading; using?System.Threading.Tasks; using?static?System.Console;var?watch?=?Stopwatch.StartNew(); await?Task.WhenAll(Enumerable.Range(1,?100).Select(_?=>?Task.Delay(1000))); watch.Stop(); WriteLine(watch.ElapsedMilliseconds);watch.Restart(); using?var?semaphore?=?new?SemaphoreSlim(10,?10); await?Task.WhenAll(Enumerable.Range(1,?100).Select(async?_?=> {try{await?semaphore.WaitAsync();await?Task.Delay(1000);}finally{semaphore.Release();} })); watch.Stop(); WriteLine(watch.ElapsedMilliseconds);WriteLine($"{nameof(Environment.ProcessorCount)}:?{Environment.ProcessorCount}");watch.Restart(); await?Parallel.ForEachAsync(Enumerable.Range(1,?100),?async?(_,?_)?=>?await?Task.Delay(1000)); watch.Stop(); WriteLine(watch.ElapsedMilliseconds);watch.Restart(); await?Parallel.ForEachAsync(Enumerable.Range(1,?100),?new?ParallelOptions() {MaxDegreeOfParallelism?=?10 },?async?(_,?_)?=>?await?Task.Delay(1000)); watch.Stop(); WriteLine(watch.ElapsedMilliseconds);watch.Restart(); await?Parallel.ForEachAsync(Enumerable.Range(1,?100),?new?ParallelOptions() {MaxDegreeOfParallelism?=?100 },?async?(_,?_)?=>?await?Task.Delay(1000)); watch.Stop(); WriteLine(watch.ElapsedMilliseconds);watch.Restart(); await?Parallel.ForEachAsync(Enumerable.Range(1,?100),?new?ParallelOptions() {MaxDegreeOfParallelism?=?int.MaxValue },?async?(_,?_)?=>?await?Task.Delay(1000)); watch.Stop(); WriteLine(watch.ElapsedMilliseconds);可以先想一下,每種方式執(zhí)行需要的耗時(shí)大概是多久,之后再嘗試運(yùn)行一下看一下結(jié)果
輸出結(jié)果如下:
outputMore
執(zhí)行結(jié)果是不是符合你的預(yù)期呢?
默認(rèn)情況下,Parallel.ForEachAsync 的最大并行度是當(dāng)前機(jī)器的 CPU 數(shù)量,也就是 Environment.ProcessorCount,如果要不限制可以指定最大并行度為 int.MaxValue
References
https://github.com/WeihanLi/SamplesInPractice/tree/master/net6sample/ParallelSample
https://github.com/dotnet/runtime/pull/46943
https://github.com/dotnet/runtime/blob/911640b3a891f92ff66e9c82ce65f71d203f11a2/src/libraries/System.Threading.Tasks.Parallel/ref/System.Threading.Tasks.Parallel.cs#L39-L44
總結(jié)
以上是生活随笔為你收集整理的.NET 6 新特性 Parallel ForEachAsync的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 哼!看你能坚持多久
- 下一篇: WPF实现用户头像裁剪