TPL Dataflow组件应对高并发,低延迟要求
長話短說
2C互聯(lián)網(wǎng)業(yè)務(wù)增長,單機多核的共享內(nèi)存模式帶來的排障問題、編程困難;隨著多核時代和分布式系統(tǒng)的到來,共享模型已經(jīng)不太適合并發(fā)編程,因此actor-based模型又重新受到了人們的重視。
---------------------------調(diào)試過多線程的都懂-----------------------------
傳統(tǒng)編程模型通常使用回調(diào)和同步對象(如鎖)來協(xié)調(diào)任務(wù)和訪問共享數(shù)據(jù),從宏觀看:若任務(wù)的執(zhí)行需要某些共享資源,不可避免該任務(wù)需要關(guān)注并搶占資源。
actor-based模型是一種流水線模型,actor-based模型share nothing。所有的線程(或進程)通過消息傳遞的方式進行合作,這些線程(或進程)稱為參與者actor,預(yù)先定義任務(wù)流水線后,不關(guān)注數(shù)據(jù)什么時候流到這個任務(wù),專注完成當前任務(wù)本身。
??.Net TPL Dataflow組件幫助我們快速實現(xiàn)actor-based模型,當有多個必須異步通信的操作或要等待數(shù)據(jù)可用再進一步處理時,Dataflow組件非常有用。
TPL Dataflow是微軟前幾年給出的數(shù)據(jù)處理庫, 內(nèi)置常見的處理塊,可將這些塊組裝成一個處理管道,"塊"對應(yīng)處理管道中的"階段任務(wù)",可類比AspNetCore 中Middleware和Pipeline。
TPL Dataflow庫為消息傳遞、CPU密集型/I-O密集型應(yīng)用程序提供了編程基礎(chǔ), 可更明確控制數(shù)據(jù)的暫存方式、移動路線,達到高吞吐量和低延遲。
需要注意的是:TPL Dataflow非分布式數(shù)據(jù)流,消息在進程內(nèi)傳遞?。
TPL Dataflow核心概念
TPL Dataflow 內(nèi)置的Block覆蓋了常見的應(yīng)用場景,如果內(nèi)置塊不能滿足你的要求,你也可以自定“塊”。
Block可以劃分為下面3類:
Buffering Only? ?[Buffer不是緩存Cache的概念,而是一個暫存區(qū)的概念]
Execution
Grouping?
使用以上塊混搭處理管道, 大多數(shù)的塊都會執(zhí)行一個操作,有些時候需要將消息分發(fā)到不同Block,這時可使用特殊類型的緩沖塊給管道“”分叉”。
Execution Block
可執(zhí)行的塊有兩個核心組件:
輸入、輸出消息的暫存區(qū)(一般稱為Input,Output隊列)
在消息上執(zhí)行動作的委托
消息在輸入和輸出時能夠被暫存:
當輸入的消息速度比Func委托的執(zhí)行速度比快,后續(xù)消息將在到達時暫存;
當下一個塊的輸入暫存區(qū)中無可用空間,將在當前塊輸出時暫存。
每個塊我們可以配置:
暫存區(qū)的總?cè)萘?#xff0c;默認無上限
執(zhí)行操作委托的并發(fā)度,默認情況下塊按照順序處理消息,一次一個。
將塊鏈接在一起形成處理管道,生產(chǎn)者將消息推向管道。
TPL Dataflow有一個基于pull的機制(使用Receive和TryReceive方法),但我們將在管道中使用塊連接和推送機制。
TransformBlock(Execution category)-- 由輸入輸出暫存區(qū)和一個Func<TInput,?TOutput>委托組成,輸入的每個消息,都會輸為出另一個,可以使用這個Block去執(zhí)行消息的轉(zhuǎn)換,或者轉(zhuǎn)發(fā)輸出的消息到另外一個Block
TransformManyBlock (Execution category) -- 由輸入輸出暫存區(qū)和一個Func<TInput, IEnumerable<TOutput>>委托組成, 它為輸入的每個消息輸出一個IEnumerable<TOutput>
BroadcastBlock (Buffering category)-- 只容納最多1個消息的暫存區(qū)和Func<T, T>委托組成(新消息到達會覆蓋原消息),委托僅僅為了讓你控制怎樣克隆這個消息,不做消息轉(zhuǎn)換
該塊在需要將消息廣播給多個塊時很有用(管道分叉)
ActionBlock (Execution category)-- 由緩沖區(qū)和Action<T>委托組成,它們不再給其他塊轉(zhuǎn)發(fā)消息,只處理輸入的消息,一般作為管道結(jié)尾
BatchBlock (Grouping category)-- 告訴它你想要的每個批處理的大小,它將累積消息,直到它達到那個大小,然后將它作為一組消息轉(zhuǎn)發(fā)到下一個塊
其他內(nèi)建Block類型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,暫時不會深入。
管道連鎖反應(yīng)
當B塊輸入緩沖區(qū)達到上限容量,為其供貨的上游A塊的輸出暫存區(qū)將開始被填充,當A塊輸出暫存區(qū)已滿時,該塊必須暫停處理,直到暫存區(qū)有空間,這意味著一個Block的處理瓶頸可能導(dǎo)致所有前面的塊的暫存區(qū)被填滿。
????但是不是所有的塊暫存區(qū)滿時都會暫停,BroadcastBlock有1個消息的暫存區(qū),每個消息都會被覆蓋, 因此如果這個廣播塊不能及時將消息轉(zhuǎn)發(fā)到下游,則在下個消息到達的時候消息將丟失,某種意義上達到一種限流效果(比較殘暴).
編程實踐
生產(chǎn)者投遞消息
?可使用Post或者SendAsync方法向首塊投遞消息:
Post方法即時返回true/false,True意味著消息被block接收(暫存區(qū)有空余),false意味著拒絕了消息(暫存區(qū)已滿或者Block已經(jīng)出錯)。
SendAsync方法返回一個Task<bool>, 將會以異步的方式阻塞直到塊接收、拒絕、塊出錯。
Post、SendAsync的不同點在于SendAsync可以延遲投遞(后置管道的輸入buffer不空,得到異步通知后再投遞)。
定義流水線管道
按照上圖業(yè)務(wù)定義流水線:
public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory){_httpClient = httpClientFactory.CreateClient("bce-request");_redisDB0 = redisCache[0];_redisDB = redisCache;_logger = loggerFactory.CreateLogger(nameof(EqidPairHandler));var option = new DataflowLinkOptions { PropagateCompletion = true };publisher = _redisDB.RedisConnection.GetSubscriber();_eqid2ModelTransformBlock = new TransformBlock<EqidPair, EqidModel>(// redis piublih 沒有做在TransformBlock fun里面, 因為publih失敗可能影響后續(xù)的block傳遞eqidPair => EqidResolverAsync(eqidPair),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = con.GetValue<int>("MaxDegreeOfParallelism")});// https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline_logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory);_logPublishBlock = new ActionBlock<EqidModel>(x => PublishAsync(x) );_broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容納一個消息的緩存區(qū)和拷貝函數(shù)組成_broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);_broadcastBlock.LinkTo(_logPublishBlock, option);_eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option);}?仿IIS日志寫入組件
異常處理
上述程序在生產(chǎn)部署時遇到相關(guān)的坑位:
在測試環(huán)境_eqid2ModelTransformBlock塊委托函數(shù)穩(wěn)定執(zhí)行,程序并未出現(xiàn)異樣;?
部署到生產(chǎn)之后,該Pipeline運行一段時間就停止工作,一直很困惑。
后來通過監(jiān)測_eqid2ModelTransformBlock.Completion屬性,發(fā)現(xiàn)該塊在執(zhí)行某次委托時報錯,提前進入完成態(tài)。
當TPL Dataflow不再處理消息且保證不再處理消息的時候,就被定義為 "完成態(tài)",?IDataflow.Completion屬性(Task對象)標記該狀態(tài),Task對象的TaskStatus枚舉值描述此Block進入完成態(tài)的真實原因。
?TaskStatus.RanToCompletion????"成功完成" 在Block中定義的任務(wù)??
?TaskStatus.Fault????因未處理的異常導(dǎo)致"過早的完成"
?TaskStatus.Canceled????因取消操作導(dǎo)致 "過早的完成"
官方資料表明:某塊進入Fault、Cancel狀態(tài),都會導(dǎo)致該塊提前進入“完成態(tài)”,但因Fault、Canceled進入的“完成態(tài)”會導(dǎo)致輸入暫存區(qū)和輸出暫存區(qū)被清空。
After Fault has been called on a dataflow block, that block will complete, and its Completion task will enter a final state. Faulting a block, as with canceling a block, causes buffered messages (unprocessed input messages as well as unoffered output messages) to be lost.
故需要嚴肅對待異常,一般情況下我們使用try、catch包含所有的執(zhí)行代碼以確保所有的異常都被處理。
????本文作為TPL Dataflow的入門指南(代碼較多建議左下角轉(zhuǎn)向原文)
微軟技術(shù)棧的可持續(xù)關(guān)注actor-based模型的流水線處理組件,應(yīng)對單體程序中高并發(fā),低延遲相當巴適。
+?https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.broadcastblock-1?view=netcore-3.1
+?https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.idataflowblock.fault?redirectedfrom=MSDN&view=netcore-2.2#System_Threading_Tasks_Dataflow_IDataflowBlock_Fault_System_Exception_
文字+制圖,均為原創(chuàng),
掃一掃左邊二維碼,
讓干貨飛一會。
............
歷史推薦
AspNetCore應(yīng)用注意這一點,CTO會對你刮目相看
手撕公司SSO登錄原理
.Net線程同步技術(shù)解讀
總結(jié)
以上是生活随笔為你收集整理的TPL Dataflow组件应对高并发,低延迟要求的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: dapr微服务.net sdk入门
- 下一篇: abp模块生命周期设计思路剖析