手把手教你学Dapr - 6. 发布订阅
介紹
發布/訂閱模式允許微服務使用消息相互通信。生產者或發布者在不知道哪個應用程序將接收它們的情況下向主題發送消息。這涉及將它們寫入輸入通道。同樣,消費者或訂閱者訂閱該主題并接收其消息,而不知道是什么服務產生了這些消息。這涉及從輸出通道接收消息。中間消息代理負責將每條消息從輸入通道復制到所有對該消息感興趣的訂閱者的輸出通道。當您需要將微服務彼此分離時,這種模式特別有用。
Dapr 中的發布/訂閱 API 提供至少一次(at-least-once)的保證,并與各種消息代理和隊列系統集成。 您的服務所使用的特定實現是可插入的,并被配置為運行時的 Dapr Pub/Sub 組件。 這種方法消除了您服務的依賴性,從而使您的服務可以更便攜,更靈活地適應更改。
Dapr 發布/訂閱構建塊提供了一個與平臺無關的 API 來發送和接收消息。您的服務將消息發布到命名主題,并訂閱主題以使用這些消息。
下圖顯示了一個“shipping”服務和一個“email”服務的例子,它們都訂閱了“cart”服務發布的主題。每個服務都會加載指向同一發布/訂閱消息總線組件的發布/訂閱組件配置文件,例如 Redis Streams、NATS Streaming、Azure Service Bus 或 GCP Pub/Sub。
下圖具有相同的服務,但是這次顯示的是 Dapr 發布 API,它發送“訂單”主題和訂閱服務上的訂單端點,這些主題消息由 Dapr 發布到。
特性
Cloud Events消息格式
為了啟用消息路由并為每條消息提供額外的上下文,Dapr 使用 CloudEvents 1.0 規范作為其消息格式。應用程序使用 Dapr 發送到主題的任何消息都會自動“包裝”在 Cloud Events 信封中,使用 datacontenttype 屬性的 Content-Type 標頭值。
Dapr 實現了以下 Cloud Events 字段:
id
source
specversion
type
datacontenttype (Optional)
以下示例顯示了 CloudEvent v1.0 中序列化為 JSON 的 XML 內容:
{"specversion" : "1.0","type" : "xml.message","source" : "https://example.com/message","subject" : "Test XML Message","id" : "id-1234-5678-9101","time" : "2020-09-23T06:23:21Z","datacontenttype" : "text/xml","data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>" }消息訂閱
Dapr 應用程序可以訂閱已發布的主題。 Dapr 允許您的應用程序訂閱主題的兩種方法:
聲明式,其中訂閱在外部文件中定義
程序化,在用戶代碼中定義訂閱
消息傳遞
原則上,當訂閱者在處理消息后以非錯誤響應進行響應時,Dapr 認為消息已成功傳遞。為了進行更精細的控制,Dapr 的發布/訂閱 API 還提供了在響應負載中定義的顯式狀態,訂閱者可以使用這些狀態向 Dapr 指示特定的處理指令(例如 RETRY 或 DROP)。如果兩個不同的應用程序(不同的 app-ID)訂閱了同一個主題,Dapr 將每條消息只傳遞給每個應用程序的一個實例。
主題范圍
默認情況下,所有支持 Dapr 發布/訂閱組件(例如 Kafka、Redis Stream、RabbitMQ)的主題都可用于配置了該組件的每個應用程序。為了限制哪個應用程序可以發布或訂閱主題,Dapr 提供了主題范圍。這使您能夠允許應用程序發布哪些主題以及允許應用程序訂閱哪些主題。
消息生存時間(TTL)
Dapr 可以在每條消息的基礎上設置超時消息,這意味著如果沒有從 pub/sub 組件讀取消息,則該消息將被丟棄。這是為了防止堆積未讀的消息。在隊列中比配置的 TTL 時間長的消息稱為死消息。
注:也可以在組件創建時為給定隊列設置消息 TTL。
與不使用 Dapr 和 CloudEvents 的應用程序通信
對于一個應用程序使用 Dapr 而另一個應用程序不使用的場景,可以為發布者或訂閱者禁用 CloudEvent 包裝。這允許在不能一次全部采用 Dapr 的應用程序中部分采用 Dapr 發布訂閱。
使用.Net調用Dapr的發布訂閱
以下示例創建應用程序來發布和訂閱名為 deathStarStatus 的主題
先決條件
運行 dapr init 時,Redis Streams 默認安裝在本地機器上。如果是 dapr init --slim 需要自己動手操作一些東西了,這里就不演示了。
通過打開您的組件文件進行驗證 %UserProfile%\.dapr\components\pubsub.yaml
apiVersion: dapr.io/v1alpha1 kind: Component metadata:name: pubsub spec:type: pubsub.redisversion: v1metadata:- name: redisHostvalue: localhost:6379- name: redisPasswordvalue: ""注:這里redisHost的value可以根據實際情況設置,比如某云的redis實例等。又有人問過是否可以切換默認db,當然可以,name設置redisDB,value設置為你要使用的db即可
如果你要更詳細的yaml配置參數,比如并發設置、最大重試次數等等都可以看這里 https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-redis-pubsub/
訂閱主題
訂閱主題有兩種方式:聲明式、程序化
聲明式訂閱
您可以使用以下自定義資源定義 (CRD) 訂閱主題。創建一個名為 subscription.yaml 的文件并粘貼以下內容:
apiVersion: dapr.io/v1alpha1 kind: Subscription metadata:name: myevent-subscription spec:topic: deathStarStatusroute: /dsstatuspubsubname: pubsub scopes: - app1 - app2上面的示例顯示了對 pubsub 組件 pubsub 主題 deathStarStatus 的事件訂閱。
將CRD文件放到組件目錄即可,這里不繼續展開說了。
程序化訂閱
Dapr 實例在啟動時調用您的應用程序并期待主題訂閱的 JSON 響應:
pubsubname:Dapr 應該使用哪個 pub/sub 組件
topic:訂閱哪個主題
route:當消息涉及該主題時,Dapr 調用哪個endpoint
注:你可能會覺得,這是不是很麻煩?是的,所以我們用dapr dotnet-sdk來幫助我們自動完成這些事情
.Net訂閱
以上是讓dapr sidecar知道這個消息的訂閱最終給誰。但我們的程序里要怎么寫呢?
如果你選擇的是聲明式訂閱,你做一個route即可,而如果是程序化訂閱則不需要多寫一個yaml文件,且通過特性即可支持,接下來看看.Net SDK怎么做的吧。
創建Assignment.Server(Sub)
創建ASP.NET Core 空項目,同時根據之前的文章內容添加Dapr.AspNetCoreNuGet包和修改程序端口為5000
修改program.cs代碼
using Microsoft.AspNetCore.Mvc;var builder = WebApplication.CreateBuilder(args); var app = builder.Build();app.UseRouting(); app.UseCloudEvents(); app.UseEndpoints(endpoints => {endpoints.MapSubscribeHandler(); });app.MapPost("/dsstatus", ([FromBody] string word) => Console.WriteLine($"Hello {word}!")).WithTopic("pubsub", "deathStarStatus");app.Run();注:為了告訴 Dapr 消息已成功處理,請返回 200 OK 響應。如果 Dapr 收到除 200 之外的任何其他返回狀態代碼,或者如果您的應用程序崩潰,Dapr 將嘗試按照 At-Least-Once 語義重新傳遞消息。
運行Assignment.Server
使用Dapr CLI來啟動,先使用命令行工具跳轉到目錄 dapr-study-room\Assignment06\Assignment.Server,然后執行下面命令
dapr run --app-id testpubsub --app-port 5000 --dapr-http-port 3500 --dapr-grpc-port 50001 dotnet run創建Assignment.Client(Publish)
創建控制臺項目,并修改program.cs
var client = new Dapr.Client.DaprClientBuilder().Build(); await client.PublishEventAsync<string>("pubsub", "deathStarStatus", "World");運行Assignment.Client即可看到Assignment.Server中會打印Hello World!
將消息路由到不同的事件處理程序
基于內容的路由是一種使用 DSL 而不是命令式應用程序代碼的消息傳遞模式。PubSub 路由是此模式的一種實現,它允許開發人員使用表達式根據 CloudEvents 的內容將其路由到應用程序中的不同 URI/路徑和事件處理程序。
注:這是個預覽功能,如果你感興趣可自行嘗試,值得一提的是Common Expression Language (CEL)很有趣,這里就只貼一段代碼看看吧。
[Topic("pubsub", "inventory", "event.type ==\"widget\"", 1)][HttpPost("widgets")]public async Task<ActionResult<Stock>> HandleWidget(Widget widget, [FromServices] DaprClient daprClient){// Logicreturn stock;}[Topic("pubsub", "inventory", "event.type ==\"gadget\"", 2)][HttpPost("gadgets")]public async Task<ActionResult<Stock>> HandleGadget(Gadget gadget, [FromServices] DaprClient daprClient){// Logicreturn stock;}[Topic("pubsub", "inventory")][HttpPost("products")]public async Task<ActionResult<Stock>> HandleProduct(Product product, [FromServices] DaprClient daprClient){// Logicreturn stock;}發布/訂閱主題訪問權限
命名空間或組件范圍可用于限制對特定應用程序的組件訪問。添加到組件的這些應用程序范圍僅限制具有特定 ID 的應用程序能夠使用該組件。
除了這個通用組件范圍之外,發布/訂閱組件還可以限制以下內容:
可以使用哪些主題(已發布或已訂閱)
允許哪些應用程序發布到特定主題
允許哪些應用程序訂閱特定主題
主題訪問權限
apiVersion: dapr.io/v1alpha1 kind: Component metadata:name: pubsubnamespace: default spec:type: pubsub.redisversion: v1metadata:- name: redisHostvalue: "localhost:6379"- name: redisPasswordvalue: ""- name: publishingScopesvalue: "app1=topic1;app2=topic2,topic3;app3="- name: subscriptionScopesvalue: "app2=;app3=topic1"下表顯示了允許哪些應用程序發布到主題中:
| app1 | X | ||
| app2 | X | X | |
| app3 |
下表顯示了允許哪些應用程序訂閱主題:
| app1 | X | X | X |
| app2 | |||
| app3 | X |
注:如果應用程序未列出(例如 subscriptionScopes 中的 app1),則允許訂閱所有主題。由于未使用 allowedTopics 且 app1 沒有任何訂閱范圍,因此它還可以使用上面未列出的其他主題。
限制允許的主題
如果 Dapr 應用程序向其發送消息,則會創建一個主題。在某些情況下,應該控制這個主題的創建。例如:
在Dapr應用程序中,在生成主題名稱時出現的錯誤可能導致創建無限數量的主題 ?
精簡主題名稱和總數,防止主題無限增長
結合 allowedTopics 和 scopes
有時您希望組合這兩個作用域,因此只允許一組固定的主題,并將作用域指定給特定的應用程序。 ?
apiVersion: dapr.io/v1alpha1 kind: Component metadata:name: pubsubnamespace: default spec:type: pubsub.redisversion: v1metadata:- name: redisHostvalue: "localhost:6379"- name: redisPasswordvalue: ""- name: allowedTopicsvalue: "A,B"- name: publishingScopesvalue: "app1=A"- name: subscriptionScopesvalue: "app1=;app2=A"注:第三個應用程序沒有列出,因為如果一個應用程序沒有在范圍內指定,它是允許使用所有主題的。
下表顯示了允許發布到主題的應用程序:
| app1 | X | ||
| app2 | X | X | |
| app3 | X | X |
下表顯示了哪個應用程序可以訂閱主題: ?
| app1 | |||
| app2 | X | ||
| app3 | X | X |
消息TTL
同狀態管理,使用 metadata 的 ttlInSeconds
本章源碼
Assignment06
https://github.com/doddgu/dapr-study-room
我們正在行動,新的框架、新的生態
我們的目標是自由的、易用的、可塑性強的、功能豐富的、健壯的。
所以我們借鑒Building blocks的設計理念,正在做一個新的框架MASA Framework,它有哪些特點呢?
原生支持Dapr,且允許將Dapr替換成傳統通信方式
架構不限,單體應用、SOA、微服務都支持
支持.Net原生框架,降低學習負擔,除特定領域必須引入的概念,堅持不造新輪子
豐富的生態支持,除了框架以外還有組件庫、權限中心、配置中心、故障排查中心、報警中心等一系列產品
核心代碼庫的單元測試覆蓋率90%+
開源、免費、社區驅動
還有什么?我們在等你,一起來討論
經過幾個月的生產項目實踐,已完成POC,目前正在把之前的積累重構到新的開源項目中
目前源碼已開始同步到Github(文檔站點在規劃中,會慢慢完善起來):
MASA.BuildingBlocks
MASA.Contrib
MASA.Utils
MASA.EShop
BlazorComponent
MASA.Blazor
QQ群:7424099
微信群:加技術運營微信(MasaStackTechOps),備注來意,邀請進群
總結
以上是生活随笔為你收集整理的手把手教你学Dapr - 6. 发布订阅的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何高效的将 DataReader 转成
- 下一篇: WPF 透明窗口在桌面上放虫子。。。