FreeSql接入CAP的实践
CAP
CAP 是一個基于 .NET Standard 的 C# 庫,它是一種處理分布式事務的解決方案,同樣具有 EventBus 的功能,它具有輕量級、易使用、高性能等特點。
https://github.com/dotnetcore/CAP
ADO.NET事務
1.DotNetCore.CAP.MySql中引用 了如下類庫.在Commit事務時,會調用 Flush方法推送消息
<PackageReference?Include="Microsoft.EntityFrameworkCore"?Version="3.1.7"?/> <PackageReference?Include="Microsoft.EntityFrameworkCore.Relational"?Version="3.1.7"?/> <PackageReference?Include="MySqlConnector"?Version="1.0.1"?/>https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs
其中我們能看到,事務的提交,會調用父類CapTransactionBase中的方法Flush。他是protected類型的,并未開放出此接口。
???????protected?virtual?void?Flush(){while?(!_bufferList.IsEmpty){_bufferList.TryDequeue(out?var?message);_dispatcher.EnqueueToPublish(message);}}我們來看一下集成 的demo調用
????[Route("~/adonet/transaction")]public?IActionResult?AdonetWithTransaction(){using?(var?connection?=?new?MySqlConnection(AppDbContext.ConnectionString)){using?(var?transaction?=?connection.BeginTransaction(_capBus,?true)){//your?business?codeconnection.Execute("insert?into?test(name)?values('test')",?transaction:?(IDbTransaction)transaction.DbTransaction);//for?(int?i?=?0;?i?<?5;?i++)//{_capBus.Publish("sample.rabbitmq.mysql",?DateTime.Now);//}}}return?Ok();}https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs
代碼中通過擴展IDbConnection類,增加BeginTransaction方法,傳遞了注入的_capBus類,傳了autoCommit
private?readonly?ICapPublisher?_capBus;public?PublishController(ICapPublisher?capPublisher) {_capBus?=?capPublisher; } ///?<summary> ///?Start?the?CAP?transaction ///?</summary> ///?<param?name="dbConnection">The?<see?cref="IDbConnection"?/>.</param> ///?<param?name="publisher">The?<see?cref="ICapPublisher"?/>.</param> ///?<param?name="autoCommit">Whether?the?transaction?is?automatically?committed?when?the?message?is?published</param> ///?<returns>The?<see?cref="ICapTransaction"?/>?object.</returns> public?static?ICapTransaction?BeginTransaction(this?IDbConnection?dbConnection,ICapPublisher?publisher,?bool?autoCommit?=?false) {if?(dbConnection.State?==?ConnectionState.Closed){dbConnection.Open();}var?dbTransaction?=?dbConnection.BeginTransaction();publisher.Transaction.Value?=?publisher.ServiceProvider.GetService<ICapTransaction>();return?publisher.Transaction.Value.Begin(dbTransaction,?autoCommit); }autoCommit:false,(此屬性會自動提交事務,集成其他ORM,不建議開啟)因為,我們只要調用 了Publish,他會調用MySqlCapTransaction中的Commit(),并執行Flush,即消息 會發出去。IDbContextTransaction
這段代碼是非常 重要的。
????publisher.Transaction.Value?=?publisher.ServiceProvider.GetService<ICapTransaction>();從CapPublisher中可以看出,事務是通過AsyncLocal實現狀態共享的。
internal?class?CapPublisher?:?ICapPublisher {public?AsyncLocal<ICapTransaction>?Transaction?{?get;?} }publisher.Transaction.Value的類型實現上才是ICapTransaction ,
CapTransactionExtensions.cs還有一個擴展方法,調用Begin,相當于給當前控制器上注入的ICapPublisher設置了new MySqlConnection(AppDbContext.ConnectionString).BeginTransaction()的值。
??????public?static?ICapTransaction?Begin(this?ICapTransaction?transaction,IDbTransaction?dbTransaction,?bool?autoCommit?=?false){transaction.DbTransaction?=?dbTransaction;transaction.AutoCommit?=?autoCommit;return?transaction;}對于ADO.NET,我們只要傳遞transaction,就能保證發送消息和操作DB是一個事務了。。
EF Core事務
同樣,我們看擴展方法和使用方式
????public?static?IDbContextTransaction?BeginTransaction(this?DatabaseFacade?database,ICapPublisher?publisher,?bool?autoCommit?=?false){var?trans?=?database.BeginTransaction();publisher.Transaction.Value?=?publisher.ServiceProvider.GetService<ICapTransaction>();var?capTrans?=?publisher.Transaction.Value.Begin(trans,?autoCommit);return?new?CapEFDbTransaction(capTrans);}dbContext.Database就是DatabaseFacade類型。直接能BeginTransaction事務。
[Route("~/ef/transaction")] public?IActionResult?EntityFrameworkWithTransaction([FromServices]AppDbContext?dbContext) {using?(var?trans?=?dbContext.Database.BeginTransaction(_capBus,?autoCommit:?false)){dbContext.Persons.Add(new?Person()?{?Name?=?"ef.transaction"?});for?(int?i?=?0;?i?<?1;?i++){_capBus.Publish("sample.rabbitmq.mysql",?DateTime.Now);}dbContext.SaveChanges();trans.Commit();}return?Ok(); }同樣,還有一個Begin擴展方法,僅僅是給ICapTransaction賦下值。
public?static?ICapTransaction?Begin(this?ICapTransaction?transaction,IDbContextTransaction?dbTransaction,?bool?autoCommit?=?false) {transaction.DbTransaction?=?dbTransaction;transaction.AutoCommit?=?autoCommit;return?transaction; }在這個demo,上,autoCommit是false,因為dbContext有自己的SaveChanges(),如果發送不太合適。SaveChanges()要做好些操作,具體不太情況是什么。具體不詳細研究。
但我們可以看下CapTransactionBase源碼,DbTransaction是Object類型。
EF Core中的事務類型是IDbContextTransaction
ADO.NET實際是IDbTransaction類型。
?public?object?DbTransaction?{?get;?set;?}所以在最開始的那段代碼,判斷DbTransaction,是哪種類型,然后調用自身內部使用的事務進行Commit()。如果要集成其他ORM,但又想去掉EFCore的依賴,然后增加其他ORM,如下類似的處理,就是關鍵,比如CommitAsync,Commit,Roolback()
????public?override?void?Commit(){Debug.Assert(DbTransaction?!=?null);switch?(DbTransaction){case?IDbTransaction?dbTransaction:dbTransaction.Commit();break;case?IDbContextTransaction?dbContextTransaction:dbContextTransaction.Commit();break;}Flush();}還有MySqlDataStorage.cs
https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
判斷dbTransaction的類型,然后獲取當前事務,引用其他ORM,記得修改此處。
????var?dbTrans?=?dbTransaction?as?IDbTransaction;if?(dbTrans?==?null?&&?dbTransaction?is?IDbContextTransaction?dbContextTrans){dbTrans?=?dbContextTrans.GetDbTransaction();}參考項目(不打算維護)
https://github.com/luoyunchong/DotNetCore.CAP.Provider
維護就要保證與上層Dotnetcore/cap項目保持同步,這是一件困難的事。
還有一個重要的原因是:我們有更簡單的方式。
FreeSql接入CAP(最簡單的方式)
關于此問題的想法
https://github.com/luoyunchong/DotNetCore.CAP.Provider/issues/1
我們還是引用各自官方的庫
Install-Package?DotNetCore.CAP.Dashboard Install-Package?DotNetCore.CAP.MySql Install-Package?DotNetCore.CAP.RabbitMQ Install-Package?FreeSql Install-Package?FreeSql.DbContext Install-Package?FreeSql.Provider.MySqlConnector關于CAP集成的方式,配置項,這里不做詳情,官方地址有中文:http://cap.dotnetcore.xyz/
代碼參考。因為只有一個類,我們自行復制項目即可。
https://github.com/luoyunchong/lin-cms-dotnetcore/blob/master/src/LinCms.Application/CapUnitOfWorkExtensions.cs
重寫擴展方法,BeginTransaction。是基于IUnitOfWork的擴展。
提交事務調用Commit(IUnitOfWork)時,內部再通過反射調用 ICapTransaction中protected類型的方法Flush。
public static class CapUnitOfWorkExtensions{public static void Flush(this ICapTransaction capTransaction){capTransaction?.GetType().GetMethod("Flush", BindingFlags.Instance | BindingFlags.NonPublic)?.Invoke(capTransaction, null);}public static ICapTransaction BeginTransaction(this IUnitOfWork unitOfWork, ICapPublisher publisher, bool autoCommit = false){publisher.Transaction.Value = (ICapTransaction)publisher.ServiceProvider.GetService(typeof(ICapTransaction));return publisher.Transaction.Value.Begin(unitOfWork.GetOrBeginTransaction(), autoCommit);}public static void Commit(this ICapTransaction capTransaction, IUnitOfWork unitOfWork){unitOfWork.Commit();capTransaction.Flush();}}注入我們的FreeSql
public?void?ConfigureServices(IServiceCollection?services){IConfigurationSection?configurationSection?=?Configuration.GetSection($"ConnectionStrings:MySql");IFreeSql?fsql?=?new?FreeSqlBuilder().UseConnectionString(DataType.MySql,?configurationSection.Value);.UseNameConvert(NameConvertType.PascalCaseToUnderscoreWithLower).UseAutoSyncStructure(true).UseNoneCommandParameter(true).UseMonitorCommand(cmd?=>{Trace.WriteLine(cmd.CommandText?+?";");}).Build();services.AddSingleton(fsql);services.AddFreeRepository();services.AddScoped<UnitOfWorkManager>(); }示例
????[HttpGet("~/freesql/unitofwork/{id}")]public?DateTime?UnitOfWorkManagerTransaction(int?id,?[FromServices]?IBaseRepository<Book>?repo){DateTime?now?=?DateTime.Now;using?(IUnitOfWork?uow?=?_unitOfWorkManager.Begin()){ICapTransaction?trans?=?_unitOfWorkManager.Current.BeginTransaction(_capBus,?false);repo.Insert(new?Book(){Author?=?"luoyunchong",Summary?=?"2",Title?=?"122"});_capBus.Publish("freesql.time",?now);trans.Commit(uow);}return?now;}[NonAction][CapSubscribe("freesql.time")]public?void?GetTime(DateTime?time){Console.WriteLine($"time:{time}");}注意trans不需要using,freesql內部會釋放資源。,也可using,但請更新到最新的freesql版本。
ICapTransaction?trans?=?_unitOfWorkManager.Current.BeginTransaction(_capBus,?false);提交事務,也請調用擴展方法,否則事務無法正常。
trans.Commit(uow);源碼位置
https://github.com/luoyunchong/lin-cms-dotnetcore/blob/master/src/LinCms.Web/Controllers/v1/TestController.cs
總結
以上是生活随笔為你收集整理的FreeSql接入CAP的实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用 .NET 5 体验大数据和机器学习
- 下一篇: 购票啦 | 2020中国.NET开发者峰