如何利用.NETCore向Azure EventHubs准实时批量发送数据?
最近在做一個基于Azure云的物聯網分析項目:.netcore采集程序向Azure事件中心(EventHubs)發送數據,通過Azure EventHubs Capture轉儲到Azure BlogStorage,供數據科學團隊分析。
為什么使用Azure事件中心?
Azure事件中心是一種Azure上完全托管的實時數據攝取服務, 每秒可流式傳輸來自website、app、device任何源的數百萬個事件。提供的統一流式處理平臺和時間保留緩沖區,將事件生成者和事件使用者分開。
事件生成者:可使用https、AQMP協議發布事件
分區:事件中心通過分區使用者模式提供消息流式處理功能,提高可用性和并行化
事件接收者:所有事件中心使用者通過AMQP 1.0會話進行連接,讀取數據
例如,如果事件中心具有四個分區,并且其中一個分區要在負載均衡操作中從一臺服務器移動到另一臺服務器,則仍可以通過其他三個分區進行發送和接收。此外,具有更多分區可以讓更多并發讀取器處理數據,從而提高聚合吞吐量。了解分布式系統中分區和排序的意義是解決方案設計的重要方面。?為了幫助說明排序與可用性之間的權衡,請參閱 CAP 定理
最直觀的方式:請在portal.azure.cn門戶站點---->創建事件中心命名空間---> 創建事件中心
.NetCore 準實時批量發送數據到事件中心
.NET庫 (Azure.Messaging.EventHubs)
我們使用Asp.NetCore以Azure App Service形式部署,依賴Azure App Service的自動縮放能錄應對物聯網的潮汐大流量。
“通常推薦批量發送到事件中心,能有效增加web服務的吞吐量和響應能力。
目前新版SDk:Azure.Messaging.EventHubs僅支持分批發送。
nuget上引入Azure.Messaging.EventHubs庫
EventHubProducerClient客戶端負責分批發送數據到事件中心,根據發送時指定的選項,事件數據可能會自動路由到可用分區或發送到特定請求的分區。
在以下情況下,建議允許自動路由分區:
1) 事件的發送必須高度可用
2) 事件數據應在所有可用分區之間平均分配。
自動路由分區的規則:
1)使用循環法將事件平均分配到所有可用分區中
2)如果某個分區不可用,事件中心將自動檢測到該分區并將消息轉發到另一個可用分區。
我們要注意,根據選定的 命令空間定價層, 每批次發給事件中心的最大消息字節大小也不一樣:
分段批量發送策略
這里我們就需要思考:web程序收集數據是以個數為單位;但是我們分批發送時要根據分批的字節大小來切分。
我的方案是:因引入TPL Dataflow 管道:
web程序收到數據,立刻丟入TransformBlock<string, EventData>
轉換到EventData之后,使用BatchBlock<EventData>按照配置的個數打包
利用ActionBlock<EventData[]>在包內 累積指定字節大小批量發送
最后我們設置一個定時器(5min),強制在BatchBlock的前置隊列未滿時打包發送。
核心的TPL Dataflow代碼如下:
public?class?MsgBatchSender{private?readonly?EventHubProducerClient?Client;private?readonly?TransformBlock<string,?EventData>?_transformBlock;private?readonly?BatchBlock<EventData>?_packer;private?readonly?ActionBlock<EventData[]>?_batchSender;private?readonly?DataflowOption?_dataflowOption;private?readonly?Timer?_trigger;private?readonly?ILogger?_logger;public?MsgBatchSender(EventHubProducerClient?client,?IOptions<DataflowOption>?option,ILoggerFactory?loggerFactory){Client?=?client;_dataflowOption?=?option.Value;var?dfLinkoption?=?new?DataflowLinkOptions?{?PropagateCompletion?=?true?};_transformBlock?=?new?TransformBlock<string,?EventData>(text?=>?new?EventData(Encoding.UTF8.GetBytes(text)),new?ExecutionDataflowBlockOptions{MaxDegreeOfParallelism?=?_dataflowOption.MaxDegreeOfParallelism});_packer?=?new?BatchBlock<EventData>(_dataflowOption.BatchSize);_batchSender?=?new?ActionBlock<EventData[]>(msgs=>?BatchSendAsync(msgs));_packer.LinkTo(_batchSender,?dfLinkoption);_transformBlock.LinkTo(_packer,?dfLinkoption,?x?=>?x?!=?null);_trigger?=?new?Timer(_?=>?_packer.TriggerBatch(),?null,?TimeSpan.Zero,?TimeSpan.FromSeconds(_dataflowOption.TriggerInterval));_logger?=?loggerFactory.CreateLogger<DataTrackerMiddleware>();}private?async?Task?BatchSendAsync(EventData[]?msgs){try{if?(msgs?!=?null){var?i?=?0;while?(i?<?msgs.Length){var?batch?=?await?Client.CreateBatchAsync();while?(i?<?msgs.Length){if?(batch.TryAdd(msgs[i++])?==?false){break;}}if(batch!=?null?&&?batch.Count>0){await?Client.SendAsync(batch);batch.Dispose();}}}}catch?(Exception?ex){//?ignore?and?log?any?exception_logger.LogError(ex,?"SendEventsAsync:?{error}",?ex.Message);}}public??async?Task<bool>?PostMsgsync(string?txt){return?await?_transformBlock.SendAsync(txt);}public?async?Task?CompleteAsync(){_transformBlock.Complete();await?_transformBlock.Completion;await?_batchSender.Completion;await?_batchSender.Completion;}}總結
Azure事件中心的基礎用法
.NET Core準實時分批向Azure事件中心發送數據,其中用到的TPL Dataflow以actor模型:提供了粗粒度的數據流和流水線任務,提高了高并發程序的健壯性。?
源碼地址:https://github.com/zaozaoniao/SaicEnergyTracker
TPL Dataflow組件應對高并發,低延遲要求
ASP.NET Core跨平臺技術內幕
AspNetCore結合Redis實踐消息隊列
Quartz.net在集群環境下部署任務的姿勢
基于docker-compose的Gitlab CI/CD實踐&排坑指南
總結
以上是生活随笔為你收集整理的如何利用.NETCore向Azure EventHubs准实时批量发送数据?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 了解下C#由转换二进制所引起的思考
- 下一篇: Vue 3拖更,尤雨溪介绍最新进展