关于 Task 简单梳理
〇、前言
Task 是微軟在 .Net 4.0 時代推出來的,也是微軟極力推薦的一種多線程的處理方式。
在 Task 之前有一個高效多線程操作累 ThreadPool,雖然線程池相對于 Thread,具有很多優勢避免頻繁創建和銷毀線程等,但是線程池也有一些使用上的不便,比如不支持取消、完成、失敗通知等,也不支持線程執行的先后順序配置。
為了解決上述痛點,Task 誕生了。Task 就是站在巨人的肩膀上而生,它是基于 ThreadPool 封裝。Task 的控制和擴展性很強,在線程的延續、阻塞、取消、超時等方面遠勝于 ThreadPool。
本文將對 Task 進行一個詳細的介紹。
一、任務如何創建和啟動?
創建任務和執行任務是可以分離的,也可以同時進行。如下代碼有四種開啟任務的方式:
- 第一種:任務 t1 通過調用 Task 類構造函數進行實例化,但僅在任務 t2 啟動后調用其 Start() 方法啟動?!緞摻?未啟動】
- 第二種:任務 t2 通過調用 TaskFactory.StartNew(Action<Object>, Object) 方法在單個方法調用中實例化和啟動。【創建+啟動】
- 第三種:任務 t3 通過調用 Run(Action) 方法在單個方法調用中實例化和啟動。【創建+啟動】
- 第四種:任務 t4 通過調用 RunSynchronously() 方法在主線程上同步執行?!緞摻?未啟動】
static void Main(string[] args)
{
// 用于異步調用的委托函數,接受類型為 Object 的參數
Action<object> action = (object obj) =>
{
// Task.CurrentId :任務 ID
// Thread.CurrentThread.ManagedThreadId :線程 ID
Console.WriteLine($"Task={Task.CurrentId}, obj={obj}, Thread={Thread.CurrentThread.ManagedThreadId}");
// throw new Exception();
};
// 【第一種】創建一個就緒,但【未啟動】的任務,需要在后文通過 t1.Start() 啟動
Task t1 = new Task(action, "甲"); // alpha:初始
// 【第二種】【創建并啟動】一個任務
Task t2 = Task.Factory.StartNew(action, "乙");
// 占用主線程,等待任務 t2 完成
t2.Wait();
// 啟動第一個任務 t1
t1.Start();
Console.WriteLine($"t1 已啟動 (主線程 = {Thread.CurrentThread.ManagedThreadId})");
// 通過 Wait() 占用主線程,等待 t1 執行完畢
t1.Wait();
// 【第三種】通過 Task.Run() 【創建并啟動】一個任務
string taskData = "丙";
Task t3 = Task.Run(() =>
{
Console.WriteLine($"Task={Task.CurrentId}, obj={taskData}, Thread={Thread.CurrentThread.ManagedThreadId}");
});
// 通過 Wait() 占用主線程,等待 t3 執行完畢
t3.Wait();
// 【第四種】創建一個就緒,但【未啟動】的任務 t4
Task t4 = new Task(action, "丁");
// Synchronously:同步的
// 開啟同步任務 t4,在主線程上運行
t4.RunSynchronously();
// t4 是以同步的方式運行的,此時的 Wait() 可以捕捉到異常
t4.Wait();
Console.ReadLine();
}
如下圖輸出結果,最先開啟的 t2,由于是工廠中啟動的,所以不占用主線程運行。Task.Run() 同樣是非主線程運行,但它并未新開線程,而是直接用了 t2 執行的線程。
線程編號為 1 的是主線程,t1 是主線程最先創建的,所以直接由主線程運行。t4 是在同步執行的任務,因此也是主線程來執行。
??
二、等待一個或多個任務
用于等待任務的方法有很多個,如下:
| Wait() | task1.Wait() | 單線程等待 |
| WaitAll() | Task.WaitAll(tasks) | 等待任務集合 tasks 中的全部任務完成 |
| WaitAny() | int index = Task.WaitAny(tasks) | 等待任一任務完成,并返回這一任務的編號 |
| WhenAll() | Task t = Task.WhenAll(tasks) | 返回一個新的任務,這個任務的完成狀態在【tasks 集合中全部任務都完成時】完成 |
| WhenAny() | Task t = Task.WhenAny(tasks) | 返回在任務集合 tasks 中第一個執行完成的任務對象 |
下面幾個示例來實操下。
2.1 Wait()
對于 Wait() 單線程等待,沒啥好說的,看代碼:
static void Main(string[] args)
{
// 創建并執行一個任務執行匿名函數
Task taskA = Task.Run(() => Thread.Sleep(2000));
Console.WriteLine($"taskA Status: {taskA.Status}"); // taskA Status: WaitingToRun
try
{
taskA.Wait(1000); // 主線程等待任務 1s 此時任務尚未完成
Console.WriteLine($"taskA Status: {taskA.Status}"); // taskA Status: Running
taskA.Wait(); // 線程等待任務 taskA 完成
Console.WriteLine($"taskA Status: {taskA.Status}"); // taskA Status: RanToCompletion
}
catch (AggregateException)
{
Console.WriteLine("Exception in taskA.");
}
}
2.2 Wait(Int32, CancellationToken) 支持手動取消
關于 Wait(Int32, CancellationToken) 任務可手動取消的重載。在任務完成之前,超時或調用了 Cancel() 方法,等待終止。
如下示例,一個線程一個任務,線程中將 CabcellationTokenSource 的實例 cts 取消掉,導致后續任務等待時調用 cts.Token 導致異常 OperationCanceledException 的發生。
static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
Thread thread = new Thread(CancelToken); // 新開一個線程執行方法:CancelToken()
thread.Start(cts);
Task t = Task.Run(() => // 新增一個任務執行匿名函數
{
Task.Delay(5000).Wait(); // 延遲等待 5s
Console.WriteLine("Task ended delay...");
});
try
{
Console.WriteLine($"About to wait completion of task {t.Id}"); // 以上兩個操作都有延遲,所以此處消息先打印
// 等待任務 t 1.51s,保證線程已執行完成,就是保證 CancellationTokenSource 已執行過取消操作
// 由于 cts 已經取消,因此次數就拋異常:OperationCanceledException
bool result = t.Wait(1510, cts.Token); // 后邊代碼就不再執行,直接跳到 catch
Console.WriteLine($"Wait completed normally: {result}");
Console.WriteLine($"The task status: {t.Status}");
}
catch (OperationCanceledException e)
{
Console.WriteLine($"{e.GetType().Name}: The wait has been canceled.");
Console.WriteLine($"Task status:{t.Status}"); // 此時程序運行 1.5s 多,任務 t 還在等待,因此狀態是 Running
Thread.Sleep(4000); // 4s + 1.5s > 5s 此時任務 t 已經執行完成,狀態為 RanToCompletion
Console.WriteLine("After sleeping, the task status: {t.Status}");
cts.Dispose();
}
Console.ReadLine();
}
private static void CancelToken(Object obj)
{
Thread.Sleep(1500); // 延遲 1.5s
Console.WriteLine($"Canceling the cancellation token from thread {Thread.CurrentThread.ManagedThreadId}...");
CancellationTokenSource source = obj as CancellationTokenSource;
if (source != null)
source.Cancel(); // 將 CancellationTokenSource 的實例執行取消
}
??
2.3 WaitAll()
等待一組任務全部完成,無論是否拋異常。AggregateException 將會收集全部異常信息,可以通過遍歷獲取每一個異常詳情。
如下代碼,新建是個任務組成任務組 tasks,其中 2~5 線程手動拋異常,最后通過遍歷 AggregateException aex 記錄全部異常。
static void Main(string[] args)
{
var tasks = new List<Task<int>>();
// 創建一個委托,用于任務執行,并記錄每個任務信息
Func<object, int> action = (object obj) =>
{
int i = (int)obj;
// 讓每次的 TickCount 不同(系統開始運行的毫秒數)
Thread.Sleep(i * 1000);
if (2 <= i && i <= 5) // 從第 2 到 5 個任務都拋異常
{
throw new InvalidOperationException("SIMULATED EXCEPTION");
}
int tickCount = Environment.TickCount; // 獲取系統開始運行的毫秒數
Console.WriteLine($"Task={Task.CurrentId}, i={i}, TickCount={tickCount}, Thread={Thread.CurrentThread.ManagedThreadId}");
return tickCount;
};
// 連續創建 10 個任務
for (int i = 0; i < 10; i++)
{
int index = i;
tasks.Add(Task<int>.Factory.StartNew(action, index)); // 后臺線程
}
try
{
// WaitAll() 等待全部任務完成
Task.WaitAll(tasks.ToArray());
// 由于線程中手動拋出了異常,因此這個消息將無法打印在控制臺
Console.WriteLine("WaitAll() has not thrown exceptions. THIS WAS NOT EXPECTED.");
}
catch (AggregateException aex) // AggregateException 異常中包含 2~5 四個異常
{
Console.WriteLine("\nThe following exceptions have been thrown by WaitAll(): (THIS WAS EXPECTED)");
Console.WriteLine($"\ne.InnerExceptions.Count:{aex.InnerExceptions.Count}");
for (int j = 0; j < aex.InnerExceptions.Count; j++) // aex.InnerExceptions.Count == 4
{
Console.WriteLine("\n-------------------------------------------------\n{0}", aex.InnerExceptions[j].ToString());
}
}
Console.ReadLine();
}
??
2.4 WaitAny()
等待一組任務中的任一任務完成,然后返回第一個執行完成任務的序號,可通過tasks[index].Id取得任務 ID。
如下示例,每個任務都有延遲,當第一個任務完成時,遍歷打印出其他全部任務的狀態:
static void Main(string[] args)
{
Task[] tasks = new Task[5];
for (int ctr = 0; ctr <= 4; ctr++)
{
int factor = ctr; // 重新聲明一個變量
tasks[ctr] = Task.Run(() => Thread.Sleep(factor * 250 + 50));
}
int index = Task.WaitAny(tasks); // 等待任一任務結束
Console.WriteLine($"任務 #{tasks[index].Id} 已完成。");
Console.WriteLine("\n當前各個任務的狀態:");
foreach (var t in tasks)
Console.WriteLine($" Task {t.Id}: {t.Status}");
Console.ReadLine();
}
??
參考:https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.task.wait?view=net-7.0
三、延續任務 Task.ContinueWith()
3.1 一個簡單的示例
如下代碼,首先創建一個耗時的任務 task 并啟動,此時也不影響主線程的運行。然后通過task.ContinueWith()在第一個任務執行完成后,執行其中的匿名函數。
static void Main(string[] args)
{
// 創建一個任務
Task<int> task = new Task<int>(() =>
{
int sum = 0;
Console.WriteLine($"使用 Task 執行異步操作,當前線程 {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(2000);
for (int i = 0; i < 100; i++)
{
sum += i;
}
return sum;
});
// 啟動任務
task.Start();
// 主線程在此處可以執行其他處理
Console.WriteLine($"1 主線程 {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1000);
//任務完成時執行處理。
Task cwt = task.ContinueWith(t =>
{
Console.WriteLine($"任務完成后的執行結果:{t.Result} 當前線程 {Thread.CurrentThread.ManagedThreadId}");
});
task.Wait();
cwt.Wait();
Console.WriteLine($"2 主線程 {Thread.CurrentThread.ManagedThreadId}");
Console.ReadLine();
}
??
詳情可參考:https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.task.continuewith?view=net-7.0
3.2 任務的并行與串行
ContinueWith、WaitAll 當這兩者結合起來,我們就可以處理復雜一點的東西。比如,現在有 7 個任務,其中 t1 需要串行,t2-t3 可以并行,t4 需要串行,t5-t6 并行,t7 串行。邏輯如下圖:
??
public static void Main(string[] args)
{
ConcurrentStack<int> stack = new ConcurrentStack<int>(); // ConcurrentStack:線程安全的后進先出(LIFO:LastIn-FirstOut)集合
ConcurrentBag<int> bag = new ConcurrentBag<int>(); // ConcurrentBag:線程安全的無序集合
// t1先串行
var t1 = Task.Factory.StartNew(() =>
{
stack.Push(1);
stack.Push(2);
});
// t1.ContinueWith() t1 之后,t2、t3并行執行
var t2 = t1.ContinueWith(t =>
{
int result;
stack.TryPop(out result);
});
// t2,t3并行執行
var t3 = t1.ContinueWith(t =>
{
int result;
stack.TryPop(out result);
});
// 等待 t2、t3 執行完
Task.WaitAll(t2, t3);
//t4串行執行
var t4 = Task.Factory.StartNew(() =>
{
stack.Push(1);
stack.Push(2);
});
// t5、t6 并行執行
var t5 = t4.ContinueWith(t =>
{
int result;
stack.TryPop(out result);
});
// t5、t6 并行執行
var t6 = t4.ContinueWith(t =>
{
int result;
// 只彈出,不移除
stack.TryPeek(out result);
});
// 臨界區:等待 t5、t6 執行完
Task.WaitAll(t5, t6);
// t7 串行執行
var t7 = Task.Factory.StartNew(() =>
{
Console.WriteLine($"當前集合元素個數:{stack.Count}"); // 當前集合元素個數:1
});
Console.ReadLine();
}
參考: https://www.cnblogs.com/huangxincheng/archive/2012/04/03/2430638.html
https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.task?view=net-7.0
https://www.cnblogs.com/zhaoshujie/p/11082753.html
總結
以上是生活随笔為你收集整理的关于 Task 简单梳理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 京东面试题:Java中 ++i 的操作是
- 下一篇: Hello cnblogs!