一文读懂 .NET 中的高性能队列 Channel
介紹
System.Threading.Channels 是.NET Core 3.0 后推出的新的集合類型, 具有異步API,高性能,線程安全等特點,它可以用來做消息隊列,進行數(shù)據(jù)的生產(chǎn)和消費, 公開的?Writer?和?Reader?api對應消息的生產(chǎn)者和消費者,也讓Channel更加的簡潔和易用,與Rabbit MQ 等其他隊列不同的是,Channel 是進程內的隊列。
開始Channel之旅
創(chuàng)建一個 channel 非常簡單,Channel 類公開的API支持創(chuàng)建無限容量和有限容量的 channel
// 創(chuàng)建有限容量的channel var channel = Channel.CreateBounded<string>(100);// 創(chuàng)建無限容量的channel var channel = Channel.CreateUnbounded<string>();這里需要注意的是,當你使用一個有限容量的 Channel 時,你需要指定容量的大小,還可以指定一個?BoundedChannelFullMode?的枚舉類型,來告訴 channel 達到容量限制的時候,繼續(xù)寫入時應該怎么處理
public enum BoundedChannelFullMode { Wait, DropNewest,DropOldest,DropWrite }?Wait 是默認值,當 channel 容量滿了以后,寫入數(shù)據(jù)時會返回 false,直到channel有數(shù)據(jù)被消費了以后,才可以繼續(xù)寫入?DropNewest 移除最新的數(shù)據(jù),也就是從隊列尾部開始移除?DropOldest 移除最老的數(shù)據(jù),也就是從隊列頭部開始移除?DropWrite 寫入數(shù)據(jù)返回成功,但是轉頭就把剛才的數(shù)據(jù)丟了
// 創(chuàng)建有限容量的channel, 并指定容量達到最大的策略 var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100) { FullMode = BoundedChannelFullMode.Wait });生產(chǎn)數(shù)據(jù)
生產(chǎn)數(shù)據(jù)主要通過 Channel 提供的?Writer?api, 常規(guī)的寫入操作如下:
await channel.Writer.WriteAsync("hello");Channel 還提供了?TryWrite()?方法,如果寫入數(shù)據(jù)失敗時會返回 false,WaitToWriteAsync()?方法會做非阻塞的等待,直到 Channel 允許寫入新的數(shù)據(jù)時返回 true,同樣的 Channel 關閉后會返回 false
消費數(shù)據(jù)
消費數(shù)據(jù)主要通過 Channel 提供的?Reader?api, 常規(guī)的讀取操作如下:
var item = await channel.Reader.ReadAsync();同樣的,Channel 提供了?TryRead()?嘗試讀取數(shù)據(jù),WaitToReadAsync()?方法會做非阻塞的等待,直到 Channel 可以讀取到數(shù)據(jù)時會返回 true,在 Channel 關閉后會返回 false,另外你可以通過?channel.Reader.Count?獲取隊列元素的數(shù)量。
在實際的使用場景中,可能需要一些后臺任務,長時間的進行消費,那么你可以使用下邊的方式
while (await channel.Reader.WaitToReadAsync()) {while (channel.Reader.TryRead(out var item)){Console.WriteLine(item);} }ReadAllAsync()?方法返回的是一個?IAsyncEnumerable<T>?對象,也可以用?await foreach?的方式來獲取數(shù)據(jù)
await foreach(var item in channel.Reader.ReadAllAsync()) {Console.WriteLine(item); }單一生產(chǎn)者和消費者
創(chuàng)建 Channel 時,可以設置 ChannelOptions 的?SingleWriter?和?SingleReader,來指定 Channel 時單一的生產(chǎn)者和消費者,默認都是 false,當設置了 SingleWriter = true 時, 會限制同一個時間只能有一個生產(chǎn)者可以寫入數(shù)據(jù), SingleReader = true 是同樣的。
另外,如果只需要一個消費者的話,你應該設置?SingleReader = true, Channel 在內部做了一些優(yōu)化,在讀取時避免了鎖操作,性能上有些許的提升。
性能
這里的基準測試我對比了三種類型,Channel, BufferBlock, BlockingCollection,分別寫入了10000條數(shù)據(jù),然后進行讀取,發(fā)現(xiàn) Channel 確實是表現(xiàn)比較好。
總結
Channel 實際上還是使用?ConcurrentQueue做的封裝, 使用起來更方便,對異步更友好,另外,.NET 5 其中的 Quic 內部就使用了Channel,CAP 也在新版本中使用 Channel 替換掉了之前的 BlockingCollection,來實現(xiàn)進程內的隊列。
官方介紹 https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels
源碼 https://github.com/dotnet/runtime/tree/main/src/libraries/System.Threading.Channels/src/System/Threading/Channels
CAP https://github.com/dotnetcore/CAP
Quic https://github.com/dotnet/runtime/tree/main/src/libraries/System.Net.Quic
總結
以上是生活随笔為你收集整理的一文读懂 .NET 中的高性能队列 Channel的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用 Blazor 开发内部后台(二):
- 下一篇: C# 枚举(Enum)