CAP-分布式事务的解决方案
CAP 是一個基于 .NET Standard 的 C# 庫,它是一種處理分布式事務的解決方案,同樣具有 EventBus 的功能,它具有輕量級、易使用、高性能等特點。
https://github.com/dotnetcore/CAP
在我們構建 SOA 或者 微服務系統的過程中,我們通常需要使用事件來對各個服務進行集成,在這過程中簡單的使用消息隊列并不能保證數據的最終一致性, CAP 采用的是和當前數據庫集成的本地消息表的方案來解決在分布式系統互相調用的各個環節可能出現的異常,它能夠保證任何情況下事件消息都是不會丟失的。
你同樣可以把 CAP 當做 EventBus 來使用,CAP提供了一種更加簡單的方式來實現事件消息的發布和訂閱,在訂閱以及發布的過程中,你不需要繼承或實現任何接口。
這是CAP集在ASP.NET Core 微服務架構中的一個示意圖:
架構預覽
CAP 實現了?eShop 電子書?中描述的發件箱模式
Getting Started
NuGet
你可以運行以下下命令在你的項目中安裝 CAP。
PM> Install-Package DotNetCore.CAPCAP 支持 Kafka、RabbitMQ、AzureServiceBus 等消息隊列,你可以按需選擇下面的包進行安裝:
PM> Install-Package DotNetCore.CAP.Kafka PM> Install-Package DotNetCore.CAP.RabbitMQ PM> Install-Package DotNetCore.CAP.AzureServiceBusCAP 提供了 Sql Server, MySql, PostgreSQL,MongoDB 的擴展作為數據庫存儲:
// 按需選擇安裝你正在使用的數據庫 PM> Install-Package DotNetCore.CAP.SqlServer PM> Install-Package DotNetCore.CAP.MySql PM> Install-Package DotNetCore.CAP.PostgreSql PM> Install-Package DotNetCore.CAP.MongoDBConfiguration
首先配置CAP到 Startup.cs 文件中,如下:
public void ConfigureServices(IServiceCollection services) {......services.AddDbContext<AppDbContext>();services.AddCap(x =>{//如果你使用的 EF 進行數據操作,你需要添加如下配置:x.UseEntityFramework<AppDbContext>(); //可選項,你不需要再次配置 x.UseSqlServer 了//如果你使用的ADO.NET,根據數據庫選擇進行配置:x.UseSqlServer("數據庫連接字符串");x.UseMySql("數據庫連接字符串");x.UsePostgreSql("數據庫連接字符串");//如果你使用的 MongoDB,你可以添加如下配置:x.UseMongoDB("ConnectionStrings"); //注意,僅支持MongoDB 4.0+集群//CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作為MQ,根據使用選擇配置:x.UseRabbitMQ("ConnectionStrings");x.UseKafka("ConnectionStrings");x.UseAzureServiceBus("ConnectionStrings");}); }發布
在 Controller 中注入?ICapPublisher?然后使用?ICapPublisher?進行消息發送
public class PublishController : Controller {private readonly ICapPublisher _capBus;public PublishController(ICapPublisher capPublisher){_capBus = capPublisher;}//不使用事務[Route("~/without/transaction")]public IActionResult WithoutTransaction(){_capBus.Publish("xxx.services.show.time", DateTime.Now);return Ok();}//Ado.Net 中使用事務,自動提交[Route("~/adonet/transaction")]public IActionResult AdonetWithTransaction(){using (var connection = new MySqlConnection(ConnectionString)){using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true)){//業務代碼_capBus.Publish("xxx.services.show.time", DateTime.Now);}}return Ok();}//EntityFramework 中使用事務,自動提交[Route("~/ef/transaction")]public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext){using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true)){//業務代碼_capBus.Publish("xxx.services.show.time", DateTime.Now);}return Ok();} }訂閱
Action Method
在 Action 上添加 CapSubscribeAttribute 來訂閱相關消息。
public class PublishController : Controller {[CapSubscribe("xxx.services.show.time")]public void CheckReceivedMessage(DateTime datetime){Console.WriteLine(datetime);} }Service Method
如果你的訂閱方法沒有位于 Controller 中,則你訂閱的類需要繼承?ICapSubscribe:
namespace xxx.Service {public interface ISubscriberService{void CheckReceivedMessage(DateTime datetime);}public class SubscriberService: ISubscriberService, ICapSubscribe{[CapSubscribe("xxx.services.show.time")]public void CheckReceivedMessage(DateTime datetime){}} }然后在 Startup.cs 中的?ConfigureServices()?中注入你的?ISubscriberService?類
public void ConfigureServices(IServiceCollection services) {//注意: 注入的服務需要在 `services.AddCap()` 之前services.AddTransient<ISubscriberService,SubscriberService>();services.AddCap(x=>{}); }訂閱者組
訂閱者組的概念類似于 Kafka 中的消費者組,它和消息隊列中的廣播模式相同,用來處理不同微服務實例之間同時消費相同的消息。
當CAP啟動的時候,她將創建一個默認的消費者組,如果多個相同消費者組的消費者消費同一個Topic消息的時候,只會有一個消費者被執行。相反,如果消費者都位于不同的消費者組,則所有的消費者都會被執行。
相同的實例中,你可以通過下面的方式來指定他們位于不同的消費者組。
[CapSubscribe("xxx.services.show.time", Group = "group1" )] public void ShowTime1(DateTime datetime) { }[CapSubscribe("xxx.services.show.time", Group = "group2")] public void ShowTime2(DateTime datetime) { }ShowTime1?和?ShowTime2?處于不同的組,他們將會被同時調用。
PS,你可以通過下面的方式來指定默認的消費者組名稱:
services.AddCap(x => {x.DefaultGroup = "default-group-name"; });Dashboard
CAP 2.1+ 以上版本中提供了儀表盤(Dashboard)功能,你可以很方便的查看發出和接收到的消息。除此之外,你還可以在儀表盤中實時查看發送或者接收到的消息。
使用一下命令安裝 Dashboard:
PM> Install-Package DotNetCore.CAP.Dashboard在分布式環境中,儀表盤內置集成了 Consul 作為節點的注冊發現,同時實現了網關代理功能,你同樣可以方便的查看本節點或者其他節點的數據,它就像你訪問本地資源一樣。
services.AddCap(x => {//...// 注冊 Dashboardx.UseDashboard();// 注冊節點到 Consulx.UseDiscovery(d =>{d.DiscoveryServerHostName = "localhost";d.DiscoveryServerPort = 8500;d.CurrentNodeHostName = "localhost";d.CurrentNodePort = 5800;d.NodeId = 1;d.NodeName = "CAP No.1 Node";}); });儀表盤默認的訪問地址是:http://localhost:xxx/cap,你可以在d.MatchPath配置項中修改cap路徑后綴為其他的名字。
總結
以上是生活随笔為你收集整理的CAP-分布式事务的解决方案的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 你知道WPF这三大模板实例运用吗?
- 下一篇: .Net5 WPF快速入门系列教程