事件驱动之异步事件
前言
上一篇講了事件,以及為什么要使用事件,主要是為了解耦,但是有同學(xué)就問(wèn)了,同步如果訂閱事件的人太多,比如13億人都關(guān)心上頭條的事,那么RaiseEvent得等13億人都處理完,那得多久呀,從此再也不敢發(fā)事件了。 舉個(gè)例子,你在網(wǎng)上下單,下完單要通知庫(kù)房,甚至要通知供應(yīng)商補(bǔ)貨,如果都是同步的話(huà),消費(fèi)者還不等急死呀,實(shí)際上你在電商網(wǎng)站上下個(gè)單, 一般你很快就能到訂單頁(yè)面,那個(gè)頁(yè)面告訴你:“兄弟,訂單已經(jīng)創(chuàng)建成功,訂單號(hào)是xxxxx-xxxxx-xxxx-xxxx,你的訂單已經(jīng)提交到庫(kù)房” 等。然后你就很快了的下另一單了。好吧, 提問(wèn)的同學(xué),說(shuō)好的妹子呢?
實(shí)現(xiàn)思路
發(fā)出事件
public void Head() { var NewsPaper = new NewsPaper("南都娛樂(lè)"); NewsPaper.WriteToHeader("汪峰"); RaiseEvent(new HeadedEvent {Name = "汪峰"}); } private void RaiseEvent(HeadedEvent headedEvent) { EventBus.Publish<HeadedEvent>(new HeadedEvent { Name = "汪峰" }); }所以我們只需在代碼里RaiseEvent就可以了。
訂閱事件
其實(shí)很簡(jiǎn)單,因?yàn)槲覀円獙?shí)現(xiàn)的是同步的事件,我們只需要找到所有處理這個(gè)事件的實(shí)現(xiàn)類(lèi),然后調(diào)用所有就可以了。
public interface IEventHandler<TEvent> where TEvent : Event {void Handle(TEvent e); } public class HeadedEvent:Event { public string Name { get; set; } } public class GuoJiZhangMotherEventHandler : IEventHandler<HeadedEvent> { public void Handle(HeadedEvent e) { Console.WriteLine(e.Name+", Are you kidding me?"); } } public class PiMingEventHandler:IEventHandler<HeadedEvent> { public void Handle(HeadedEvent e) { Console.WriteLine(e.Name+", Guo Ji Zhang is your last wife?"); } }我們可以看到正真的事件協(xié)調(diào)者是EventBus, 之前的代碼如下是同步的。
public class EventBus {public static void Publish<T>(T concreteEvent) where T: Event { var handlers = _container.ResolveAll<IEventHandler<T>>(); foreach (var handle in handlers) { handle.Handle(concreteEvent); } } }為了提高性能,我們可以先來(lái)第一步改進(jìn)
publicvoid Publish<T>(T @event) where T : Event{var handlers = _eventHandlerFactory.GetHandlers<T>();handlers.AsParallel().ForAll((h)=> h.Handle(@event)); }我們可以看到,現(xiàn)在并行處理可以大大加快速度,但是有兩個(gè)問(wèn)題,第一個(gè)問(wèn)題就是沒(méi)有處理異常,所以讓我們加上異常。
public void Publish<T>(T @event) where T : Event{var handlers = _eventHandlerFactory.GetHandlers<T>(); handlers.AsParallel().ForAll((h)=> HandleEvent<T>(h,@event)); } private void HandleEvent<T>(IEventHandler<T> handle, T @event) where T : Event { try { handle.Handle(@event); } catch (Exception e) { // Log the exception, as the caller don't care this } } }第二個(gè)問(wèn)題,就是我們雖然用了并行加快了速度,但是還沒(méi)有正真實(shí)現(xiàn)異步,整個(gè)程序還是等所有Handler處理完才返回。
publicvoid Publish<T>(T @event) where T : Event{var handlers = _eventHandlerFactory.GetHandlers<T>();handlers.Select(h => Task.Factory.StartNew(() => HandleEvent<T>(h, @event))); }這段代碼執(zhí)行完,盡然發(fā)現(xiàn)Handler沒(méi)有執(zhí)行,好吧,原因是IQueryable的延遲執(zhí)行,所以我們需要調(diào)用一下ToList
publicvoid Publish<T>(T @event) where T : Event{var handlers = _eventHandlerFactory.GetHandlers<T>();handlers.Select(h => Task.Factory.StartNew(() => HandleEvent<T>(h, @event))).ToArray(); }好了,我們就這樣輕易的實(shí)現(xiàn)了一個(gè)AsyncEventBus, 是不是感謝.Net的強(qiáng)大?
總結(jié)
這里還只是一個(gè)系統(tǒng)內(nèi)部的Async, 如果涉及到系統(tǒng)之間的交互,這個(gè)就不行了,而且如果異步處理有錯(cuò)誤,我們就會(huì)有信息丟失,所以需要更健壯的異步事件處理系統(tǒng),這個(gè)后面再講,但是一般的系統(tǒng)我們只需要把出錯(cuò)的時(shí)間記錄下來(lái),然后再看要不要處理就可以。
另外,異步要處理的東西很多,比如處理完畢后,如何通知用戶(hù),還是讓用戶(hù)刷新? 我個(gè)人建議,一般情況下都不要用異步,只有在真的需要的時(shí)候再用。
- 作者: 王德水
- 出處:http://deshui.wang
總結(jié)
- 上一篇: 布衣之怒血溅五步(布衣之怒)
- 下一篇: CQRS