初探System.Threading.Channels
。
System.Threading.Channels是.Net Core基礎類庫中實現的一個多線程相關的庫,專門處理數據流相關的操作,用來在生產者和訂閱者之間傳遞數據(不知道可不可以理解為線程間傳遞數據,我把它類比成了Go語言中的Channel),使用時需要通過NuGet安裝。
這個庫的前身是System.Threading.Tasks.Channels,來自實驗性質的核心類庫項目https://github.com/dotnet/corefxlab,但是在2017年9月就不再更新了,目前使用的話需要用到最新的System.Threading.Channels庫,如果你也是第一次接觸的話,就直接上手研究System.Threading.Channels就可以了。
Channel?API操作基于Channel對象,其操作主要由ChannelReader和ChannelWriter兩部分組成,由Channelt提供的工廠方法創建一個有容量限制(或者無限制、最大容量限制)的channel。這點類似于Go語言中的chan的容量,二者在這里有很多的類似的地方,也有不同的地方。
1.1. 和Go語言channel的一些比較
Go語言中的channel默認是沒有容量的,在使用這個沒有容量的channel時,生產者和消費者必須“流動”起來,否則將會阻塞,也就是當生產者寫入channel一個數據時,必須同時有一個接收者接收,否則寫入操作會停止,等待有一個消費者取走channel中的數據,寫入操作才會繼續。
在System.Threading.Channels庫中,沒有類似Go語言的默認容量的機制,需要按需調用不同的Channel對象:
public static Channel<T> CreateBounded<T>(int capacity);?:可以創建一個帶有容量限制的Channel實例對象。
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options)?:創建一個自定義配置的Channel實例對象,可配置容量、以及在接收到新數據時的操作模式等等:
BoundedChannelFullMode.Wait:等待當前寫入完成
BoundedChannelFullMode.DropNewest:刪除并忽略管道中寫入的最新的數據
BoundedChannelFullMode.DropOldest:刪除并忽略管道中最舊的數據
BoundedChannelFullMode.DropWrite:刪除當前正在寫的數據,以寫入管道中的新數據
public static Channel<T> CreateUnbounded<T>();?:創建一個沒有容量限制的Channel實例對象,在實際使用時應當謹慎使用該創建方式,因為可能會發生OutOfMemoryException。
public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options):創建一個自定義配置的沒有容量限制的Channel實例對象。該配置選項因為沒有容量限制所以不會有寫入等待操作模式,只有默認的一些配置:
public bool SingleWriter { get; set; }:是否需要一個一個讀
public bool SingleReader { get; set; }:是否需要一個一個寫
public bool AllowSynchronousContinuations { get; set; }:是否需要異步連續操作(我個人理解為異步操作時同時進行讀寫)
Go語言的channel機制和System.Threading.Channels的不同之處有兩個:
Go語言沒有無限容量的channel,而且就我個人的想法而言,無限容量并不“無限”,因為內存是有限的。
System.Threading.Channels沒有單向的channel類型。在Go中可以創建“只讀”或者“只寫”的channel,但是System.Threading.Channels中沒有提供這種操作。
1.2. 生產者、消費者需要的方法
生產者需要使用的一些方法:TryWrite/WriteAsync/WaitToWriteAsync/Complete?消費者需要使用的一些方法:TryRead/ReadAsync/WaitToReadAsync/Completion
方法介紹:
TryRead/TryWrite:嘗試使用同步方式讀取或寫入一項數據,返回讀取或者寫入是否成功。TryRead同時會以out的形式返回讀取到的數據。
ReadAsync/WriteAsync:使用異步方式寫入或者讀取一項數據。
TryComplete/Completion:可以將channel標記為完成狀態,這樣就不會寫入多余的錯誤數據,如果從已完成狀態的channel中ReadAsync時會拋出異常,所以在不需要異步讀取時建議經常使用TryRead。
WaitToReadAsync/WaitToWriteAsync:在嘗試讀取或者寫入數據之前,調用該方法可獲得一個Task<bool>表示讀取或者寫入操作能否進行。
創建一個控制臺程序演示channel的用法:
| 1 2 3 4 5 6 7 8 9101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 | using System;using System.Collections.Generic;using System.Threading.Channels;using System.Threading.Tasks;namespace ConsoleApp1{ class Program { static void Main(string[] args) { Task.Run(async () => { await ChannelRun(0,0, 1, 50, 5); }); Console.WriteLine("運行開始..."); Console.ReadLine(); } /// <summary> /// channel運行 /// </summary> /// <param name="readDelayMs">讀取器每次讀取完等待時間</param> /// <param name="writeDelayMs">寫入器每次寫入完等待時間</param> /// <param name="finalNumberOfReaders">幾個讀取器同時讀取</param> /// <param name="howManyMessages">寫入器總共寫入多少消息</param> /// <param name="maxCapacity">channel最大容量</param> /// <returns></returns> public static async Task ChannelRun(int readDelayMs, int writeDelayMs, int finalNumberOfReaders,int howManyMessages, int maxCapacity ) { // 創建channel var channel = Channel.CreateBounded<string>(maxCapacity); var reader = channel.Reader; var writer = channel.Writer; var tasks = new List<Task>(); // 讀取器執行讀取任務,可以設置多個讀取器同時讀取 for (var i = 0; i < finalNumberOfReaders; i++) { var idx = i; tasks.Add(Task.Run(() => Read(reader, idx + 1,readDelayMs))); } // 寫入器執行寫入操作 for (var i = 0; i < howManyMessages; i++) { Console.WriteLine($"寫入器在{DateTime.Now.ToLongTimeString()}寫入:{i}"); await writer.WriteAsync($"發布消息:'{i}"); // 寫入完等待片刻 await Task.Delay(writeDelayMs); } // 寫入器標記完成狀態 writer.Complete(); // 等待讀取器讀取完成 await reader.Completion; // 等待讀取器所有的Task完成 await Task.WhenAll(tasks); } /// <summary> /// 讀取數據任務 /// </summary> /// <param name="theReader">讀取器</param> /// <param name="readerNumber">讀取器編號</param> /// <param name="delayMs">讀取完等待時間</param> /// <returns>任務</returns> public static async Task Read(ChannelReader<string> theReader, int readerNumber, int delayMs) { // 循環判斷讀取器是否完成狀態 while (await theReader.WaitToReadAsync()) { // 嘗試讀取數據 while (theReader.TryRead(out var theMessage)) { Console.WriteLine($"線程{readerNumber}號讀取器在{DateTime.Now.ToLongTimeString()}讀取到了消息:'{theMessage}'"); // 讀取完等待片刻 await Task.Delay(delayMs); } } } }} |
借助代碼中的注釋應當可以理解示例代碼的作用,對其中的關鍵點做個說明:
寫入器只有一個,寫入的容量由channel的容量控制。
讀取器可以設置多個,由Task調度同時讀取。
2.1. 寫入器、讀取器無等待
寫入器和讀取器不等待,不停的讀寫數據,有一個讀取器,總共寫入50個數據,channel的容量為5,調用傳參如下:
| 1234 | Task.Run(async () =>{ await ChannelRun(0,0, 1, 50, 5);}); |
結果:?
寫入讀取操作在一秒內完成了,觀察輸出可以發現,寫入和讀取交替進行,寫入的數據會立刻被讀取器讀取出來打印在終端內。
2.2. 讀取器阻塞(等待)
將讀取器的等待時間設置長一些,觀察一下寫入器是否會被阻塞,調用傳參如下:
| 1234 | Task.Run(async () =>{ await ChannelRun(10000,0, 1, 50, 5);}); |
結果:?
從輸出的結果可以看見,在程序開始時寫入器寫入了6個數據(但是調試的時候capacity的值時5,這里的機制有待考證),然后每過10秒讀取器讀取一個數據后,寫入器才能寫入一個數據,由于讀取器的速度限制相當于將寫入器也進行了阻塞。
2.3. 多個讀取器同時讀取
讀取器還是每讀取一次暫停10秒,但是有5個Task同時讀取,調用傳參如下:
| 1234 | Task.Run(async () =>{ await ChannelRun(10000,0, 1, 50, 5);}); |
結果:?
從輸出可以看出來,5個讀取器Task可以每10秒鐘同時讀取5個數據,而寫入器也同樣的幾乎是每次寫入5個數據。
System.Threading.Channels作為一個線程間通信的庫,用來當作發布者/訂閱者組件使用非常方便。但是比起Go語言中的channel還是有些區別的,因為c#的Async/Await從某中意義上講,并不是真正的多線程。
| 1 2 3 4 5 6 7 8 91011121314151617181920212223242526272829303132333435363738394041424344454647 | package mainimport ( "fmt" "time")func main() { fmt.Println("運行開始...") channelRun(2000, 0, 5, 50, 10)}// channel運行實例// readDelayMs, writeDelayMs分別是讀取器需要暫停的時間和寫入器需要暫停的時間// finalNumberOfReaders是讀取器個個數// howManyMessages是消息的總數// maxCapacity是channel的容量func channelRun(readDelayMs, writeDelayMs, finalNumberOfReaders, howManyMessages, maxCapacity int) { // 創建channel channel := make(chan string, maxCapacity) for i := 0; i < finalNumberOfReaders; i++ { go func(i int) { read(channel, i, readDelayMs) }(i) } for i := 0; i < howManyMessages; i++ { fmt.Printf("寫入器在%v寫入:%v\n", time.Now().Format("2006-01-02 15:04:05.0000"), i) channel <- fmt.Sprintf("發布消息:%v", i) time.Sleep(time.Duration(writeDelayMs) * time.Millisecond) }}// 讀取器從channel中讀取數據// channel時chan的實例// readerNumner代表了當前Goroutine的編號// delayMs表示當前的Goroutine讀取完需要等待的時間func read(channel chan string, readerNumber, delayMs int) { // 不斷循環讀取 for { select { case msg := <-channel: fmt.Printf("%v號Gouroutine 讀取器在%v讀取到了消息:'%s'\n", readerNumber, time.Now().Format("2006-01-02 15:04:05.0000"), msg) time.Sleep(time.Duration(delayMs) * time.Millisecond) } }} |
結果:?
https://medium.com/@alexyakunin/go-vs-c-part-1-goroutines-vs-async-await-ac909c651c11
https://github.com/dotnet/corefx/blob/master/src/System.Threading.Channels/tests/ChannelTests.cs
https://sachabarbs.wordpress.com/2018/11/28/system-threading-channels/
總結
以上是生活随笔為你收集整理的初探System.Threading.Channels的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一个通用数据库操作组件DBUtil(c#
- 下一篇: SQL Server之索引解析(二)