AspNetCoreMassTransit Courier实现分布式事务
在之前的一篇博文中,CAP框架可以方便我們實現(xiàn)非實時、異步場景下的最終一致性,而有些用例總是無法避免的需要在實時、同步場景下進行,可以借助Saga事務來解決這一困擾。在一些博文和倉庫中也搜尋到了.Net下實現(xiàn)Saga模式的解決方案MassTransit,這就省得自己再造輪子了。
分布式事務
分布式系統(tǒng)中,分布式事務是一個不能避免的問題,如何保證不同節(jié)點間的數(shù)據(jù)一致性。舉個常見的例子,下訂單、減庫存、扣余額,三者在單個節(jié)點時,可以借助本地事務,實現(xiàn)要么成功要么失敗。而當三者處于不同節(jié)點時,又參雜了如網(wǎng)絡環(huán)境、節(jié)點自身環(huán)境、服務環(huán)境等各種因素,使得三個節(jié)點想要實現(xiàn)要么成功、要么失敗就增加了許多困難。
CAP理論和BASE理論很好的詮釋了這一問題,也有了許多的解決分布式事務的方案,如2PC、3PC、TCC、本地消息表、Saga等一系列解決方案,面對不同場景、不同要求等可選擇不同的解決方案。
| 2PC | 強 | 低 | 中 | 低 | 低 |
| 3PC | 強 | 低 | 高 | 低 | 中 |
| TCC | 弱 | 高 | 高 | 中 | 高 |
| 本地消息表 | 弱 | 高 | 低 | 中 | 中 |
| MQ事務 | 弱 | 高 | 低 | 高 | 中 |
| Saga事務 | 弱 | 高 | 高 | 中 | 高 |
在之前提到過一個基于本地消息表的CAP框架,借助最終一致性很方便的解決了異步非實時請求下的分布式事務,而對于大部分場景雖然可以直接或者妥協(xié)方式使用著異步非實時,如同步實時場景的下訂單且減庫存變更到異步非實時場景的下訂單后發(fā)事件減庫存,但是總有那么一些場景,不得不去考慮同步實時請求下的分布式事務。
Saga模式
Saga模式又叫做長時間運行事務(Long-running-transaction), 由普林斯頓大學的 Hector Garcia-Molina和Kenneth Salem 1987年發(fā)表的論文《Sagas》。核心思想是將長事務拆分為多個本地短事務,通過保證所有短事務的成功或失敗來決定整體的成功或失敗,由Saga事務協(xié)調(diào)器協(xié)調(diào)管理,所有節(jié)點執(zhí)行成功,則成功,如有節(jié)點失敗,則反向執(zhí)行前置節(jié)點的補償操作。
每個Saga事務由一系列冪等的有序子事務(sub-transaction) Ti 組成。
每個Ti 都有對應的冪等補償動作Ci,補償動作用于撤銷Ti造成的結(jié)果。
執(zhí)行過程
當正常執(zhí)行時,依照T1、T2、T3三個短事務正常執(zhí)行下去,直到最后一個Tn事務執(zhí)行完畢,宣告整個事務的成功。
而當執(zhí)行到某個Tj出現(xiàn)故障時,則反向補償之前的Tj-1..T1,每個對應的補償操作Cj-1...C1,其中Tj事務由于在執(zhí)行階段就已失敗,所以Tj對應的補償動作Cj不需要執(zhí)行,即也確定了最后一個Tn事務可以不設置補償動作Cn。
恢復策略
向前恢復(forward recovery):對于Ti事務的執(zhí)行,部分場景下可能因為數(shù)據(jù)庫的連接、網(wǎng)絡的波動等導致短暫的失敗,對Ti事務重試執(zhí)行,以確保整個事務的執(zhí)行,如執(zhí)行T1, T2, T3,當執(zhí)行T3失敗時,不直接宣告失敗,對T3執(zhí)行重試以排除部分不穩(wěn)定因素,如在若干次重試無效后,再考慮向后恢復。
向后恢復(backward recovery):按照執(zhí)行順序方式作為向前的指向,則向后為反向補償,對已執(zhí)行過的節(jié)點順序倒退執(zhí)行各Ti的補償動作Ci,也就是把走過的路往回走,對執(zhí)行過的操作執(zhí)行業(yè)務上的反操作,如正向流程執(zhí)行減庫存則補償操作時執(zhí)行加庫存。
協(xié)作方式
對于服務與服務間的協(xié)作,我們通常有兩種模式:Orchestration(編排式) 和 Choreography(協(xié)同式),在Saga模式中也有著這兩種的實現(xiàn)。
編排式(Orchestrator):把 Saga 的決策和執(zhí)行順序邏輯集中在一個 Saga 編排器類中。Saga 編排器發(fā)出命令式消息給各個 Saga 參與方,指示這些參與方服務完成具體操作(本地事務)。
協(xié)同式(Choreography):把 Saga 的決策和執(zhí)行順序邏輯分布在 Saga 的每個參與方中,它們通過交換事件的方式來進行溝通。
編排式與協(xié)同式的差異僅在于服務之間的協(xié)作方式,每個參與服務的接口定義卻沒有任何區(qū)別。
編排式(Orchestrator)
編排式的 Saga 需要開發(fā)人員定義一個編排器類,用于編排一個Saga中多個參與服務執(zhí)行的流程。如果整個業(yè)務流程正常結(jié)束,業(yè)務就成功完成,一旦這個過程的任何環(huán)節(jié)出現(xiàn)失敗,Saga編排器類就會以相反的順序調(diào)用補償操作,重新進行業(yè)務回滾。
對于每個參與的服務而言,需要做的事情是
訂閱并處理命令消息
執(zhí)行命令后返回響應消息
設計執(zhí)行邏輯和補償邏輯
以提交訂單為例,假設場景是分布式系統(tǒng)下,進程間以消息傳遞進行通信:
1、事務發(fā)起方的主業(yè)務邏輯請求預先定義好的Saga編排器類(內(nèi)部編排了執(zhí)行順序)。
2、Saga編排器類向MQ發(fā)送減庫存事件,庫存服務訂閱事件、執(zhí)行處理并返回MQ處理結(jié)果。
3、Saga編排器類向MQ發(fā)送減余額事件,支付服務訂閱事件、執(zhí)行處理并返回MQ處理結(jié)果。
4、Saga編排器類向MQ發(fā)送創(chuàng)建訂單命令,訂單服務訂閱事件并按照命令創(chuàng)建訂單。
5、主業(yè)務邏輯接收并處理Saga編排器類處理結(jié)果。
6、整個過程由Saga 編排器類對接收到的回復進行判決,來決定是繼續(xù)執(zhí)行還是懸崖勒馬。
協(xié)同式(Choreography)
沒有集中式的編排類,而是各參與方間相互訂閱,一個服務訂閱另一個服務的事件。
先由事務發(fā)起方執(zhí)行邏輯并發(fā)布一個事件,該事件被一個或多個服務進行訂閱,這些服務執(zhí)行本地數(shù)據(jù)庫操作并發(fā)布(或不發(fā)布)新的事件,該部分需要保證本地數(shù)據(jù)庫的操作成功且寫入MQ的消息也成功,可考慮使用本地消息表或是基于MQ事務。當最后一個服務執(zhí)行本地事務并且不發(fā)布任何事件或者發(fā)布的事件沒有被任何Saga參與者訂閱意味著事務結(jié)束,則整個業(yè)務流程的分布式事務完成。如果某一服務出現(xiàn)故障,那么則反向發(fā)布事件,執(zhí)行補償操作,以此回滾。
以提交訂單為例,假設場景是分布式系統(tǒng)下,進程間以消息傳遞進行通信:
1、事務發(fā)起方執(zhí)行主業(yè)務邏輯發(fā)送提交訂單命令。
2、庫存服務訂閱事件、扣減庫存并發(fā)布已扣減事件。
3、訂單服務訂閱庫存已扣減事件,創(chuàng)建訂單并發(fā)布訂單已創(chuàng)建事件。
4、支付服務訂閱訂單已創(chuàng)建事件,執(zhí)行支付并發(fā)布訂單已支付事件。
5、主業(yè)務邏輯訂閱訂單已支付事件并處理。
當某服務內(nèi)執(zhí)行時如存在異常,則反向發(fā)布事件,如訂單創(chuàng)建失敗,則發(fā)布OrderCreatedFailed事件,庫存服務訂閱該事件并執(zhí)行補償操作。
相比而言,編排式中參與服務無需向協(xié)同式中訂閱上游服務的事件,減少了服務間對事件協(xié)議的依賴,而只需要關心集權(quán)的編排器類發(fā)送的消息。
MassTransit Courier
MassTransit Courier是一種用于創(chuàng)建和執(zhí)行帶有故障補償?shù)姆植际绞聞盏臋C制,它可以用于滿足本地事務的需求,也可以在分布式系統(tǒng)中實現(xiàn)分布式事務。
Courier實現(xiàn)了Routing Slip模式,通過有序組合一系列的Activity,得到一個Routing slip。每個Activity都有 Execute 和 Compensate 兩個方法(最后一個可以只有一個Execute方法)。Compensate即為補償操作。
補償服務
當開啟一個事務前,需要做一些準備,準備一個事務Id,記錄整個事務執(zhí)行情況,各Tj事務執(zhí)行情況,當前請求上下文參數(shù),入?yún)?shù)記錄等,以方便執(zhí)行補償操作時需要用到。如當Tj事務執(zhí)行失敗時,需要對Cj-1到C1執(zhí)行補償操作,此時各補償操作需要一些正向執(zhí)行T1,Tj-1的請求參數(shù)或執(zhí)行結(jié)果,因此都需要記錄下來。
在Courier中,通過Routing Slip來完成這些記錄,創(chuàng)建一個Guid,記錄請求上下文參數(shù)信息,可以綁定幾個內(nèi)置事件,在各階段到來時會發(fā)送事件,如有需要可以訂閱。
服務建立
弄了個Demo,建立了三個服務,此處我使用編排式來完成,但無論是選用編排式還是協(xié)同式,都借助RabbitMQ實現(xiàn)消息傳遞。
每個服務都安裝了MassTransit相關的包
MassTransit.AspNetCore MassTransit.RabbitMQ將Saga編排器類放置在OrderService中了,對于編排器類的放置,個人認為是應該看用例的主服務是誰而放置,想過放在BFF去協(xié)調(diào)三個服務,但是總是感覺不是BFF的職責范圍。
服務配置
在各服務中對MassTransit配置,如下在OrderService中對MassTransit需要使用到的RabbitMQ配置,對需要進行多個服務協(xié)作的用例配置Routing Slip,對消息隊列偵聽訂閱需要的事件并配置相應的Activity處理。
services.AddMassTransit(x => {var currentAssembly = Assembly.GetExecutingAssembly();x.AddActivities(currentAssembly);x.AddConsumers(currentAssembly);x.AddRequestClient<createordercommand>();x.UsingRabbitMq((context, cfg) =>{// 配置RabbitMQcfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{h.Username(Configuration["RabbitmqConfig:Username"]);h.Password(Configuration["RabbitmqConfig:Password"]);});//配置Routing Slipcfg.ReceiveEndpoint("CreateOrderCommand", ep =>{ep.ConfigureConsumer<createorderrequestproxy>(context);ep.ConfigureConsumer<createorderresponseproxy>(context);});// 配置訂閱隊列及Handler處理cfg.ReceiveEndpoint("CreateOrder_execute", ep =>{ep.ExecuteActivityHost<createorderactivity, createordermodel="">(context);});}); }); services.AddMassTransitHostedService();服務編排
構(gòu)建Routing Slip,此處依據(jù)用例的需求,對需要協(xié)作的服務編排,組合一系列的Activity。
Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<createordercommand> request) {builder.AddActivity("ReduceStock", new Uri("..."), new {});builder.AddActivity("DeductBalance", new Uri("..."), new {});builder.AddActivity("CreateOrder", new Uri("..."), new { });return Task.CompletedTask; }執(zhí)行請求
當請求進入后,通過RequestClient發(fā)送CreateOrderCommand,同步等待執(zhí)行結(jié)果,再由編排器類負責協(xié)調(diào)預設好的Activity,發(fā)送事件到消息隊列,經(jīng)各Activity訂閱處理最終返回結(jié)果。
[Route("[controller]")] public class OrderController : ControllerBase {private readonly IRequestClient<createordercommand> _createOrderClient;public OrderController(IRequestClient<createordercommand> createOrderClient){_createOrderClient = createOrderClient;}[HttpGet("CreateOrder")]public async Task<commoncommandresponse<createorderresult>> CreateOrder(){var result = await _createOrderClient.GetResponse <commoncommandresponse<createorderresult>>(new CreateOrderCommand(){// ...});return result.Message;} }各服務中對于Activity設置偵聽隊列以及請求信息,調(diào)用Execute執(zhí)行邏輯,當出現(xiàn)異常時返回到MQ通知編排器類,在對之前執(zhí)行的Activity執(zhí)行Compensate。如在CreateOrderActivity中執(zhí)行異常,由編排器類執(zhí)行補償,ReduceStockActivity調(diào)用Compensate,執(zhí)行增加庫存邏輯
public class ReduceStockActivity : IActivity<ReduceStockModel, ReduceStockLog> {public async Task<ExecutionResult> Execute(ExecuteContext<ReduceStockModel> context){var argument = context.Arguments;// 扣減庫存await Task.Delay(100);return context.Completed(new ReduceStockLog() { ProductId = argument.ProductId, Amount = 1 });}public async Task<CompensationResult> Compensate(CompensateContext<ReduceStockLog> context){// 增加庫存await Task.Delay(100);return context.Compensated();} }執(zhí)行成功
用例請求執(zhí)行后,先由Controller發(fā)送請求,再由庫存服務扣減庫存,支付服務扣減余額,最后由訂單服務創(chuàng)建訂單,當創(chuàng)建失敗時,執(zhí)行補償操作,庫存服務增加庫存,支付服務增加余額。
執(zhí)行補償
用例請求執(zhí)行后,先由Controller發(fā)送請求,再由庫存服務扣減庫存,支付服務扣減余額,最后由訂單服務創(chuàng)建訂單,當創(chuàng)建失敗時,執(zhí)行補償操作,庫存服務增加庫存,支付服務增加余額。
在整個事務失敗后,先會返回異常,再由編排器執(zhí)行補償操作,實現(xiàn)最終的數(shù)據(jù)一致性。MassTransit也提供了重試機制以實現(xiàn)向前恢復,避免因數(shù)據(jù)庫連接超時、網(wǎng)絡波動等問題造成的失敗。
Demo
參考
Masstransit中的 Request/Response 與 Courier 功能實現(xiàn)最終一致性 - 丁松松松
理解分布式事務 (juejin.cn)-陳彩華
總結(jié)
以上是生活随笔為你收集整理的AspNetCoreMassTransit Courier实现分布式事务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ASP.NET Core 中的规约模式(
- 下一篇: 微软 MS Learn 上线 Blazo