EventBus/EventQueue 再思考
EventBus/EventQueue 再思考
Intro
之前寫過兩篇文章,造輪子系列的 EventBus/ EventQueue,回想起來覺得當前的想法有點問題,當時對 EvenStore 可能有點誤解,有興趣可以參考?
動手造輪子:實現一個簡單的 EventBus
動手造輪子:實現簡單的 EventQueue,
最近把 Event 相關的邏輯做了一個重構,修改 EventStore,重新設計了 Event 相關的組件
重構后的 Event
Event: 事件的抽象定義
EventHandler:事件處理器抽象定義
EventHandlerFactory:事件處理器工廠,用來根據事件類型獲取事件處理器(新增)
EventPublisher:事件發布器,用于事件發布
EventSubscriber:事件訂閱器,用于管理事件的訂閱
EventSubscriptionManager:事件訂閱管理器,在?EventSubscriber?的基礎上增加了一個根據事件類型獲取事件訂閱器類型的方法
EventBus:事件總線,由 EventPubliser 和 EventSubscriber 組合而成,用來比較方便的做事件發布和訂閱
EventQueue:事件隊列,希望某些消息順序處理的時候可以考慮用 EventQueue 的模式
EventStore:事件存儲,事件的持久化存儲(在之前的版本里,EventStore 實際作用是一個?EventSubscriptionManager,在最近的版本更新中已修改)
以上 EventSubscriber 和 EventSubscriptionManager 一般不直接用,一般用 EventBus 來處理即可
EventHandlerFactory
這次引入了 EventHandlerFactory 用來抽象獲取 EventHandler 的邏輯,原來的設計里是在處理 Event 的時候獲取 EventHandler 的類型,然后從依賴注入框架中獲取或創建新的 event handler 實例之后再調用 EventHandler 的 Handle 方法處理事件,有一些冗余
使用 EventHandlerFactory 之后就可以直接獲取一個 EventHandler 實例集合,具體是實例化還是從依賴注入中獲取就由 EventHandlerFactory 來決定了,這樣就可以對依賴注入很友好,對于基于內存的簡單 EventBus 來說,在服務注冊之后就不需要再調用 Subscribe 去顯式訂閱了,因為再注冊服務的時候就已經隱式實現了訂閱的邏輯,這樣實際就不需要 EventSubscriptionManager 來管理訂閱了,訂閱信息都在依賴注入框架內部,比如說 CounterEvent,要獲取它的訂閱信息,我只需要從依賴注入框架中獲取 IEventHandler<CounterEvent> 的實例即可,實際就代替了原先 “EventStoreInMemory”,現在的 EventSubscriptionManagerInMemory
基于依賴注入的 EventHandlerFactory 定義:
public sealed class DependencyInjectionEventHandlerFactory : IEventHandlerFactory {private readonly IServiceProvider _serviceProvider;public DependencyInjectionEventHandlerFactory(IServiceProvider serviceProvider = null){_serviceProvider = serviceProvider ?? DependencyResolver.Current;}public ICollection<IEventHandler> GetHandlers(Type eventType){var eventHandlerType = typeof(IEventHandler<>).MakeGenericType(eventType);return _serviceProvider.GetServices(eventHandlerType).Cast<IEventHandler>().ToArray();} }如果不使用依賴注入,也可以根據 IEventSubscriptionManager 訂閱信息來實現:
public sealed class DefaultEventHandlerFactory : IEventHandlerFactory {private readonly IEventSubscriptionManager _subscriptionManager;private readonly ConcurrentDictionary<Type, ICollection<IEventHandler>> _eventHandlers = new ConcurrentDictionary<Type, ICollection<IEventHandler>>();private readonly IServiceProvider _serviceProvider;public DefaultEventHandlerFactory(IEventSubscriptionManager subscriptionManager, IServiceProvider serviceProvider = null){_subscriptionManager = subscriptionManager;_serviceProvider = serviceProvider ?? DependencyResolver.Current;}public ICollection<IEventHandler> GetHandlers(Type eventType){var eventHandlers = _eventHandlers.GetOrAdd(eventType, type =>{var handlerTypes = _subscriptionManager.GetEventHandlerTypes(type);var handlers = handlerTypes.Select(t => (IEventHandler)_serviceProvider.GetServiceOrCreateInstance(t)).ToArray();return handlers;});return eventHandlers;} }EventQueue Demo
來看一下 EventQueue 的示例,示例基于 asp.net core 的,定義了一個 HostedService 來實現一個 EventConsumer 來消費 EventQueue 中的事件信息
EventConsumer 定義如下:
public class EventConsumer : BackgroundService {private readonly IEventQueue _eventQueue;private readonly IEventHandlerFactory _eventHandlerFactory;public EventConsumer(IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory){_eventQueue = eventQueue;_eventHandlerFactory = eventHandlerFactory;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){while (!stoppingToken.IsCancellationRequested){var queues = await _eventQueue.GetQueuesAsync();if (queues.Count > 0){await queues.Select(async q =>{var @event = await _eventQueue.DequeueAsync(q);if (null != @event){var handlers = _eventHandlerFactory.GetHandlers(@event.GetType());if (handlers.Count > 0){await handlers.Select(h => h.Handle(@event)).WhenAll();}}}).WhenAll();}await Task.Delay(1000, stoppingToken);}} }定義 PageViewEvent 和 PageViewEventHandler,用來記錄和處理請求訪問記錄
public class PageViewEvent : EventBase { } public class PageViewEventHandler : EventHandlerBase<PageViewEvent> {public static int Count;public override Task Handle(PageViewEvent @event){Interlocked.Increment(ref Count);return Task.CompletedTask;} }事件很簡單,事件處理也只是增加了 PageViewEventHandler 內定義的 Count。
服務注冊:
// 注冊事件核心組件 // 會注冊 EventBus、EventHandlerFactory、EventQueue 等 services.AddEvents()// 注冊 EventHanlder.AddEventHandler<PageViewEvent, PageViewEventHandler>(); // 注冊 EventQueuePubliser,默認注冊的 IEventPublisher 是 EventBus services.AddSingleton<IEventPublisher, EventQueuePublisher>(); // 注冊 EventConsumer services.AddHostedService<EventConsumer>();事件發布,定義了一個中間件來發布 PageViewEvent,定義如下:
// pageView middleware app.Use((context, next) =>{var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();eventPublisher.Publish(new PageViewEvent());return next();});然后定義一個接口來獲取上面定義的 PageViewEventHandler 中的 Count
[Route("api/[controller]")] public class EventsController : ControllerBase {[HttpGet("pageViewCount")]public IActionResult Count(){return Ok(new { Count = PageViewEventHandler.Count });} }運行起來之后,訪問幾次接口,看上面的接口返回 Count 是否會增加,正常的話每訪問一次接口就會增加 1,并發訪問問題也不大,因為每個事件都是順序處理的,即使并發訪問也沒有關系,事件發布之后,在隊列里都是順序處理的,這也就是引入事件隊列的目的(好像上面的原子遞增沒什么用了...) 如果沒看到了增加,稍等一會兒再訪問試試,事件處理會遲到,但總會處理,畢竟是異步處理的,有些延遲很正常,而且上面我們還有一個 1s 的延遲
More
作者水平有限,如果上述有哪些不對的地方還望指出,萬分感謝
Reference
https://github.com/WeihanLi/WeihanLi.Common/tree/dev/src/WeihanLi.Common/Event
https://github.com/WeihanLi/WeihanLi.Common/blob/dev/samples/AspNetCoreSample/Startup.cs
https://www.cnblogs.com/weihanli/p/implement-a-simple-event-bus.html
https://www.cnblogs.com/weihanli/p/implement-event-queue.html
總結
以上是生活随笔為你收集整理的EventBus/EventQueue 再思考的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .NET Core + Kubernet
- 下一篇: [推荐]大量 Blazor 学习资源(一