Dapr微服务应用开发系列5:发布订阅构建块
題記:這篇介紹發布訂閱構建塊,這是對事件驅動架構設計的一種實現落地。
注:對于“Building Blocks”這個詞組的翻譯,我之前使用了“構件塊”,現在和官方文檔(Dapr中文社區的貢獻)保持一致,采用“構建塊”。
原理
發布訂閱的概念來自于事件驅動架構(EDA)的設計思想,這是一種讓程序(應用、服務)之間解耦的主要方式,通過發布訂閱的思想也可以實現服務之間的異步調用。而大部分分布式應用都會依賴這樣的發布訂閱解耦模式。
整個發布訂閱的思想其實是比較簡單的:
如上圖所示,把需要解耦的程序分別設定為事件發布者或者事件訂閱者(理論上,對于某個事件,一個程序僅能作為一種角色;對于不同事件,一個程序可以既作為發布者又可以作為訂閱者)。同時利用消息代理(Message Broker)中間件把兩者對接起來,消息代理即作為事件消息的傳輸通道。
在Dapr中對這種發布訂閱模式進行了高度抽象的實現,并提供了自由替換消息代理中間件的特性,如下圖所示:
Dapr的發布訂閱構建塊也可以被看作一種事件總線(Event Bus)的實現,只是你不需要使用特殊的協議,在發布端和訂閱端僅使用HTTP/gRPC即可。
在事件總線中,把發布訂閱兩者關聯在一起的是事件類型,那么在Dapr中也引入了一個類似的概念——主題(Topic)。如果對消息隊列中間件熟悉的人對于這個概念不會陌生。由此發布端和訂閱端的處理過程和針對Dapr的接口也就是圍繞主題來展開的。
能力
消息發送
既然Dapr的PubSub是一種事件總線,那么要發送消息,必然需要對代表主題(事件類型)的消息進行封裝。Dapr并沒有去創造一種獨有的格式,而是采用了目前業界比較流行的開放協議——云事件(CloudEvents)規范。這種格式把事件消息封裝為如下JSON數據:
{"specversion" : "1.0","type" : "xml.message","source" : "https://example.com/message","subject" : "Test XML Message","id" : "id-1234-5678-9101","time" : "2020-09-23T06:23:21Z","datacontenttype" : "text/xml","data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>" }當然對消息的封裝不需要應用程序本身去關心,你只需要給Dapr傳遞data的字符串即可,而這個字符串本身是以什么格式(不管xml還是json)去承載內容都是由應用程序確定。具體如何發送消息,下面規范部分會介紹。
消息傳遞
Dapr會自動根據主題把消息發送給所有訂閱者,傳遞過程保證“至少一次”送達。送達的判斷標準是基于訂閱者的響應是否成功(即HTTP狀態碼為20X)。
當然,訂閱者也可以在響應體中設置 status 屬性來給出更為精細的處理指令,比如 RETRY ?告知Dapr之前處理失敗了,現在是重試成功了;或者 DROP 告知Dapr應用程序對這個消息處理出現問題,已經記錄了告警日志,但是不打算繼續處理它了。
消息傳遞還有一個重要的特性需要理解,就是消息的生存期(Time-to-Live,TTL)。TTL規定了消息在Dapr(實際上是在消息代理中間件)里面的存活時間,如果TTL過期,那么消息就不會再被傳遞(即變成死信)。所有目前支持的發布訂閱組件都支持TTL的特性,Dapr會幫助你處理這方面的邏輯。
消息消費
為了消費消息,需要對主題進行注冊,可以通過聲明式和編程式來進行注冊。聲明式通過外部的yaml文件定義一個K8S的CRD,來描述服務需要訂閱什么主題,接收事件的HTTP API路由地址。編程式通過暴露特定的HTTP API路由地址或者特定的gRPC方法來讓Dapr運行時進行訪問,從而注冊需要訂閱什么主題和接收事件的地址。
發布訂閱構建塊采用的是所謂競爭者消費模式,即同一個應用(AppId相同)的多個實例,只會有一個實例獲得消息,這些同個應用的多個實例稱之為一個消費組。如果你希望消息被多個應用得到,那么就需要使用多個消費組,也即多個AppId。
主題范圍限制
從上面所知,在發送消息和消費消息的時候,都需要針對某個主題。為了對消息的傳遞進行更加精細的控制,在發布訂閱構建塊中可以對主題范圍進行限制,即某些主題只能由某些應用來發布,某些主題只能由某些應用來訂閱。
要進行范圍限制,需要對發布訂閱組件的配置yaml進行配置,設置 spec.metadata 下面的 publishingScopes, subscriptionScopes 和 allowedTopics 配置。(詳細說明見未來的關于組件的文章)。
規范
Dapr給PubSub這一構建塊提供了兩方面的規范:消息生產端和消息消費端。
消息生產端
通過POST如下地址,來發送消息到特定主題:
POST http://localhost:<daprPort>/v1.0/publish/<pubsubname>/<topic>[?<metadata>]其中pubsubname代表了PubSub組件的名稱;topic代表了目標主題名稱。
在 Content-Type 請求頭中設置請求體的格式,比如 application/json
請求體根據請求頭里面的設置格式,傳入JSON或者XML,Dapr會自動把請求體封裝為CloudEvent格式。
如果是直接調用gRPC的接口的話,是調用 PublishEvent 接口并傳遞 PublishEventRequest 實體。
消息消費端
如果你的消費端是通過聲明式來向Dapr注冊需要訂閱什么主題的消息,那么在如下格式的yaml文件中進行定義:
apiVersion: dapr.io/v1alpha1 kind: Subscription metadata:name: myevent-subscription spec:topic: deathStarStatusroute: /dsstatuspubsubname: pubsub scopes: - app1 - app2其中 spec.topic 代表了要訂閱的主題名稱,spec.route 代表了接收訂閱消息的HTTP路由地址,spec.route 代表了針對的PubSub組件是那個。尤為關鍵是 scopes 里面設置了這樣的訂閱聲明是針對那個(或那幾個)應用起作用(填入appid)。
如果你的消費端是通過編程式來向Dapr注冊需要訂閱什么主題的消息,那么暴露一個如下特殊HTTP路由地址的接口:
GET http://localhost:<appPort>/dapr/subscribe并返回如下格式的響應體,讓Dapr知道你的應用需要訂閱什么主題,接收消息的接口路由地址是什么:
[{"pubsubname": "pubsub","topic": "newOrder","route": "/orders"} ]當然你的應用需要暴露注冊的接收路由接口,并支持POST謂詞,接口收到請求后返回2xx狀態碼告訴Dapr消息處理成功了,或者404告訴Dapr出現錯誤且消息已經丟棄,或者其他狀態碼讓Dapr進行重試。
兩種訂閱注冊方式各有優缺,聲明式方便一個主題注冊多個應用,編程式方便一個應用注冊多個主題。
注意:如果是使用gRPC來注冊和暴露消費接口,那么規范有所不同,做法見下面。
DOTNET SDK
Dapr的.NET SDK同樣針對消息生產端和消費端提供相關的函數庫。
在DaprClient這個客戶端類中提供了 PublishEventAsync 這個方法來用于發送事件消息到特定PubSub的特定主題上 (底層是請求了gRPC的接口)。比如:
using var client = new DaprClientBuilder().Build();var eventData = new { Id = "17", Amount = 10m, }; await client.PublishEventAsync(pubsubName, "deposit", eventData, cancellationToken);在消費端,目前針對ASP.NET Core提供了一個特殊的屬性標記 TopicAttribute 來簡化編程式訂閱注冊的過程。比如:
[Topic("pubsub", "deposit")] [HttpPost("deposit")] public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromServices] DaprClient daprClient)如果你是使用gRPC來實現消費端,那么目前并沒有一個簡化方式來注冊(未來我會補上這個坑),只能遵循如下規范:
首先用ListTopicSubscriptions注冊:
public override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(Empty request, ServerCallContext context) {var result = new ListTopicSubscriptionsResponse();result.Subscriptions.Add(new TopicSubscription{PubsubName = "pubsub",Topic = "deposit"});result.Subscriptions.Add(new TopicSubscription{PubsubName = "pubsub",Topic = "withdraw"});return Task.FromResult(result); }接著用OnTopicEvent接收:
public override async Task<TopicEventResponse> OnTopicEvent(TopicEventRequest request, ServerCallContext context) {if (request.PubsubName == "pubsub"){var input = JsonSerializer.Deserialize<Models.Transaction>(request.Data.ToStringUtf8(), this.jsonOptions);var transaction = new GrpcServiceSample.Generated.Transaction() { Id = input.Id, Amount = (int)input.Amount, };if (request.Topic == "deposit"){await Deposit(transaction, context);}else{await Withdraw(transaction, context);}}return await Task.FromResult(default(TopicEventResponse)); }具體見SDK的examples:https://github.com/heavenwing/dapr-dotnet-sdk/blob/master/examples/AspNetCore/GrpcServiceSample/Services/BankingService.cs
用法與例子
使用SDK來進行事件消息的發布和訂閱,可以直接參考SDK的examples中的消費端例子 ControllerSample 和生產端例子 PublishSubscribe。
如果是非SDK的用法,可以參考我的quickstarts,消費端 PubSubConsumer和生產端 PubSubProducer。
我的quickstarts的消費端同時使用聲明式和編程式兩種注冊方式。大致代碼如下:
[Route("dapr/subscribe")] [ApiController] public class DaprSubscribeController : ControllerBase {public ActionResult<DaprSubscribeOutput[]> Get(){return Ok(new DaprSubscribeOutput[]{new DaprSubscribeOutput{PubSubName="pubsub",Topic="quickstarts/wakeup",Route="/api/wakeup"}});} }public async Task<IActionResult> PostAsync(TinyCloudEvent<MessageInput> model) {_logger.LogInformation(model.Data.Name);return Ok(); }using var httpClient = new HttpClient(); httpClient.BaseAddress = new Uri(pubsubUrl);Console.WriteLine($"To {topicName1} ..."); var request1 = new HttpRequestMessage(HttpMethod.Post, topicName1); request1.Content = new StringContent(JsonSerializer.Serialize(new { name = "Jack" }), Encoding.UTF8, "application/json"); await httpClient.SendAsync(request1); apiVersion: dapr.io/v1alpha1 kind: Subscription metadata:name: quickstarts-subscription spec:topic: quickstarts/sleeproute: /api/sleeppubsubname: pubsub scopes: - quickstarts-pbc相關文章:
Dapr能否引領云原生中間件的未來?
云原生 | 阿里巴巴的Dapr實踐與探索
Dapr 可視化指南
Dapr 知多少 | 分布式應用運行時
Dapr 正式發布 1.0
Dapr 交通流量控制示例
Dapr是如何簡化微服務的開發和部署
微軟開源微服務運行時Dapr,賦能云原生應用開發
Dapr微服務應用開發系列0:概述
Dapr微服務應用開發系列1:環境配置
Dapr微服務應用開發系列2:Hello World與SDK初接觸
Dapr微服務應用開發系列3:服務調用構件塊
Dapr微服務應用開發系列4:狀態管理構件塊
Dapr微服務應用開發系列5:發布訂閱構建塊
Windows環境下Dapr入門
總結
以上是生活随笔為你收集整理的Dapr微服务应用开发系列5:发布订阅构建块的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: NET问答: 如何从 string 中挖
- 下一篇: 网关Ocelot功能演示安排的明明白白~