Pipelines - .NET中的新IO API指引(二)
原文:Pipelines - a guided tour of the new IO API in .NET, part 2
作者:marcgravell
在上一章,我們討論了以往的StreamAPI中存在的一些問題,并且介紹了Pipe,PipeWriter,PipeReader?等API,研究如何寫出一個Pipe?并且從中消費數據,我們也討論了FlushAsync()?和ReadAsync()?是如何協同保證兩端的工作,從而解決“空”和“滿”的場景——在沒有數據時掛起reader,并在數據到來時恢復它;在寫入快過讀取(即pipe滿載)時掛起writer,并在reader追上后恢復它;并且我們也在線程模型的層面上探討了什么是“掛起”。
在這章,我們將會研究pipelines的內存模型:數據實際上存在于哪里?我們也會開始著手研究如何在現實場景中使用pipeline以滿足真實需求。
內存模型:我的數據在哪里?
在上一章,我們講了pipe如何管理所有的緩沖區,允許writer通過?GetMemory()和GetSpan()請求緩沖區,隨后通過ReadAsync()中的?.Buffer?將提交后的數據暴露給reader——reader取得的數據是一個?ReadOnlySequence<byte>,即全部數據其中的一些片段。
那么其中究竟發生了什么?
每一個Pipe實例都有一個引用指向MemoryPool<byte>——一個System.Memory中的新東西,顧名思義,它創建了一個內存池。在創建Pipe的時候,你可以在選項中指定一個專門的?MemoryPool<byte>,但是在默認情況下(我猜也是大多數情況下)——應該是使用一個應用級別共享的 (MemoryPool<byte>.Shared) 內存池。
MemoryPool<byte>?的概念是非常開放的。其默認的實現是簡單地使用ArrayPool<byte>.Shared(應用級別的數組池),在需要的時候租借數組,并在使用完后歸還。這個?ArrayPool<T>?使用了?WeakReference來實現,所以池化的數組在內存有壓力時是可以回收的,但是,當你請求GetMemory(someSize)?或者?GetSpan(someSize)時,它并不是簡單地向內存池請求“someSize”,相反,它在內部追蹤了一個“片段(segment)”,一個新“片段”將是(默認情況下,可以通過配置改變)someSize和2048字節中的最大值,這樣在請求一個大小可觀的內存時就意味著我們的系統不會充滿著許多小數組,而后者會對GC造成顯著碰撞。當你在writer中?Advance(bytesWritten),它:
移動一個表達當前已使用多少片段的內部計數器
更新reader的“備讀(available to be read)”鏈的末端;如果我們剛剛對一個空片段的第一個字節進行了寫入,這意味著將會向鏈中增加一個新片段,否則,它意味著當前鏈的結尾標志被增加(后移)
這就是我們從?ReadAsync()中獲取到的“備讀”鏈;而當我們在reader中?AdvanceTo?——在整個片段都被消費后,pipe會將這些片段送回內存池。在那里,它們可以被多次復用。并且作為上述兩點導致的直接結果,我們可以看到在大多數情況下(即使在writer中多次調用Advance?),我們最終會在reader中發現一個單獨的片段;而如果是在片段邊界處,或reader落后于writer,數據開始累積的情況下,會有多個片段。
只有使用默認池才能:
我們不用在每次調用GetMemory()?/?GetSpan()時都要分配內存
我們不需要每次GetMemory()?/?GetSpan()都產生一個單獨的數組——通常我們只是獲得同樣的“片段”中的某個不同的范圍
只使用少量的大緩沖數組
它們不需要大量的類庫代碼,就可以自動回收
當不再需要時,它們可以被GC回收
這也解釋了為什么可以在GetMemory()?/?GetSpan()中請求少量空間再在之后檢查其大小:我們可以訪問當前段的剩下未使用的部分。這意味著:一個大小為2048的片段,在之前的寫入中用掉了200字節——即使我們只請求5字節,我們也可以看到我們還剩下1848字節可供使用,或者更多——記住:從ArrayPool.Shared?中獲取到的數組也是一個“至少這么大”的操作。
零復制緩沖區
在此還有需要注意的地方是,我們獲取數據緩沖的時候,沒有進行任何數據的復制。writer申請一個緩沖區,然后第一次寫入數據到需要的位置。這就成了writer和reader之間的緩沖區,無需復制數據。而如果reader當前無法處理完所有的數據,它能夠通過顯示聲明其“未被消費”的方式將數據放回pipe。這樣無需為reader維護一個單獨的數據積壓處(backlog),而種情況這在使用Stream的協議處理代碼中是非常常見的。
正是這種功能間的組合使得pipeline代碼在內存層面顯得非常友好。你可以用Stream做到所有的這些,但是卻需要大量令人痛苦的易出錯的代碼去實現,甚至需要更多,如果你想做好的話——并且你幾乎必須去為每個場景單獨地實現它。Pipelines讓良好的內存處理變為默認的簡單的途徑——落入成功之中(譯注:即如自由落體一般實現成功的代碼)
更多奇特的內存池
你并不受限于使用我們之前討論的內存池;你可以實現你自己的自定義內存池!默認內存池的優點在于它很簡單。尤其是在我們是否100%完美地返回每個片段并不重要的情況下——如果我們以某種方式丟棄某個pipe,最壞的情況會是GC將在某個時刻回收掉被丟棄的片段。它們不會回到池中,但那沒關系。
但是,你可以做很多有趣的東西。想象一下,比如一個?MemoryPool<byte>承載巨量的內存——通過一些非常大的數組得到的托管內存,或是通過?Marshal.AllocHGlobal?獲得的非托管內存(注意?Memory?和?Span?并不受限于數組——它們需要的不過是某種連續內存),按需使用這些巨大的內存塊。這有很大的潛在場景,但是它會使片段的可靠回收變得更加重要。大多數系統不應該這么做,但是提供這樣的靈活性是好的。
在真實系統中有用的pipes
我們在第一部分中用的例子,是一個讀寫均在同一代碼的單獨Pipe。很明顯這不是個真實場景(除非我們是在試圖模擬一個"echo"服務器),所以我們在更真實的場景中可以做什么呢?首先,我們需要把我們的pipelines連接到什么東西上。我們通常并不想單獨地使用pipe,相反,我們希望可以有一個結合一個普遍的系統或API使用的pipe。所以,來讓我們開始看看接下來會是什么樣子吧。
在這里,我們需要注意:發布于.NET Core 2.1的pipelines不包括任何終端實現。這意味著:?Pipe?雖然存在,但是在框架內沒有提供任何的與現有系統的實際連接——就像提供了抽象的?Stream?基類,卻沒有?FileStream,,NetworkStream等。是的,這聽起來讓人感到失望,但是這只是因為時間所限,不要慌!現在在進行一些關于它們應該以哪種優先級實現的“活躍的”討論。并且現在有一些社區貢獻來補足那些最為明顯的缺陷。
一旦我們處于那些場景,我們可能會問:“將pipelines連接到另一個數據后端需要什么?”
也許將一個pipe連接到一個?Stream會是一個不錯的開頭。我知道你在想:“但是Marc,你在上一章你不遺余力地再說?Stream?有多么糟糕!”。我沒有改變我的看法,它不一定是完美的——對于那些特定場景的Stream實現(比如NetworkStream或FileStream)我們可以有一個專門的基于pipelines的終端直接與那些服務以最小的中轉代價進行通訊;但是這是一個有用的起步:
它使我們可以立即訪問到巨量的API——任何可以通過Stream暴露數據,或任何通過封裝的streams作為中間層的API(加密、壓縮等)
它將所有老舊的StreamAPI隱藏在一個明確清晰的表層下
它帶來了幾乎所有我們之前提到過的優點
所以,讓我們開始吧!我們首先要思考的是:這里的方向是什么?就像剛才提到的一樣,Stream是模糊不清的——可能只讀,只寫,或可讀可寫。來假設我們想解決的是最通常的問題:一個可讀可寫表現為雙工行為的stream——這可以讓我們訪問如sockets(通過NetworkStream)之類的東西。這意味著我們實際上將會需要兩個pipe——一個用來輸入,一個用來輸出。Pipelines通過明確地聲明IDuplexPipe接口來幫助我們指明道路。這是一個非常簡單的接口,數據傳輸給IDuplexPipe就像傳輸給兩個pipe的端點一樣——一個標記為"in",一個標記為"out":
interface IDuplexPipe{PipeReader Input { get; }PipeWriter Output { get; }}我們接下來想要做的是創建一個類來實現?IDuplexPipe,不過其內部使用了兩個Pipe實例:
一個Pipe會是輸出緩沖區(從消費者的角度來看),它將會在調用者寫入Output時被填充——并且我們將會用一個循環來消費這個Pipe并且將數據推入底層Stream(被用來寫入網絡,或者其它任何stream可以寫入的)
一個Pipe將會是輸入緩沖區(從消費者的角度來看),我們將有一個循環來從底層Stream讀取數據,并將其推入Pipe,它將會在調用者從Input中讀取時排出
這個方法可以立即解決普遍影響著那些使用Stream的人一大堆的問題:
我們現在有了input/output緩沖區,用于從讀/寫調用中解耦stream訪問,而不用添加BufferedStream或是其它類似的防止數據碎片的功能(對于寫入代碼來說),并且這將會使我們在處理數據時很方便去接收更多數據(特別是對于讀取代碼來說,這樣我們不用在請求更多數據時保持暫停)
如果調用代碼的寫入,快過stream的Write可以處理的程度,背壓特性將會展現出來,對調用代碼進行節流,這樣我們不會被充滿未發送數據的巨大緩沖區所終結
如果stream的Read超過了消費這些數據的調用代碼,背壓特性也會在這里出場,對我們的stream讀取循環進行節流,這樣我們不會被充滿未處理數據的巨大緩沖區所終結
讀取和寫入代碼都會受益于我們之前所討論的內存池的所有優點
調用代碼從來不用擔心數據的后備存儲(未完成幀)等——pipe去解決它
那么它看起來會是什么樣?
基本上,我們需要做的就是這樣:
class StreamDuplexPipe : IDuplexPipe{Stream _stream;Pipe _readPipe, _writePipe;public PipeReader Input => _readPipe.Reader;public PipeWriter Output => _writePipe.Writer;// ... more here}注意我們有兩個不同的pipe;調用者獲取每個pipe的一個端點——然后我們的代碼將會操作每個pipe的另一個端點。
對pipe進行泵送(Pumping)
那么我們與stream交互的代碼是什么樣的呢?像之前說過的那樣,我們需要兩個方法。首先——很簡單——一個循環,從_stream中讀取數據并且將其推入_readPipe,然后被調用代碼所消費;這個方法的核心類似這樣:
while (true){// note we'll usually get *much* more than we ask forvar buffer = _readPipe.Writer.GetMemory(1);int bytes = await _stream.ReadAsync(buffer);_readPipe.Writer.Advance(bytes);if (bytes == 0) break; // source EOFvar flush = await _readPipe.Writer.FlushAsync();if (flush.IsCompleted || flush.IsCanceled) break;}這個循環向pipie請求一個緩沖區,然后用?netcoreapp2.1?中Stream.ReadAsync?的新重載接收一個?Memory<byte>?來填充緩沖區——我們一會兒討論如果你現在沒有一個能接收?Memory<byte>的API該怎么辦。當讀取完成后,它使用Advance向pipe提交這個數量的字節,然后它在pipe上調用?FlushAsync()?來(如果需要的話)喚醒reader,或者在背壓減輕時暫停寫循環。注意我們還需要檢查Pipe的?FlushAsync()的結果——它可以告訴我們pipe的消費者已經告知其已經讀取完了所有想要的數據(Iscompleted),或者pipe本身被關閉(IsCanceled)。
注意在這兩種情況下,我們都希望確保在此循環退出時告訴管道,這樣我們就不會最終在沒有數據到來時永遠在調用端等待下去,有時會發生意外,有時在調用?_stream.ReadAsync?(或其它方法)可能會有異常拋出,所以最好是利用try/finally:
Exception error = null;try{// our loop from the previous sample}catch(Exception ex) { error = ex; }finally { _readPipe.Writer.Complete(error); }如果你愿意的話,你可以使用兩個?Complete?——一個在try末尾(成功時),一個在catch中(失敗時)。
我們需要的第二個方法會比較復雜。我們需要一個循環來從_writePipe中消費數據,然后將其推入_stream。核心代碼會像這樣:
while (true){var read = await _writePipe.Reader.ReadAsync();var buffer = read.Buffer;if (buffer.IsCanceled) break;if (buffer.IsEmpty && read.IsCompleted) break;// write everything we got to the streamforeach (var segment in buffer){await _stream.WriteAsync(segment);}_writePipe.AdvanceTo(buffer.End);await _stream.FlushAsync(); ? ?}這會等待一些數據(可能在多個緩沖區里),然后進行一些退出判斷檢查;像之前一樣,我們可以在IsCanceled時放棄,但是下一個檢查會比較微妙:我們不希望只因為producer表示它們已經寫入了所有想要的數據(Iscompleted)就停止寫入,不然我們也許會丟失它們末尾幾段數據——我們需要繼續直到我們已經寫入了它們所有的數據,直到buffer.IsEmpty。這是個簡化后的例子,因為我們一直寫入所有數據——我們之后會看到更復雜的例子。一旦我們有了數據,我們按順序將每個非連續緩沖區寫入stream中——因為Stream一次只能寫入一個緩沖區(同樣,我使用的是netcoreapp2.1中的重載,接受ReadOnlyMemory<byte>參數,但是我們不限于此)。一旦它寫完了緩沖區,它告訴pipe我們已經消費完了所有數據,然后刷新(flush)底層的Stream。
在“真實”代碼中,我們也許希望更積極地優化從而減少刷新底層stream,直到我們知道再也不會有可讀取的數據,那么也許在_writePipe.Reader.ReadAsync()之外我們可以使用_writePipe.Reader.TryRead(...)。這個方法的工作方式類似于ReadAsync(),但是會保證同步返回——這可以用來測試“在我忙的時候writer是否附加了什么?”。但是上面的內容已經講述了這一點。
另外,像之前一樣,我們也許需要添加一個?try/finally,這樣在我們退出時總是會調用_writePipe.Reader.Complete()。
我們可以使用?PipeScheduler?來啟動這兩個泵(pumps),這會確保它們在預期環境中運行,然后我們的循環開始泵送數據。我們要添加一些格外的內容(我們可能需要一種機制來?Close()/Dispose()?底層stream等)——但是像你所看到的,將?IDuplexPipe?連接到沒有pipeline設計的源并不需要是一項艱巨的任務。
這是我之前做的...
我已經將上面的內容簡化了一些(說真的,不是太多),以便讓它適合討論,但是你可能仍然不應該從這里復制粘貼代碼來嘗試讓它工作。我并沒有聲稱它們是適用于所有情況的完美解決方案,但是作為StackExchange.Redis 2.0版工作的一部分,我們實現了一系列pipelines的綁定放在nuget上——毫無創意地命名為?Pipelines.Sockets.Unofficial(nuget,github(https://github.com/mgravell/Pipelines.Sockets.Unofficial),它包括了:
將雙工的Stream轉換為?IDuplexPipe?(就像上面說的)
將只讀Stream轉換為PipeReader
將只寫Stream轉換為PipeWriter
將?IDuplexPipe?轉換為雙工的Stream
將PipeReader轉換為只讀Stream
將PipeWriter轉換為只寫Stream
將Socket直接轉換成IDuplexPipe(不經過NetworkStream)
前六個在?StreamConnection的靜態方法中,最后一個在SocketConnection里。
StackExchange.Redis?牽涉著大量Socket工作,所以我們對如何將pipeline連接到socket上非常感興趣,對于沒有TLS的redis連接,我們可以直接將我們的Socket連接到pipeline:
Socket???SocketConnection
對于需要TLS的redis連接(比如云redis提供商),我們可以這樣連接:
Socket???NetworkStream???SslStream???StreamConnection
所有這兩種配置都是一個Socket在其中一端,一個IDuplexPipe在另一端,它開始展示我們如何將pipeline作為更復雜系統的一部分。也許更重要的是,它為我們在未來實施改變提供了空間。將來有可能的例子:
Tim Seaward一直在折騰Leto,它提供了不需要?SslStream?,直接用IDuplexPipe實現TLS的能力(并且不需要stream逆變器)
在 Tim Seaward,David Fowler 和Ben Adams之間,有一系列直接實現pipelines而不用托管sockets的實驗性/正在進行的網絡層工作,包括"libuv","RIO"(Registerd IO),和最近的"magma"——它將整個TCP棧推入用戶代碼從而減少系統調用。
看這個空間如何發展將會非常有趣!
但是我當前的API不會使用?Span?或者?Memory!
當在寫將數據從pipe中泵送到其它系統(比如一個Socket)時,很有可能你會遇到不接收?Span或者?Memory的API。不要慌,這沒有大礙,你依然可以有很多種變通方案使其變得更……傳統。
在你有一個?Memory?或者?ReadOnlyMemory時,第一個技巧是MemoryMarshal.TryGetArray(...)。它接收一個memory并且嘗試獲取一個ArraySegment?,它用一個T[]vector和一個int偏移/計數對描述相同的數據。顯然,這只有在這塊內存是基于一個vector時才能用,而情況并非總是如此,所以這可能會在異種的內存池上失敗。我們第二個解決辦法時MemoryMarshal.GetReference(...),它接受一個span然后返回一個原始數據起點的引用(實際上是一個“托管指針”,又叫做?ref T)。一旦我們有了一個?ref T,我們可以用unsafe語法來獲得一個這個數據的非托管指針,在這種情況下會有用:
Span<byte> span = ...fixed(byte* ptr = &MemoryMarshal.GetReference(span)){// ...}即使span的長度是零,你依然可以這么做,其會返回一個第0項將會存在的位置,而且甚至在使用defaultspan即根本沒有實際后備內存的時候,也可以這么使用。后面這個有一點需要注意,因為ref T通常不被認為會是null,但是在這里它卻是了。實際上,只要你不去嘗試對這種空引用進行解引用,不會有什么問題。如果你使用fixed將其轉換為一個非托管指針,你會得到一個空(零)指針,這相對來說更合理(并且在一些P/Invoke場景中會有用),MemoryMarshal?本質上是unsafe?代碼的同義詞,即使你調用的那段代碼并沒有使用unsafe?關鍵字。使用它是完全有效的,但是如果不恰當地使用它,它可能會坑到你——所以小心就是了。
Pipe的應用端代碼是什么樣的?
OK,我們有了IDuplexPipe,并且我們也看到了如何將兩個pipe的“業務端”連接到你選擇的后端數據服務。現在,我們在應用代碼中如何使用它?
按照我們上一章的例子,我們將從?IDuplexPipe.Output?中把PipeWriter傳遞給我們的出站代碼,從?IDuplexPipe.Input?中把?PipeReader?傳遞給我們的入站代碼。
出站代碼相當簡單,并且通常是需要直接從基于Stream的代碼移植成基于PipeWriter的代碼。關鍵的區別還是那樣,即你不再手動控制緩沖區。下面是一個典型的實現:
ValueTask<bool> Write(SomeMessageType message, PipeWriter writer){// (this may be multiple GetSpan/Advance calls, or a loop,// depending on what makes sense for the message/protocol)var span = writer.GetSpan(...);// TODO: ... actually write the messageint bytesWritten = ... // from writingwriter.Advance(bytesWritten);return FlushAsync(writer);}private static async ValueTask<bool> FlushAsync(PipeWriter writer){// apply back-pressure etcvar flush = await writer.FlushAsync();// tell the calling code whether any more messages// should be writtenreturn !(flush.IsCanceled || flush.IsCompleted);}Write?的第一部分是我們的業務代碼,我們需要把數據從writer寫入到緩沖區;通常這會多次調用?GetSpan(...)?和?Advance()。當我們寫完了數據,我們可以flush它從而保證啟動泵送并且應用背壓控制。對于那些非常大的消息體,我們也可以在中間點flush,但是對于大多數場景:一個消息flush一次足夠了。
如果你好奇為什么我將FlushAsync?分割到不同的代碼中:那是因為我想await?FlushAsync的結果來檢查退出條件,所以它需要在一個async?方法里,在這里最有效率的訪問內存方式是通過?Span<byte>?API,Span<byte>?是一個?ref struct?類型,因此我們不能在異步方法中將 Span<byte> 作為局部變量使用。一個實用的辦法是簡單地分割代碼,這樣一個方法做?Span<byte>?工作,一個方法做async方面的工作。
發散一下:異步代碼、同步熱路徑和異步機制開銷
async?/?await?中引入的機制(譯注:指ValueTask,machinery應該是和async狀態機關聯的詞,但是我并不知道怎么翻譯合適,只好翻譯成機制了)非常棒,但是它仍然會是一個會產生驚人棧開銷的工作——你可以從?sharplab.io?中看到——看看OurCode.FlushAsync?中生成的機制——和整個?struct <FlushAsync>d__0。現在,這些代碼并不是很糟糕——它非常努力地嘗試在同步路徑上避免內存分配——但是沒有必要。
這里有兩種方法可以顯著地改善它;一個是壓根不去?await?,通常如果?await?是在方法中地最后一行并且我們不需要去處理結果:不去?await?——只要去除async然后return這個task——完成或者未完成。在這里我們沒辦法這樣做,因為我們需要去檢查返回的狀態,但是我們可以通過檢查這個task是否已經完成來對成功的結果進行優化(通過?.IsCompletedSuccessfully?——如果它已經結束但是有錯誤,我們仍然需要使用await來讓異常可以正確表現出來)。如果它是成功完成的,我們可以請求到.Result。所以我們也可以將FlushAsync?寫成這樣:
private static ValueTask<bool> Flush(PipeWriter writer){bool GetResult(FlushResult flush)// tell the calling code whether any more messages// should be written=> !(flush.IsCanceled || flush.IsCompleted);async ValueTask<bool> Awaited(ValueTask<FlushResult> incomplete)=> GetResult(await incomplete);// apply back-pressure etcvar flushTask = writer.FlushAsync();return flushTask.IsCompletedSuccessfully? new ValueTask<bool>(GetResult(flushTask.Result)): Awaited(flushTask);}這在大多數情況(同步完成)下完全避免了async/await?機制——如我們再次在?sharplab.io中看到的一樣。我要強調:如果代碼是經常(或僅僅)進行真正的異步行為時,這樣做是完全沒有必要的;它只對于那些結果通常(或僅僅)會同步地產生時才有幫助。
(譯注:對于ValueTask的"hot path"場景的使用,這里有個視頻講過一些,以及其它一些.NET中新的優化性能的方法:?Adam Sitnik - State of the .NET Performance)
那么Reader呢?
就像我們多次看到的一樣,reader總是稍微復雜一些——我們無從得知一個單獨的“讀”操作是否會準確包含一個入站消息,我們也許需要開啟循環直到我們獲取到了所有所需的數據,并且我們也許需要推回一些多余的數據。因此,讓我們假設我們想要消費某種單一的消息:
async ValueTask<SomeMessageType> GetNextMessage(PipeReader reader,CancellationToken cancellationToken = default){while (true){var read = await reader.ReadAsync(cancellationToken);if (read.IsCanceled) ThrowCanceled();// can we find a complete frame?var buffer = read.Buffer;if (TryParseFrame(buffer,out SomeMessageType nextMessage,out SequencePosition consumedTo)){reader.AdvanceTo(consumedTo);return nextMessage;}reader.AdvanceTo(buffer.Start, buffer.End);if (read.IsCompleted) ThrowEOF(); ? ? ? ?}}這里我們從pipe中獲取了一些數據,進行退出檢查(比如取消)。然后我們嘗試辨識一個消息,這是什么意思取決于你具體的代碼——它可以是:
從緩沖區中尋找某些特定的值,比如一個ASCII行尾,然后把所有到這里的數據當作一個消息(丟棄行尾)
解析一個定義良好的二進制幀頭,獲取其內容長度,通過檢查獲取這樣長度的數據然后處理
或者其它你需要的!
如果我們能夠辨識到一個消息,我們可以告訴pipe令其丟棄我們已經消費過的數據——通過?AdvanceTo(consumedTo),在這里使用我們自己的幀解析代碼告訴我們消費了多少。如果我們沒能辨識出一個消息,我們要做的第一件事就是告訴pipe我們什么也沒消費,盡管我們嘗試讀取了所有數據——通過?reader.AdvanceTo(buffer.Start, buffer.End)。在這里會有兩種可能:
我們還沒有獲得足夠的數據
pipe已經死亡,我們再也不會獲得足夠的數據
我們通過read.IsCompleted?檢查了這些,在第二種情況時報告錯誤;否則我們繼續循環,等待更多數據。那么剩下的,就是我們的幀解析——我們已經把復雜的IO管理降低成了簡單的操作;比如,如果我們的消息是以行標記分隔:
private static bool TryParseFrame(ReadOnlySequence<byte> buffer,out SomeMessageType nextMessage,out SequencePosition consumedTo){// find the end-of-line markervar eol = buffer.PositionOf((byte)'\n');if (eol == null){nextMessage = default;consumedTo = default;return false;}// read past the line-endingconsumedTo = buffer.GetPosition(1, eol.Value);// consume the datavar payload = buffer.Slice(0, eol.Value);nextMessage = ReadSomeMessageType(payload);return true;}這里PositionOf?嘗試獲取第一個行標記的位置。如果一個也找不到,我們就放棄,否則我們將consumedTo?設為”行標記+1“(即我們會消費行標記),然后我們分割我們的緩沖區來創建一個子集,表示不包括行標記的內容,這樣我們就可以解析了。最終,我們報告成功,并且慶祝我們可以簡單地解析Linux風格的行尾。
這里的重點是什么?
用這些和大多數最簡單最簡樸的Stream版本(沒有任何nice的特性)非常相似的最少量的代碼,我們的應用現在有了一個reader和writer,利用廣泛的能力確保高效和有效的處理。你可以用Stream來做所有的這些事,但是這樣真的、真的很難去做好做可靠。通過將所有的這些特性集成進框架,許多代碼都可以受益于這一單獨的實現。并且它也給了那些直接在pipeline API上開發并且對自定義pipeline端點和修飾感興趣的人更多的未來空間。
總結
在這節,我們研究了pipeline使用的內存模型和其如何幫助我們避免分配內存,然后我們研究了怎樣才可以將pipeline與現有的API和系統(如Stream)進行交互——并且我們介紹了?Pipelines.Sockets.Unofficial?這樣的可用的工具庫。我們研究了在不支持 span/memory 代碼的API上集成它們的可用選項,最終我們展示了和pipeline交互的真正的調用代碼是什么樣子的(并且簡單地介紹了如何優化那些通常是同步的async代碼)——展示了我們的應用代碼會是什么樣子。在最后一部分,我們將會研究如何在開發現實中的庫,比如StackExchange.Redis,將我們學到的這些知識點聯系起來——討論我們在代碼里需要解決哪些復雜點,而pipeline又如何將它們變得簡單。
相關文章:
System.IO.Pipelines: .NET高性能IO
Pipelines - .NET中的新IO API指引(一)
原文地址:?https://zhuanlan.zhihu.com/p/39969692
.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com
總結
以上是生活随笔為你收集整理的Pipelines - .NET中的新IO API指引(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用C#读写结构化的二进制文件
- 下一篇: 一个迄今为止最快的并发键值存储库FAST