Dapr + .NET 实战(四)发布和订阅
什么是發布-訂閱
發布訂閱是一種眾所周知并被廣泛使用的消息傳送模式,常用在微服務架構的服務間通信,高并發削峰等情況。但是不同的消息中間件之間存在細微的差異,項目使用不同的產品需要實現不同的實現類,雖然是明智的決策,但必須編寫和維護抽象及其基礎實現。此方法需要復雜、重復且容易出錯的自定義代碼。
Dapr為了解決這種問題,提供開箱即用的消息傳送抽象和實現,封裝在 Dapr 構建基塊中。業務系統只需調用跟據Dapr的要求實現訂閱發布即可。
工作原理
Dapr 發布&訂閱構建基塊提供了一個與平臺無關的 API 框架來發送和接收消息。
服務將消息發布到指定主題, 業務服務訂閱主題以使用消息。
服務在 Dapr sidecar 上調用 pub/sub API。然后,sidecar 調用預定義 Dapr pub/sub 組件。
任何編程平臺都可以使用 Dapr 本機 API 通過 HTTP 或 gRPC 調用構建基塊。若要發布消息,請進行以下 API 調用:
http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>上述調用中有幾個特定于 Dapr 的 URL 段:
<dapr-port>?提供 Dapr sidecar 偵聽的端口號。
<pub-sub-name>?提供所選 Dapr pub/sub 組件的名稱。
<topic>?提供消息發布到的主題的名稱。
設置發布訂閱組件
Dapr 為 Pub/Sub 提供很多支持的組件,例如 Redis 和 Kafka 等。支持組件詳見?鏈接
在win10上的自承載的Dapr中,默認在?%UserProfile%\.dapr\components\pubsub.yaml 中使用redis作為了pub/sub組件,dapr run一個app時,使用默認組件作為pub/sub組件
apiVersion: dapr.io/v1alpha1 kind: Component metadata:name: pubsub spec:type: pubsub.redisversion: v1metadata:- name: redisHostvalue: localhost:6379- name: redisPasswordvalue: ""訂閱主題
Dapr 允許兩種方法訂閱主題:
聲明式,其中定義在外部文件中。
編程方式,訂閱在用戶代碼中定義
1.聲明式訂閱
在默認組件目錄 %UserProfile%\.dapr\components\pubsub.yaml 中新建subscription.yaml文件,并寫入以下內容
apiVersion: dapr.io/v1alpha1 kind: Subscription metadata:name: myevent-subscription spec:topic: test_topicroute: /TestPubSubpubsubname: pubsub scopes: - frontend上面的示例顯示了?test_topic主題的事件訂閱,使用組件?pubsub。
route?告訴 Dapr 將所有主題消息發送到應用程序中的?/TestPubSub?端點。
scopes?為?FrontEnd啟用訂閱
現在需要在FrontEnd項目中定義接口TestSub,在FrontEnd中新建TestPubSubController
using Dapr.Client;using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging;using System.IO; using System.Text; using System.Threading.Tasks;namespace FrontEnd.Controllers {[Route("[controller]")][ApiController]public class TestPubSubController : ControllerBase{private readonly ILogger<TestPubSubController> _logger;private readonly DaprClient _daprClient;public TestPubSubController(ILogger<TestPubSubController> logger, DaprClient daprClient){_logger = logger;_daprClient = daprClient;}[HttpPost]public ActionResult Post(){Stream stream = Request.Body;byte[] buffer = new byte[Request.ContentLength.Value];stream.Position = 0L;stream.ReadAsync(buffer, 0, buffer.Length);string content = Encoding.UTF8.GetString(buffer);return Ok(content);}[HttpGet("pub")]public async Task<ActionResult> PubAsync(){var data = new WeatherForecast();await _daprClient.PublishEventAsync<WeatherForecast>("pubsub", "test_topic", data);return Ok();}} }需要在Startup的Configure中開啟重復讀取Body才能讀取到數據
app.Use((context, next) =>{context.Request.EnableBuffering();return next();});啟動FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll調用?pub API發布消息
查看訂閱情況,訂閱消息消費成功
?2.編程式訂閱
為了防止聲明式訂閱的影響,需要先把目錄<%UserProfile%\.dapr\components\pubsub.yaml>中subscription.yaml文件刪除
TestPubSubController新增Api Sub
[Topic("pubsub", "test_topic")][HttpPost("sub")]public async Task<ActionResult> Sub(){Stream stream = Request.Body;byte[] buffer = new byte[Request.ContentLength.Value];stream.Position = 0L;stream.ReadAsync(buffer, 0, buffer.Length);string content = Encoding.UTF8.GetString(buffer);_logger.LogInformation("testsub" + content);return Ok(content);}在Startup的Configure方法中新增中間件
public void Configure(IApplicationBuilder app, IWebHostEnvironment env) {// ...app.UseCloudEvents();app.UseEndpoints(endpoints =>{endpoints.MapSubscribeHandler();// ...}); }啟動FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll調用API發布消息
?查看訂閱情況,訂閱消息消費成功
?通過DapreCLI同樣可以發布消息
dapr publish --publish-app-id frontend --pubsub pubsub --topic test_topic --data '{"date":"0001-01-01T00:00:00","temperatureC":0,"temperatureF":32,"summary":null}'查看訂閱情況,訂閱消息消費成功
相關文章:
Dapr實戰(一) 基礎概念與環境搭建
Dapr + .NET Core實戰(二) 服務調用
Dapr + .NET Core實戰(三)狀態管理
總結
以上是生活随笔為你收集整理的Dapr + .NET 实战(四)发布和订阅的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【干货】单日10亿GMV的.NET5电商
- 下一篇: WPF实现截屏(仿微信)