学Dapr Actors 看这篇就够了
介紹
Actor模式將Actor描述為最低級別的“計算單元”。換句話說,您在一個獨立的單元(稱為actor)中編寫代碼,該單元接收消息并一次處理一個消息,沒有任何并發或線程。
再換句話說,根據ActorId劃分獨立計算單元后,相同的ActorId重入要排隊,可以理解為lock(ActorId)
注:這里有個反例,就是重入性的引入,這個概念目前還是Preview,它允許同一個鏈內可以重復進入,判斷的標準不止是ActorId這么簡單,即自己調自己是被允許的。這個默認是關閉的,需要手動開啟,即默認不允許自己調自己。
當您的代碼處理一條消息時,它可以向其他參與者發送一條或多條消息,或者創建新的參與者。底層運行時管理每個參與者運行的方式、時間和地點,并在參與者之間路由消息。
大量的Actor可以同時執行,Actor彼此獨立執行。
Dapr 包含一個運行時,它專門實現了 Virtual Actor 模式。 通過 Dapr 的實現,您可以根據 Actor 模型編寫 Dapr Actor,而 Dapr 利用底層平臺提供的可擴展性和可靠性保證。 ?
什么時候用Actors
Actor 設計模式非常適合許多分布式系統問題和場景,但您首先應該考慮的是該模式的約束。一般來說,如果出現以下情況,請考慮使用Actors模式來為您的問題或場景建模:
您的問題空間涉及大量(數千個或更多)小的、獨立且孤立的狀態和邏輯單元。
您希望使用不需要與外部組件進行大量交互的單線程對象,包括跨一組Actors查詢狀態。
您的 Actor 實例不會通過發出 I/O 操作來阻塞具有不可預測延遲的調用者。
Dapr Actor
每個Actor都被定義為Actor類型的實例,就像對象是類的實例一樣。 ?例如,可能有一個執行計算器功能的Actor類型,并且可能有許多該類型的Actor分布在集群的各個節點上。每個這樣的Actor都由一個Acotr ID唯一標識。 ?
生命周期
Dapr Actors是虛擬的,這意味著他們的生命周期與他們的內存表現無關。因此,它們不需要顯式創建或銷毀。Dapr Actors運行時在第一次收到對該Actor ID 的請求時會自動激活該Actor。如果一個Actor在一段時間內沒有被使用,Dapr Actors運行時就會對內存中的對象進行垃圾回收。如果稍后需要重新激活,它還將保持對參與者存在的了解。如果稍后需要重新激活,它還將保持對 Actor 的一切原有數據。
調用 Actor 方法和提醒會重置空閑時間,例如提醒觸發將使Actor保持活躍。無論Actor是活躍還是不活躍,Actor提醒都會觸發,如果為不活躍的Actor觸發,它將首先激活演員。Actor 計時器不會重置空閑時間,因此計時器觸發不會使 Actor 保持活動狀態。計時器僅在Actor處于活動狀態時觸發。
Reminders 和 Timers 最大的區別就是Reminders會保持Actor的活動狀態,而Timers不好會
Dapr 運行時用來查看Actor是否可以被垃圾回收的空閑超時和掃描間隔是可配置的。當 Dapr 運行時調用 Actor 服務以獲取支持的 Actor 類型時,可以傳遞此信息。
由于Virtual Actor模型的存在,這種Virtual Actor生命周期抽象帶來了一些注意事項,事實上,Dapr Actors實現有時會偏離這個模型。 ?
第一次將消息發送到Actor ID時,Actor被自動激活(導致構建Actor對象)。 經過一段時間后,Actor對象將被垃圾回收。被回收后再次使用Actor ID將導致構造一個新的Actor對象。 Actor 的狀態比對象的生命周期長,因為狀態存儲在 Dapr 運行時配置的狀態管理組件中。
注:Actor被垃圾回收之前,Actor對象是會復用的。這里會導致一個問題,在.Net Actor類中,構造函數在Actor存活期間只會被調用一次。
分發和故障轉移
為了提供可擴展性和可靠性,Actor 實例分布在整個集群中,Dapr 根據需要自動將它們從故障節點遷移到健康節點。
Actors 分布在 Actor 服務的實例中,而這些實例分布在集群中的節點之間。 對于給定的Actor類型,每個服務實例都包含一組Actor。
Dapr安置服務(Placement Service)
Dapr Actor 運行時為您管理分發方案和密鑰范圍設置。這是由Actor Placement 服務完成的。創建服務的新實例時,相應的 Dapr 運行時會注冊它可以創建的Actor類型,并且安置服務會計算給定Actor類型的所有實例的分區。每個Actor類型的分區信息表被更新并存儲在環境中運行的每個Dapr實例中,并且可以隨著Actor服務的新實例的創建和銷毀而動態變化。這如下圖所示:
當客戶端調用具有特定ID的Actor(例如,Actor ID 123)時,客戶端的 Dapr 實例會Hash Actor類型和 ID,并使用該信息調用可以為特定Actor ID的請求提供服務的相應Dapr實例。因此,始終為任何給定的Actor ID 調用相同的分區(或服務實例)。這如下圖所示:
這簡化了一些選擇,但也帶來了一些考慮: ?
默認情況下,Actor 隨機放置到 pod 中,從而實現均勻分布。
因為Actor是隨機放置的,應該可以預料到Actor操作總是需要網絡通信,包括方法調用數據的序列化和反序列化,產生延遲和開銷。
注:Dapr Actor 放置服務僅用于 Actor 放置,因此如果您的服務不使用 Dapr Actors,則不需要。 放置服務可以在所有托管環境中運行,包括自托管和 Kubernetes。
Actor通訊
您可以通過HTTP/gRPC調用Actor,當然也可以用SDK。
POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/<method/state/timers/reminders>并發
Dapr Actor 運行時為訪問 Actor 方法提供了一個簡單的回合制(turn-basesd)的訪問模型。這意味著在任何時候,Actor 對象的代碼中都不能有超過一個線程處于活動狀態。
單個Actor實例一次不能處理多個請求。如果預期要處理并發請求,Actor 實例可能會導致吞吐量瓶頸。
單個Actor實例指每個Actor ID對應的Actor對象。單個Actor不并發就沒有問題
如果兩個 Actor 之間存在循環請求,而同時向其中一個 Actor 發出外部請求,則 Actor 之間可能會陷入僵局。Dapr Actor運行時自動超時Actor調用并向調用者拋出異常以中斷可能的死鎖情況。
重入性(Preview)
作為對 dapr 中基礎 Actor 的增強。現在重入性為預覽功能,感興趣的小伙伴可以到看官方文檔。
回合制訪問(Turn-based access)
一個回合包括一個Actor方法的完整執行以響應來自其他Actor或客戶端的請求,或者一個計時器/提醒回調的完整執行。即使這些方法和回調是異步的,Dapr Actor運行時也不會將它們交叉。一個回合必須完全完成后,才允許進行新的回合。換句話說,當前正在執行的Actor方法或計時器/提醒回調必須完全完成,才能允許對方法或回調的新調用。
Dapr Actor運行時通過在回合開始時獲取每個Actor的鎖并在回合結束時釋放鎖來實現基于回合的并發性。 因此,基于回合的并發是在每個Actor的基礎上執行的,而不是跨Actor。Actor 方法和計時器/提醒回調可以代表不同的 Actor 同時執行。
以下示例說明了上述概念。考慮實現兩個異步方法(例如 Method1 和 Method2)、計時器和提醒的Actor 類型。下圖顯示了代表屬于此Actor類型的兩個Actors(ActorId1 和 ActorId2)執行這些方法和回調的時間線示例。
Actor狀態管理
Actor可以使用狀態管理功能可靠地保存狀態。您可以通過 HTTP/gRPC 端點與 Dapr 交互以進行狀態管理。
要使用 actor,您的狀態存儲必須支持事務。這意味著您的狀態存儲組件必須實現 TransactionalStore 接口。只有一個狀態存儲組件可以用作所有參與者的狀態存儲。
事務支持列表:https://docs.dapr.io/reference/components-reference/supported-state-stores/
注:建議學習的時候都用Redis,官方所有的示例也都是基于Redis,比較容易上手,且Dapr init默認集成
Actor計時器和提醒
Actor可以通過注冊計時器或提醒來安排自己的定期工作。
計時器和提醒的功能非常相似。主要區別在于,Dapr Actor 運行時在停用后不保留有關計時器的任何信息,而使用 Dapr Actor 狀態提供程序保留有關提醒的信息。
定時器和提醒的調度配置是相同的,總結如下:
DueTime 是一個可選參數,用于設置第一次調用回調之前的時間或時間間隔。如果省略 DueTime,則在定時器/提醒注冊后立即調用回調。
支持的格式:
RFC3339 日期格式,例如2020-10-02T15:00:00Z
time.Duration 格式,例如2h30m
ISO 8601 持續時間格式,例如PT2H30M
period 是一個可選參數,用于設置兩次連續回調調用之間的時間間隔。當以 ISO 8601-1 持續時間格式指定時,您還可以配置重復次數以限制回調調用的總數。如果省略 period,則回調將僅被調用一次。
支持的格式:
time.Duration 格式,例如2h30m
ISO 8601 持續時間格式,例如PT2H30M, R5/PT1M30S
ttl 是一個可選參數,用于設置計時器/提醒到期和刪除的時間或時間間隔。如果省略 ttl,則不應用任何限制。
支持的格式:
RFC3339 日期格式,例如2020-10-02T15:00:00Z
time.Duration 格式,例如2h30m
ISO 8601 持續時間格式,例如PT2H30M
當您同時指定周期內的重復次數和 ttl 時,計時器/提醒將在滿足任一條件時停止。
Actor 運行時配置
actorIdleTimeout - 停用空閑 actor 之前的超時時間。每個 actorScanInterval 間隔都會檢查超時。默認值:60 分鐘
actorScanInterval - 指定掃描演員以停用空閑Actor的頻率的持續時間。閑置時間超過 actor_idle_timeout 的 Actor 將被停用。默認值:30 秒
drainOngoingCallTimeout - 在耗盡Rebalanced的Actor的過程中的持續時間。這指定了當前活動 Actor 方法完成的超時時間。如果當前沒有 Actor 方法調用,則忽略此項。默認值:60 秒
drainRebalancedActors - 如果為 true,Dapr 將等待 drainOngoingCallTimeout 持續時間以允許當前角色調用完成,然后再嘗試停用角色。默認值:true
drainRebalancedActors與上面的drainOngoingCallTimeout需搭配使用
reentrancy - (ActorReentrancyConfig) - 配置角色的重入行為。如果未提供,則禁用可重入。默認值:disabled, 0
remindersStoragePartitions - 配置Actor提醒的分區數。如果未提供,則所有提醒都將保存為Actor狀態存儲中的單個記錄。默認值:0
分區提醒(Preview)
在 sidecar 重新啟動后,Actor 提醒會保留并繼續觸發。在 Dapr 運行時版本 1.3 之前,提醒被保存在 actor 狀態存儲中的單個記錄上。
此為Preview功能,感興趣可以看官方文檔
.Net調用Dapr的Actor
與以往不同,Actor示例會多創建一個共享類庫用于存放Server和Client共用的部分
創建Assignment.Shared
創建類庫項目,并添加Dapr.ActorsNuGet包引用,最后添加以下幾個類:
AccountBalance.cs
namespace Assignment.Shared; public class AccountBalance {public string AccountId { get; set; } = default!;public decimal Balance { get; set; } }IBankActor.cs
注:這個是Actor接口,IActor是Dapr SDK提供的
using Dapr.Actors;namespace Assignment.Shared; public interface IBankActor : IActor {Task<AccountBalance> GetAccountBalance();Task Withdraw(WithdrawRequest withdraw); }OverdraftException.cs
namespace Assignment.Shared; public class OverdraftException : Exception {public OverdraftException(decimal balance, decimal amount): base($"Your current balance is {balance:c} - that's not enough to withdraw {amount:c}."){} }WithdrawRequest.cs
namespace Assignment.Shared; public class WithdrawRequest {public decimal Amount { get; set; } }創建Assignment.Server
創建類庫項目,并添加Dapr.Actors.AspNetCoreNuGet包引用和Assignment.Shared項目引用,最后修改程序端口為5000。
注:Server與Shared和Client的NuGet包不一樣,Server是集成了服務端的一些功能
修改program.cs
var builder = WebApplication.CreateBuilder(args); builder.Services.AddSingleton<BankService>(); builder.Services.AddActors(options => {options.Actors.RegisterActor<DemoActor>(); });var app = builder.Build();app.UseRouting();app.UseEndpoints(endpoints => {endpoints.MapActorsHandlers(); });app.Run();添加BankService.cs
using Assignment.Shared;namespace Assignment.Server; public class BankService {// Allow overdraft of up to 50 (of whatever currency).private readonly decimal OverdraftThreshold = -50m;public decimal Withdraw(decimal balance, decimal amount){// Imagine putting some complex auditing logic here in addition to the basics.var updated = balance - amount;if (updated < OverdraftThreshold){throw new OverdraftException(balance, amount);}return updated;} }添加BankActor.cs
using Assignment.Shared; using Dapr.Actors.Runtime; using System;namespace Assignment.Server; public class BankActor : Actor, IBankActor, IRemindable // IRemindable is not required {private readonly BankService bank;public BankActor(ActorHost host, BankService bank): base(host){// BankService is provided by dependency injection.// See Program.csthis.bank = bank;}public async Task<AccountBalance> GetAccountBalance(){var starting = new AccountBalance(){AccountId = this.Id.GetId(),Balance = 10m, // Start new accounts with 100, we're pretty generous.};var balance = await StateManager.GetOrAddStateAsync("balance", starting);return balance;}public async Task Withdraw(WithdrawRequest withdraw){var starting = new AccountBalance(){AccountId = this.Id.GetId(),Balance = 10m, // Start new accounts with 100, we're pretty generous.};var balance = await StateManager.GetOrAddStateAsync("balance", starting)!;if (balance.Balance <= 0){// Simulated reminder depositif (Random.Shared.Next(100) > 90){await RegisterReminderAsync("Deposit", null, TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1));}}// Throws Overdraft exception if the account doesn't have enough money.var updated = this.bank.Withdraw(balance.Balance, withdraw.Amount);balance.Balance = updated;await StateManager.SetStateAsync("balance", balance);}public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period){if (reminderName == "Deposit"){var balance = await StateManager.GetStateAsync<AccountBalance>("balance")!;if (balance.Balance <= 0){balance.Balance += 60; // 50(Overdraft Threshold) + 10 = 60Console.WriteLine("Deposit: 10");}else{Console.WriteLine("Deposit: ignore");}}} }運行Assignment.Server
使用Dapr CLI來啟動,先使用命令行工具跳轉到目錄 dapr-study-room\Assignment07\Assignment.Server,然后執行下面命令
dapr run --app-id testactor --app-port 5000 --dapr-http-port 3500 --dapr-grpc-port 50001 dotnet run創建Assignment.Client
創建控制臺項目,并添加Dapr.ActorsNuGet包引用和Assignment.Shared項目引用。
修改Program.cs
using Assignment.Shared; using Dapr.Actors; using Dapr.Actors.Client;Console.WriteLine("Creating a Bank Actor"); var bank = ActorProxy.Create<IBankActor>(ActorId.CreateRandom(), "BankActor"); Parallel.ForEach(Enumerable.Range(1, 10), async i => {while (true){var balance = await bank.GetAccountBalance();Console.WriteLine($"[Worker-{i}] Balance for account '{balance.AccountId}' is '{balance.Balance:c}'.");Console.WriteLine($"[Worker-{i}] Withdrawing '{1m:c}'...");try{await bank.Withdraw(new WithdrawRequest() { Amount = 1m });}catch (ActorMethodInvocationException ex){Console.WriteLine("[Worker-{i}] Overdraft: " + ex.Message);}Task.Delay(1000).Wait();} });Console.ReadKey();運行Assignment.Client
使用Dapr CLI來啟動,先使用命令行工具跳轉到目錄 dapr-study-room\Assignment07\Assignment.Client,然后執行下面命令
dotnet run本章源碼
Assignment07
https://github.com/doddgu/dapr-study-room
我們正在行動,新的框架、新的生態
我們的目標是自由的、易用的、可塑性強的、功能豐富的、健壯的。
所以我們借鑒Building blocks的設計理念,正在做一個新的框架MASA Framework,它有哪些特點呢?
原生支持Dapr,且允許將Dapr替換成傳統通信方式
架構不限,單體應用、SOA、微服務都支持
支持.Net原生框架,降低學習負擔,除特定領域必須引入的概念,堅持不造新輪子
豐富的生態支持,除了框架以外還有組件庫、權限中心、配置中心、故障排查中心、報警中心等一系列產品
核心代碼庫的單元測試覆蓋率90%+
開源、免費、社區驅動
還有什么?我們在等你,一起來討論
經過幾個月的生產項目實踐,已完成POC,目前正在把之前的積累重構到新的開源項目中
目前源碼已開始同步到Github(文檔站點在規劃中,會慢慢完善起來):
MASA.BuildingBlocks
MASA.Contrib
MASA.Utils
MASA.EShop
BlazorComponent
MASA.Blazor
QQ群:7424099
微信群:加技術運營微信(MasaStackTechOps),備注來意,邀請進群
總結
以上是生活随笔為你收集整理的学Dapr Actors 看这篇就够了的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Redis集群之脑裂:一次奇怪的数据丢失
- 下一篇: MQTT断线重连及订阅消息恢复