TPL Dataflow .Net 数据流组件,了解一下?
回顧上文
作為單體程序,依賴的第三方服務(wù)雖不多,但是2C的程序還是有不少內(nèi)容可講; 作為一個(gè)常規(guī)互聯(lián)網(wǎng)系統(tǒng),無外乎就是接受請(qǐng)求、處理請(qǐng)求,輸出響應(yīng)。
由于業(yè)務(wù)漸漸增長(zhǎng),數(shù)據(jù)處理的過程會(huì)越來越復(fù)雜和冗長(zhǎng),【連貫高效的處理數(shù)據(jù)】 越來越被看重,? .Net 提供了TPL? Dataflow組件使我們更高效的實(shí)現(xiàn)基于數(shù)據(jù)流和 流水線操作的代碼。
? ? 下圖是單體程序中 數(shù)據(jù)處理的用例圖。
?
?程序中用到的TPL Dataflow 組件,Dataflow是微軟前幾年給出的數(shù)據(jù)處理庫,?是由不同的處理塊組成,可將這些塊組裝成一個(gè)處理管道,"塊"對(duì)應(yīng)處理管道中的"階段", 可類比AspNetCore 中Middleware 和pipeline.。
-
TPL Dataflow庫為消息傳遞和并行化CPU密集型和I / O密集型應(yīng)用程序提供了編程基礎(chǔ),這些應(yīng)用程序具有高吞吐量和低延遲。它還可以讓您明確控制數(shù)據(jù)的緩沖方式并在系統(tǒng)中移動(dòng)。
- 為了更好地理解數(shù)據(jù)流編程模型,請(qǐng)考慮從磁盤異步加載圖像并創(chuàng)建這些圖像的應(yīng)用程序。
-
? 傳統(tǒng)的編程模型通常使用回調(diào)和同步對(duì)象(如鎖)來協(xié)調(diào)任務(wù)和訪問共享數(shù)據(jù), 從宏觀看傳統(tǒng)模型: 任務(wù)是一步步緊接著完成的。
-
? 通過使用數(shù)據(jù)流編程模型,您可以創(chuàng)建在從磁盤讀取圖像時(shí)處理圖像的數(shù)據(jù)流對(duì)象。在數(shù)據(jù)流模型下,您可以聲明數(shù)據(jù)在可用時(shí)的處理方式以及數(shù)據(jù)之間的依賴關(guān)系。由于運(yùn)行時(shí)管理數(shù)據(jù)之間的依賴關(guān)系,因此通常可以避免同步訪問共享數(shù)據(jù)的要求。此外,由于運(yùn)行時(shí)調(diào)度基于數(shù)據(jù)的異步到達(dá)而工作,因此數(shù)據(jù)流可以通過有效地管理底層線程來提高響應(yīng)性和吞吐量。 ? 也就是說: 你定義的是任務(wù)內(nèi)容和任務(wù)之間的依賴,不關(guān)注數(shù)據(jù)什么時(shí)候流到這個(gè)任務(wù)?。
-
- ? ?需要注意的是:TPL Dataflow 非分布式數(shù)據(jù)流,消息在進(jìn)程內(nèi)傳遞,? ?使用nuget引用?System.Threading.Tasks.Dataflow 包。
TPL Dataflow 核心概念
?1.? Buffer & Block
TPL Dataflow 內(nèi)置的Block覆蓋了常見的應(yīng)用場(chǎng)景,當(dāng)然如果內(nèi)置塊不能滿足你的要求,你也可以自定“塊”。
Block可以劃分為下面3類:
-
Buffering Only? ? 【Buffer不是緩存Cache的概念, 而是一個(gè)緩沖區(qū)的概念】
-
Execution
-
Grouping?
使用以上塊混搭處理管道, 大多數(shù)的塊都會(huì)執(zhí)行一個(gè)操作,有些時(shí)候需要將消息分發(fā)到不同Block,這時(shí)可使用特殊類型的緩沖塊給管道“”分叉”。
2. Execution Block
可執(zhí)行的塊有兩個(gè)核心組件:-
輸入、輸出消息的緩沖區(qū)(一般稱為Input,Output隊(duì)列)
-
在消息上執(zhí)行動(dòng)作的委托
消息在輸入和輸出時(shí)能夠被緩沖:當(dāng)Func委托的運(yùn)行速度比輸入的消息速度慢時(shí),后續(xù)消息將在到達(dá)時(shí)進(jìn)行緩沖;當(dāng)下一個(gè)塊的輸入緩沖區(qū)中沒有容量時(shí),將在輸出時(shí)緩沖。
每個(gè)塊我們可以配置:
-
緩沖區(qū)的總?cè)萘?#xff0c; 默認(rèn)無上限
-
執(zhí)行操作委托的并發(fā)度, 默認(rèn)情況下塊按照順序處理消息,一次一個(gè)。
我們將塊鏈接在一起形成一個(gè)處理管道,生產(chǎn)者將消息推向管道。
TPL Dataflow有一個(gè)基于pull的機(jī)制(使用Receive和TryReceive方法),但我們將在管道中使用塊連接和推送機(jī)制。
-
TransformBlock(Execution category)-- 由輸入輸出緩沖區(qū)和一個(gè)Func<TInput, TOutput>委托組成,消費(fèi)的每個(gè)消息,都會(huì)輸出另外一個(gè),你可以使用這個(gè)Block去執(zhí)行輸入消息的轉(zhuǎn)換,或者轉(zhuǎn)發(fā)輸出的消息到另外一個(gè)Block。
-
TransformManyBlock (Execution category) -- 由輸入輸出緩沖區(qū)和一個(gè)Func<TInput, IEnumerable<TOutput>>委托組成, 它為輸入的每個(gè)消息輸出一個(gè) IEnumerable<TOutput>
-
BroadcastBlock (Buffering category)-- 由只容納1個(gè)消息的緩沖區(qū)和Func<T, T>委托組成。緩沖區(qū)被每個(gè)新傳入的消息所覆蓋,委托僅僅為了讓你控制怎樣克隆這個(gè)消息,不做消息轉(zhuǎn)換。
該塊可以鏈接到多個(gè)塊(管道的分叉),雖然它一次只緩沖一條消息,但它一定會(huì)在該消息被覆蓋之前將該消息轉(zhuǎn)發(fā)到鏈接塊(鏈接塊還有緩沖區(qū))。
-
ActionBlock (Execution category)-- 由緩沖區(qū)和Action<T>委托組成,他們一般是管道的結(jié)尾,他們不再給其他塊轉(zhuǎn)發(fā)消息,他們只會(huì)處理輸入的消息。
-
BatchBlock (Grouping category)-- 告訴它你想要的每個(gè)批處理的大小,它將累積消息,直到它達(dá)到那個(gè)大小,然后將它作為一組消息轉(zhuǎn)發(fā)到下一個(gè)塊。
還有一下其他的Block類型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,我們暫時(shí)不會(huì)深入。
3. Pipeline Chain React
當(dāng)輸入緩沖區(qū)達(dá)到上限容量,為其供貨的上游塊的輸出緩沖區(qū)將開始填充,當(dāng)輸出緩沖區(qū)已滿時(shí),該塊必須暫停處理,直到緩沖區(qū)有空間,這意味著一個(gè)Block的處理瓶頸可能導(dǎo)致所有前面的塊的緩沖區(qū)被填滿。
但是不是所有的塊變滿時(shí),都會(huì)暫停,BroadcastBlock 有允許1個(gè)消息的緩沖區(qū),每個(gè)消息都會(huì)被覆蓋, 因此如果這個(gè)廣播塊不能將消息轉(zhuǎn)發(fā)到下游,則在下個(gè)消息到達(dá)的時(shí)候消息將丟失,這在某種意義上是一種限流(比較生硬).
編程實(shí)踐
? 將按照上圖實(shí)現(xiàn)TPL Dataflow?
①? 定義Dataflow? pipeline public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory){_httpClient = httpClientFactory.CreateClient("bce-request");_redisDB0 = redisCache[0];_redisDB = redisCache;_logger = loggerFactory.CreateLogger(nameof(EqidPairHandler));var option = new DataflowLinkOptions { PropagateCompletion = true };publisher = _redisDB.RedisConnection.GetSubscriber();_eqid2ModelTransformBlock = new TransformBlock<EqidPair, EqidModel>(// redis piublih 沒有做在TransformBlock fun里面, 因?yàn)閜ublih失敗可能影響后續(xù)的block傳遞eqidPair => EqidResolverAsync(eqidPair),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = con.GetValue<int>("MaxDegreeOfParallelism")});// https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline_logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory);_logPublishBlock = new ActionBlock<EqidModel>(x => PublishAsync(x) );_broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容納一個(gè)消息的緩存區(qū)和拷貝函數(shù)組成 _broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);_broadcastBlock.LinkTo(_logPublishBlock, option);_eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option);} public class LogBatchBlock<T> : ILogDestination<T> where T : IModelBase{private readonly string _dirPath;private readonly Timer _triggerBatchTimer;private readonly Timer _openFileTimer;private DateTime? _nextCheckpoint;private TextWriter _currentWriter;private readonly LogHead _logHead;private readonly object _syncRoot = new object();private readonly ILogger _logger;private readonly BatchBlock<T> _packer;private readonly ActionBlock<T[]> batchWriterBlock;private readonly TimeSpan _logFileIntervalTimeSpan;/// <summary>/// Generate request log file./// </summary>public LogBatchBlock(LogConfig logConfig, ILoggerFactory loggerFactory){_logger = loggerFactory.CreateLogger<LogBatchBlock<T>>();_dirPath = logConfig.DirPath;if (!Directory.Exists(_dirPath)){Directory.CreateDirectory(_dirPath);}_logHead = logConfig.LogHead;_packer = new BatchBlock<T>(logConfig.BatchSize);batchWriterBlock = new ActionBlock<T[]>(models => WriteToFile(models)); _packer.LinkTo(batchWriterBlock, new DataflowLinkOptions { PropagateCompletion = true });_triggerBatchTimer = new Timer(state =>{_packer.TriggerBatch();}, null, TimeSpan.Zero, TimeSpan.FromSeconds(logConfig.Period));_logFileIntervalTimeSpan = TimeSpan.Parse(logConfig.LogFileInterval);_openFileTimer = new Timer(state =>{AlignCurrentFileTo(DateTime.Now);}, null, TimeSpan.Zero, _logFileIntervalTimeSpan);}public ITargetBlock<T> InputBlock => _packer;private void AlignCurrentFileTo(DateTime dt){if (!_nextCheckpoint.HasValue){OpenFile(dt);}if (dt >= _nextCheckpoint.Value){CloseFile();OpenFile(dt);}}private void OpenFile(DateTime now, string fileSuffix = null){string filePath = null;try{var currentHour = now.Date.AddHours(now.Hour);_nextCheckpoint = currentHour.Add(_logFileIntervalTimeSpan);int hourConfiguration = _logFileIntervalTimeSpan.Hours;int minuteConfiguration = _logFileIntervalTimeSpan.Minutes;filePath = $"{_dirPath}/u_ex{now.ToString("yyMMddHH")}{fileSuffix}.log";var appendHead = !File.Exists(filePath);if (filePath != null){var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write);var sw = new StreamWriter(stream, Encoding.Default);if (appendHead){sw.Write(GenerateHead());}_currentWriter = sw;_logger.LogDebug($"{currentHour} TextWriter has been created.");}}catch (UnauthorizedAccessException ex){_logger.LogWarning("I/O error or specific type of scecurity error,{0}", ex);throw;}catch (Exception e){if (fileSuffix == null){_logger.LogWarning($"OpenFile failed:{e.StackTrace.ToString()}:{e.Message}.", e.StackTrace);OpenFile(now, $"-{Guid.NewGuid()}");}else{_logger.LogError($"OpenFile failed after retry: {filePath}", e);throw;}}}private void CloseFile(){if (_currentWriter != null){_currentWriter.Flush();_currentWriter.Dispose();_currentWriter = null;_logger.LogDebug($"{DateTime.Now} TextWriter has been disposed.");}_nextCheckpoint = null;}private string GenerateHead(){StringBuilder head = new StringBuilder();head.AppendLine("#Software: " + _logHead.Software).AppendLine("#Version: " + _logHead.Version).AppendLine($"#Date: {DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")}").AppendLine("#Fields: " + _logHead.Fields);return head.ToString();}private void WriteToFile(T[] models){try{lock (_syncRoot){var flag = false;foreach (var model in models){if (model == null)continue;flag = true;AlignCurrentFileTo(model.ServerLocalTime);_currentWriter.WriteLine(model.ToString());}if (flag)_currentWriter.Flush();}}catch (Exception ex){_logger.LogError("WriteToFile Error : {0}", ex.Message);}}public bool AcceptLogModel(T model){return _packer.Post(model);}public string GetDirPath(){return _dirPath;}public async Task CompleteAsync(){_triggerBatchTimer.Dispose();_openFileTimer.Dispose();_packer.TriggerBatch();_packer.Complete();await InputBlock.Completion;lock (_syncRoot){CloseFile();}}} 仿IIS日志存儲(chǔ)代碼② 異常處理
上述程序在部署時(shí)就遇到相關(guān)的坑位,在測(cè)試環(huán)境_eqid2ModelTransformBlock?內(nèi)Func委托穩(wěn)定執(zhí)行,程序并未出現(xiàn)異樣;
部署到生產(chǎn)之后, 該P(yáng)ipeline會(huì)運(yùn)行一段時(shí)間就停止工作,一直很困惑, 后來通過監(jiān)測(cè)_eqid2ModelTransformBlock.Completion 屬性,該塊提前進(jìn)入“完成態(tài)”? ?:???程序在執(zhí)行某次Func委托時(shí)報(bào)錯(cuò),Block提前進(jìn)入完成態(tài)
TransfomrBlock.Completion 一個(gè)Task對(duì)象,當(dāng)TPL Dataflow不再處理消息并且能保證不再處理消息的時(shí)候,就被定義為完成態(tài), Task對(duì)象的TaskStatus枚舉值將標(biāo)記此Block進(jìn)入完成態(tài)的真實(shí)原因
- TaskStatus.RanToCompletion? ? ? ?根據(jù)Block定義的任務(wù)成功完成
- TaskStatus.Fault? ? ? ? ? ? ? ? ? ? ? ? ? ? 因?yàn)?strong>未處理的異常?導(dǎo)致"過早的完成"
- TaskStatus.Cancled? ? ? ? ? ? ? ? ? ? ?? 因?yàn)?strong>取消操作?導(dǎo)致 "過早的完成"
我們需要小心處理異常, 一般情況下我們使用try、catch包含所有的執(zhí)行代碼以確保所有的異常都被處理。
?
??? 可將TPL Dataflow 做為進(jìn)程內(nèi)消息隊(duì)列,本文只是一個(gè)入門參考,更多復(fù)雜用法還是看官網(wǎng), 你需要記住的是, 這是一個(gè).Net 進(jìn)程內(nèi)數(shù)據(jù)流組件, 能讓你專注于流程。
?
作者:JulianHuang感謝您的認(rèn)真閱讀,如有問題請(qǐng)大膽斧正;覺得有用,請(qǐng)下方或加關(guān)注。
本文歡迎轉(zhuǎn)載,但請(qǐng)保留此段聲明,且在文章頁面明顯位置注明本文的作者及原文鏈接。
轉(zhuǎn)載于:https://www.cnblogs.com/JulianHuang/p/11177766.html
總結(jié)
以上是生活随笔為你收集整理的TPL Dataflow .Net 数据流组件,了解一下?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 国内一些大公司的开源项目
- 下一篇: Linux简单实用小技巧