[项目更新] 集成RabbitMQ队列与EventBus总线
(Blog.Core框架開發情況,著色部分為本次新增)
終于項目繼續迭代更新了,在開源這兩年多,也是感謝每一個支持Blog.Core項目的同學,同時也感謝每一個在生產環境中使用,并提出意見和建議的小伙伴,2,606個Star,是我們相互之間共同的努力和肯定,上邊的這些都是我和各位使用者提出的需求,剛開始很快,越是到后邊,開發起來越難,這里先說明幾點問題:
1、增加的東西太多,有一部分使用者表示使用不到,太笨重;
2、目前基本比較全面,后期新增需求難度系數較高;
3、功能太多了,不好抄代碼;????
不過該更新的還是需要更新的,我已經很貼心的把各部分的代碼隔開了,就差每個功能建立一個類庫了,這個我也考慮過,不過那要是建立起來,就是二三十個,果斷放棄了。因為代碼已經隔開了,如果自己不需要,可以刪除掉,當然這樣也方便其他不使用我框架的粘貼復制到自己項目。
今年終于在年末的時候,增加上了RabbitMQ消息隊列和EventBus事件總線,之前新增過Redis的消息隊列,基于Redis很方便且很簡單的一個InitQ組件,具體請看《【BCVP】實現基于 Redis 的消息隊列》,然后,大家應該都知道,最近我一直在錄制一個系列視頻教程——《eShopOnContainer微服務系列講解》,里邊最重要的就是事件總線,基于的也正好是RabbitMQ的分布式消息隊列組件,當然其中的訂單微服務也用到了MediatR作為進程內的訂閱發布模式,這個MediatR我在DDD系列中已經講過,就不說了,這次就重點說說RabbitMQ和EventBus吧,也正好屬于倆個系列的串燒了。這里說一下,我是從eshop代碼里拷貝出來做講解的,當然做了適當修改,還是要多關注官方,支持原作者:
https://github.com/dotnet-architecture/eShopOnContainers。
此外,熱烈歡迎支付組件的合作者,如果你正在開發支付相關組件,可以聯系我,一起推廣開發,一起造福社區,也可以入駐BCVP開發者社區。
OK,今天就先簡單的給大家先說下思路,以下每一個小節其實都可以寫一篇或多篇文章的,本文就當個系列文章導讀吧,詳細講解以后會有,主要就是關于RabbitMQ消息隊列和EventBus事件總線的。
01
消息隊列
Message?Queue
Publish/Subscribe
基本概念:
消息隊列(英語:Message queue)是一種進程間通信或同一進程的不同線程間的通信方式,軟件的貯列用來處理一系列的輸入,通常是來自用戶。
消息隊列提供了異步的通信協議,每一個隊列中的記錄包含詳細說明的數據,包含發生的時間,輸入設備的種類,以及特定的輸入參數,也就是說:消息的發送者和接收者不需要同時與消息隊列交互。消息會保存在隊列中,直到接收者取回它。
最終可以實現解耦的目的。
下面通過一個簡單的架構模型來解釋:
Producer:消息生產者,負責產生和發送消息到Broker。
Broker:消息處理中心。負責消息存儲、確認、重試等,一般其中會包含多個Queue。
Consumer:消息消費者,負責從 Broker 中獲取消息,并進行相應處理。
消息隊列的好處:
從上邊的定義中,我們可以看出來,優點主要是三塊:異步、流量削峰與流控、解耦。這三個優點在高并發等三高場景還是很有必要的,甚至說是十分必要的。
系統A將userId寫到消息隊列中,系統C和系統D從消息隊列中拿數據,從而實現了解耦的目的:
(圖片來源于知乎/question/54152397)
接下來,為了提高用戶體驗和吞吐量,其實可以異步地調用系統B、C、D的接口。所以,我們可以弄成是這樣的:
(圖片來源于知乎/question/54152397)
最后,系統B和系統C根據自己的能夠處理的請求數去消息隊列中拿數據,這樣即便有每秒有8000個請求,那只是把請求放在消息隊列中,去拿消息隊列的消息由系統自己去控制,這樣就不會把整個系統給搞崩:
(圖片來源于知乎/question/54152397)
當然消息隊列,也有些壞處,這里就先隨便列幾個,其他的大家自行搜索即可:
1、高可用:如果使用消息隊列,基本要配合集群的,因為如果MQ服務器崩了,那就整個服務災難了。
2、數據安全:必須保證數據不能丟失,也就是要考慮好最終一致性,做好補償機制。
3、合理的消費。
好啦,基本概念先說到這里,下邊就簡單說下代碼吧,因為篇幅的問題,我們只統一講解接口的設計,畢竟實現類是比較復雜的,當然,我會抽一個實現類的核心方法說一下。
02
RabbitMQ持久連接
IRabbitMQ
PersistentConnection
首先說下關于RabbitMQ的連接,這個是很簡單的,和平時我們使用SqlServer/Redis這種第三方組件是類似的,通過連接字符串(或者說是服務器),然后配置用戶名/密碼,就能連接上了,相關的接口是這樣的:
/// <summary>/// RabbitMQ持久連接/// 接口/// </summary>public interface IRabbitMQPersistentConnection: IDisposable{bool?IsConnected?{?get;?}bool?TryConnect();IModel CreateModel();}接口一共提供了三個方法,分別是是否連接、嘗試連接、創建模型。
使用的時候,首先需要連接nuget包:
其實只需要第三個RabbitMQ.Client就行了,前邊兩個是輔助作用,分別是提供序列化和重試機制的,如果你有一個需求是需要重試的,比如連接數據庫或者執行某個進程,如果遇到異常,重試幾次,可以使用組件Polly,它還有其他的功能,自己可以多嘗試下。
那說到了重試,我就說一下,TryConnect();?這個核心的方法:
/// <summary> /// 連接 /// </summary> /// <returns></returns> public bool TryConnect() {_logger.LogInformation("RabbitMQ Client is trying to connect");// 加鎖lock (sync_root){// 重試策略var policy = RetryPolicy.Handle<SocketException>().Or<BrokerUnreachableException>().WaitAndRetry(_retryCount,retryAttempt =>TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>{_logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message);});// 執行策略policy.Execute(() =>{// 開始連接RabbitMQ_connection = _connectionFactory.CreateConnection();});// 連接成功if (IsConnected){// 追加事件處理器,目的是為了異常重試,共3種情況_connection.ConnectionShutdown += OnConnectionShutdown;_connection.CallbackException += OnCallbackException;_connection.ConnectionBlocked += OnConnectionBlocked;_logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName);return true;}else{_logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");return false;}} }相應的邏輯我已經在代碼中,增加了注釋,過程肯定能看得懂,至于真實的底層原理,這里先不說了。
可以看到上邊就用到了重試機制,可以配置策略。這樣就可以連接上RabbitMQ服務器了,那如何基于這個連接做事件總線呢,別著急,咱們先說下什么是事件和事件處理器。
03
事件與處理器
IntegrationEvent
IIntegrationEventHandler<T>
關于事件
如果你看過我DDD領域驅動設計,應該會有些印象和了解,我這里再簡單的說明一下吧。關于事件,其實我們每天都在用,而且很久之前就用過,就比如說asp的時候的按鈕事件:
其中object sender指代發出事件的對象,這里也就是button對象;EventArgs e事件參數,可以理解為對事件的描述 ,它們可以統稱為事件源。其中的代碼邏輯,就是對事件的處理。我們可以統稱為事件處理程序。
所以:事件有兩部分=事件源對象+事件處理器程序。
關于總線
那我們平時肯定會遇到很多很多的事件:
注冊的時候,校驗成功后持久化到數據庫,然后發注冊成功的郵件。
支付的時候,判斷成功后,修改數據庫訂單,庫存,物流,郵件,短信,等等等等,這都是一個個的事件。
那如何對這些事件進行統一的管理呢,單體下很簡單,就是按照過程走就行了,分布式或者微服務中,多個服務已經隔離開,無法按照過程一步步走,那這個時候就需要一個策略,常用的就是——訂閱發布模式,事件總線是對發布-訂閱模式的一種實現。它是一種集中式事件處理機制,允許不同的組件之間進行彼此通信而又不需要相互依賴,達到一種解耦的目的。
代碼舉例
我們用代碼來簡單看看如何設計事件和事件處理器:
事件是一個對象,是一個模型,那很重要的標識,就是Id和Date這兩個屬性了,當然也可以適當做其他的一些處理,請注意private set; 的寫法。
對事件的處理比較簡單的,我們定義接口,只需要一個Handle方法即可,剩下的就是我們定義一個一個的具體的事件處理器,通過繼承這個接口,來實現具體的業務邏輯。
比如我這里定義了一個例子,關于博客刪除的,當然可能不太貼切,我只是想舉個例子:
/// <summary>/// 博客刪除事件處理器/// 刪除博客后觸發/// </summary>public class BlogDeletedIntegrationEventHandler : IIntegrationEventHandler<BlogDeletedIntegrationEvent>{private readonly IBlogArticleServices _blogArticleServices;private readonly ILogger<BlogDeletedIntegrationEventHandler> _logger;public BlogDeletedIntegrationEventHandler(IBlogArticleServices blogArticleServices,ILogger<BlogDeletedIntegrationEventHandler> logger){_blogArticleServices = blogArticleServices;_logger = logger ?? throw new ArgumentNullException(nameof(logger));}public async Task Handle(BlogDeletedIntegrationEvent @event){_logger.LogInformation("----- Handling integration event: {IntegrationEventId} at {AppName} - ({@IntegrationEvent})", @event.Id, "Blog.Core", @event);ConsoleHelper.WriteSuccessLine($"----- Handling integration event: {@event.Id} at Blog.Core - ({@event})");await _blogArticleServices.DeleteById(@event.BlogId.ToString());}}當我執行刪除的時候,不去執行,而是放到隊列里,通過訂閱發布的模式,每一個訂閱者來消費信息,從而實現解耦的目的。
現在明白了事件和處理器,那如何對這是事件操作,怎么發布,又是如何訂閱呢?事件總線就這么出現了,請往下看。
04
基于RabbitMQ事件總線
IEventBus
EventBusRabbitMQ
上邊我們已經連接好了RabbitMQ服務器,也明白了什么是事件和處理器,現在就是需要發布和訂閱了,總線是一個很好的方案,那設計下接口,就是這樣的:
這里定義了基本的常見操作,如何實現這個接口,可以針對不同的方案,既然我們使用了RabbitMQ,就說說它,當然你也可以使用其他的,比如AzureService之類的。
基于RabbitMQ的事件總線實現類比較復雜,我就不多說明了,感興趣的可以直接看我的代碼,我這里就說一下構造函數,從構造函數中,可以知道,當前類的依賴項,畢竟現在都是使用依賴注入了:
/// <summary>/// RabbitMQ事件總線/// </summary>/// <param name="persistentConnection">RabbitMQ持久連接</param>/// <param name="logger">日志</param>/// <param name="autofac">autofac容器</param>/// <param name="subsManager">事件總線訂閱管理器</param>/// <param name="queueName">隊列名稱</param>/// <param name="retryCount">重試次數</param>public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5){_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));_logger = logger ?? throw new ArgumentNullException(nameof(logger));_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();_queueName = queueName;_consumerChannel = CreateConsumerChannel();_autofac = autofac;_retryCount = retryCount;_subsManager.OnEventRemoved += SubsManager_OnEventRemoved;}除了比較常規的日志、RabbitMQ連接、Autofac容器、Polly重試這幾個比較基礎和必要的,還有一個參數是很重要的——IEventBusSubscriptionsManager subsManager 。
這個是干什么的呢,我們知道,單體應用很簡單,按照過程一一步驟即可,單一的訂閱發布模式也比較簡單,就是一對一,但是還是有很多復雜的,那如何對這些訂閱統一管理呢,就是需要一個事件總線訂閱管理器。
05
事件總線訂閱管理器
InMemory
EventBusSubscriptionsManager
是對每一個訂閱事件需要做管理,比如該發布的事件不想被消費了,比如需要動態的添加一個訂閱者,比如全部清除等,可以這么設計接口:
/// <summary>/// 事件總線訂閱管理器/// 接口/// </summary>public interface IEventBusSubscriptionsManager{bool IsEmpty { get; }event EventHandler<string> OnEventRemoved;void AddDynamicSubscription<TH>(string eventName)where TH : IDynamicIntegrationEventHandler;void AddSubscription<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>;void RemoveSubscription<T, TH>()where TH : IIntegrationEventHandler<T>where T : IntegrationEvent;void RemoveDynamicSubscription<TH>(string eventName)where TH : IDynamicIntegrationEventHandler;bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;bool HasSubscriptionsForEvent(string eventName);Type GetEventTypeByName(string eventName);void Clear();IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);string GetEventKey<T>();}這里應該明白它的作用了吧,就是控制當前項目中的總線中的各個訂閱情況,可以直接在內存里操作,注意這個內存是管理總線中的訂閱的,和RabbitMQ的分布式不一樣,需要搞清楚二者的區別,如果不是很懂,可以聯系我,或者留言。
所以可以設計這么一個實現類InMemoryEventBusSubscriptionsManager:
基本到這里就沒啥問題了,核心的幾個知識點也講完了,當然,僅僅是講完了,其中的知識點量,要遠比這個多的多,剩下的可以看看效果。
06
服務注冊和使用
Service?
registration and usage
上邊的設計完,接下來注冊一下服務就行了,首先就是注冊RabbitMQ:
public static void AddRabbitMQSetup(this IServiceCollection services) {if (services == null) throw new ArgumentNullException(nameof(services));if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool()){services.AddSingleton<IRabbitMQPersistentConnection>(sp =>{var logger = sp.GetRequiredService<ILogger<RabbitMQPersistentConnection>>();var factory = new ConnectionFactory(){HostName = Appsettings.app(new string[] { "RabbitMQ", "Connection" }),DispatchConsumersAsync = true};if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "UserName" }))){factory.UserName = Appsettings.app(new string[] { "RabbitMQ", "UserName" });}if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "Password" }))){factory.Password = Appsettings.app(new string[] { "RabbitMQ", "Password" });}var retryCount = 5;if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" }))){retryCount = int.Parse(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" }));}return new RabbitMQPersistentConnection(factory, logger, retryCount);});} }可以在配置文章中配置下參數。
然后注冊事件總線EventBus:
public static void AddEventBusSetup(this IServiceCollection services){if (services == null) throw new ArgumentNullException(nameof(services));if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool() && Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool()){var subscriptionClientName = Appsettings.app(new string[] { "EventBus", "SubscriptionClientName" });services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();services.AddTransient<BlogDeletedIntegrationEventHandler>();services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>{var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();var retryCount = 5;if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" }))){retryCount = int.Parse(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" }));}return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);});}}最后,當項目啟動的時候,直接訂閱我們的事件處理程序:
?var?eventBus?=?app.ApplicationServices.GetRequiredService<IEventBus>();eventBus.Subscribe<BlogDeletedIntegrationEvent,?BlogDeletedIntegrationEventHandler>();?我們嘗試一下,發送一個事件到總線里:
動圖效果如下:
是不是很簡單,好啦,暫時就先到這里,打完手工。
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的[项目更新] 集成RabbitMQ队列与EventBus总线的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 尝鲜!.NET5实操之docker+k8
- 下一篇: 网传不要升级.NET5的诸多原因,你赞同