eShopOnContainers 知多少[5]:EventBus With RabbitMQ
1. 引言
事件總線這個概念對你來說可能很陌生,但提到觀察者(發布-訂閱)模式,你也許就很熟悉。事件總線是對發布-訂閱模式的一種實現。它是一種集中式事件處理機制,允許不同的組件之間進行彼此通信而又不需要相互依賴,達到一種解耦的目的。從上圖可知,核心就4個角色:
事件(事件源+事件處理)
事件發布者
事件訂閱者
事件總線
實現事件總線的關鍵是:
事件總線維護一個事件源與事件處理的映射字典;
通過單例模式,確保事件總線的唯一入口;
利用反射完成事件源與事件處理的初始化綁定;
提供統一的事件注冊、取消注冊和觸發接口。
以上源于我在事件總線知多少(1)中對于EventBus的分析和簡單總結。基于以上的簡單認知,我們來梳理下eShopOnContainers中EventBus的實現機制。
2. 高屋建瓴--看類圖
我們直接以上帝視角,來看下其實現機制,上類圖。
我們知道事件的本質是:事件源+事件處理。 針對事件源,其定義了 Handle方法用于響應事件。不同之處在于方法參數的類型: 第一個接受的是一個強類型的 dynamic。 為什么要單獨提供一個事件源為 dynamic可以簡化事件源的構建,更趨于靈活。
有了事件源和事件處理,接下來就是事件的注冊和訂閱了。為了方便進行訂閱管理,系統提供了額外的一層抽象 InMemoryEventBusSubscriptionsManager就是使用內存進行存儲事件源和事件處理的映射字典。 從類圖中看 SubscriptionInfo,其主要用于表示事件訂閱方的訂閱類型和事件處理的類型。
我們來近距離看下
//InMemoryEventBusSubscriptionsManager.cs
//定義的事件名稱和事件訂閱的字典映射(1:N)
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
//保存所有的事件處理類型
private readonly List<Type> _eventTypes;
//定義事件移除后事件
public event EventHandler<string> OnEventRemoved;
//構造函數初始化
public InMemoryEventBusSubscriptionsManager()
{
? ?_handlers = new Dictionary<string, List<SubscriptionInfo>>();
? ?_eventTypes = new List<Type>();
}
//添加動態類型事件訂閱(需要手動指定事件名稱)
public void AddDynamicSubscription<TH>(string eventName)
? ?where TH : IDynamicIntegrationEventHandler
{
? ?DoAddSubscription(typeof(TH), eventName, isDynamic: true);
}
//添加強類型事件訂閱(事件名稱為事件源類型)
public void AddSubscription<T, TH>()
? ?where T : IntegrationEvent
? ?where TH : IIntegrationEventHandler<T>
{
? ?var eventName = GetEventKey<T>();
? ?DoAddSubscription(typeof(TH), eventName, isDynamic: false);
? ?if (!_eventTypes.Contains(typeof(T)))
? ?{
? ? ? ?_eventTypes.Add(typeof(T));
? ?}
}
//移除動態類型事件訂閱
public void RemoveDynamicSubscription<TH>(string eventName)
? ?where TH : IDynamicIntegrationEventHandler
{
? ?var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);
? ?DoRemoveHandler(eventName, handlerToRemove);
}
//移除強類型事件訂閱
public void RemoveSubscription<T, TH>()
? ?where TH : IIntegrationEventHandler<T>
? ?where T : IntegrationEvent
{
? ?var handlerToRemove = FindSubscriptionToRemove<T, TH>();
? ?var eventName = GetEventKey<T>();
? ?DoRemoveHandler(eventName, handlerToRemove);
}
添加了這么一層抽象,即符合了單一職責原則,又完成了代碼重用。 IEventBusSubscriptionsManager的依賴,即可完成訂閱管理。 你這里可能會好奇,為什么要暴露一個 EventBusRabbitMQ源碼親密接觸。
3.3.1. 構造函數定義
IRabbitMQPersistentConnection以便連接到對應的Broke。
使用空對象模式注入?OnEventRemoved事件,取消隊列的綁定。(這也就回答了上面遺留的問題)
3.3.2. 事件訂閱的邏輯:
public void Publish(IntegrationEvent @event)
{
? ?if (!_persistentConnection.IsConnected)
? ?{
? ? ? ?_persistentConnection.TryConnect();
? ?}
? ?var policy = RetryPolicy.Handle<BrokerUnreachableException>()
? ? ? ?.Or<SocketException>()
? ? ? ?.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
? ? ? ?{
? ? ? ? ? ?_logger.LogWarning(ex.ToString());
? ? ? ?});
? ?using (var channel = _persistentConnection.CreateModel())
? ?{
? ? ? ?var eventName = @event.GetType()
? ? ? ? ? ?.Name;
? ? ? ?channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
? ? ? ?var message = JsonConvert.SerializeObject(@event);
? ? ? ?var body = Encoding.UTF8.GetBytes(message);
? ? ? ?policy.Execute(() =>
? ? ? ?{
? ? ? ? ? ?var properties = channel.CreateBasicProperties();
? ? ? ? ? ?properties.DeliveryMode = 2; // persistent
? ? ? ? ? ?channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, mandatory:true, basicProperties: properties, body: body);
? ? ? ?});
? ?}
}
這里面有以下幾個知識點:
使用Polly,以2的階乘的時間間隔進行重試。(第一次2s后,第二次4s后,第三次8s后...重試)
使用direct全匹配、單播形式的路由機制進行消息分發
消息主體是格式化的json字符串
指定?mandatory:true告知服務器當根據指定的routingKey和消息找不到對應的隊列時,直接返回消息給生產者。
3.3.4. 然后看看事件消息的監聽
Received事件委托處理消息接收事件調用?
以上代碼主要包括以下知識點:
4. EventBus的集成和使用
以上介紹了EventBus的實現要點,那各個微服務是如何集成呢?
1. 注冊
2. 注冊單例模式的 services.AddSingleton<IEventBusSubscriptionsManager,InMemoryEventBusSubscriptionsManager>();
3. 注冊單例模式的
完成了以上集成,就可以在代碼中使用事件總線進行事件的發布和訂閱。
4. 發布事件
若要發布事件,需要根據是否需要事件源(參數傳遞)來決定是否需要申明相應的集成事件,需要則繼承自 IEventBus的實例的
IIntegrationEventHandler或 IEventBus的實例調用
TestEvent事件,B服務訂閱該事件,同樣需要在B服務復制定義一個 <code class="prettyprint code-in-text prettyprinted" style="box-sizing: border-box;background: rgb(243, 241, 241);color: rgb(88, 88, 88);line-height: 18px;font-family: consolas, menlo, courier, monospace, " initial="" microsoft="" !important;"="" 0px="">TestEvent5. 最后
通過一步一步的源碼梳理,我們發現eShopOnContainers中事件總線的總體實現思路與引言部分的介紹十分契合。所以對于事件總線,不要覺得高深,明確參與的幾個角色以及基本的實現步驟,那么不管是基于RabbitMQ實現也好還是基于Azure Service Bus也好,萬變不離其宗!
//定義事件處理
public class ProductPriceChangedIntegrationEventHandler : IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
{
? ?public async Task Handle(ProductPriceChangedIntegrationEvent @event)
? ?{
? ? ? ?//do something
? ?}
}
//事件源的聲明
public class ProductPriceChangedIntegrationEvent : IntegrationEvent
{ ? ? ? ?
? ?public int ProductId { get; private set; }
? ?public decimal NewPrice { get; private set; }
? ?public decimal OldPrice { get; private set; }
? ?public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, decimal oldPrice)
? ?{
? ? ? ?ProductId = productId;
? ? ? ?NewPrice = newPrice;
? ? ? ?OldPrice = oldPrice;
? ?}
}
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
? ?var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
? ?var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
? ?var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
? ?var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
? ?var retryCount = 5;
? ?if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
? ?{
? ? ? ?retryCount = int.Parse(Configuration["EventBusRetryCount"]);
? ?}
? ?return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
? ?var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
? ?//...
? ?return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
Json字符串的反序列化
利用依賴注入容器解析集成事件(Integration Event)和事件處理(Event Handler)類型
反射調用具體的事件處理方法
private async Task ProcessEvent(string eventName, string message)
{
? ?if (_subsManager.HasSubscriptionsForEvent(eventName))
? ?{
? ? ? ?using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
? ? ? ?{
? ? ? ? ? ?var subscriptions = _subsManager.GetHandlersForEvent(eventName);
? ? ? ? ? ?foreach (var subscription in subscriptions)
? ? ? ? ? ?{
? ? ? ? ? ? ? ?if (subscription.IsDynamic)
? ? ? ? ? ? ? ?{
? ? ? ? ? ? ? ? ? ?var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
? ? ? ? ? ? ? ? ? ?dynamic eventData = JObject.Parse(message);
? ? ? ? ? ? ? ? ? ?await handler.Handle(eventData);
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?else
? ? ? ? ? ? ? ?{
? ? ? ? ? ? ? ? ? ?var eventType = _subsManager.GetEventTypeByName(eventName);
? ? ? ? ? ? ? ? ? ?var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
? ? ? ? ? ? ? ? ? ?var handler = scope.ResolveOptional(subscription.HandlerType);
? ? ? ? ? ? ? ? ? ?var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
? ? ? ? ? ? ? ? ? ?await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ?}
? ?}
}
總結
以上是生活随笔為你收集整理的eShopOnContainers 知多少[5]:EventBus With RabbitMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 开源库Magicodes.Storage
- 下一篇: eShopOnContainers 知多