ASP.NET Core Web API下事件驱动型架构的实现(四):CQRS架构中聚合与聚合根的实现
在前面兩篇文章中,我詳細(xì)介紹了基本事件系統(tǒng)的實(shí)現(xiàn),包括事件派發(fā)和訂閱、通過事件處理器執(zhí)行上下文來解決對(duì)象生命周期問題,以及一個(gè)基于RabbitMQ的事件總線的實(shí)現(xiàn)。接下來對(duì)于事件驅(qū)動(dòng)型架構(gòu)的討論,就需要結(jié)合一個(gè)實(shí)際的架構(gòu)案例來進(jìn)行分析。在領(lǐng)域驅(qū)動(dòng)設(shè)計(jì)的討論范疇,CQRS架構(gòu)本身就是事件驅(qū)動(dòng)的,因此,我打算首先介紹一下CQRS架構(gòu)下相關(guān)部分的實(shí)現(xiàn),然后再繼續(xù)討論事件驅(qū)動(dòng)型架構(gòu)實(shí)現(xiàn)的具體問題。
當(dāng)然,CQRS架構(gòu)本身的實(shí)現(xiàn)也是根據(jù)實(shí)際情況的不同,需要具體問題具體分析的,不僅如此,CQRS架構(gòu)的實(shí)現(xiàn)也是非常復(fù)雜的,絕不是一套文章一套案例能夠解釋清楚并涵蓋全部的。所以,我不會(huì)把大部分篇幅放在CQRS架構(gòu)實(shí)現(xiàn)的細(xì)節(jié)上,而是會(huì)著重介紹與我們的主題相關(guān)的內(nèi)容,并對(duì)無關(guān)的內(nèi)容進(jìn)行弱化。或許,在這個(gè)系列文章結(jié)束的時(shí)候,我們會(huì)得到一個(gè)完整的、能夠運(yùn)行的CQRS架構(gòu)系統(tǒng),不過,這套系統(tǒng)極有可能僅供技術(shù)研討和學(xué)習(xí)使用,無法直接用于生產(chǎn)環(huán)境。
基于這樣的前提,我們今天首先看一下CQRS架構(gòu)中聚合與聚合根的實(shí)現(xiàn),或許你會(huì)覺得目前討論的內(nèi)容與你本打算關(guān)心的事件驅(qū)動(dòng)架構(gòu)沒什么關(guān)系,而事實(shí)是,CQRS架構(gòu)中聚合與聚合根的實(shí)現(xiàn)是完全面向事件驅(qū)動(dòng)的,而這部分內(nèi)容也會(huì)為我們之后的討論做下鋪墊。不僅如此,我還會(huì)在本文討論一些基于.NET/C#的軟件架構(gòu)設(shè)計(jì)的思考與實(shí)踐(請(qǐng)注意文章中我添加了Note字樣并且字體加粗的句子),因此,我還是會(huì)推薦你繼續(xù)讀完這篇文章。
CQRS架構(gòu)知識(shí)回顧
早在2010年,我針對(duì)CQRS架構(gòu)總結(jié)過一篇文章,題目是:《EntityFramework之領(lǐng)域驅(qū)動(dòng)設(shè)計(jì)實(shí)踐【擴(kuò)展閱讀】:CQRS體系結(jié)構(gòu)模式》,當(dāng)然,這篇文章跟Entity Framework本沒啥關(guān)系,只是延續(xù)了領(lǐng)域驅(qū)動(dòng)設(shè)計(jì)這一話題進(jìn)行的擴(kuò)展討論罷了。這篇文章介紹了CQRS架構(gòu)模式所產(chǎn)生的背景、結(jié)構(gòu),以及相關(guān)的一些概念,比如:最近非常流行的詞語:“事件溯源”、解決事件溯源性能問題的“快照”、用于存取事件數(shù)據(jù)的“事件存儲(chǔ)(Event Store)”,還有重新認(rèn)識(shí)了什么叫做“對(duì)象的狀態(tài)”,等等。此外,在后續(xù)的博文中,我也經(jīng)常對(duì)CQRS架構(gòu)中的實(shí)現(xiàn)細(xì)節(jié)做些探討,有興趣的讀者可以翻看我過去的博客文章。總體上講,CQRS架構(gòu)基本符合下圖所描述的結(jié)構(gòu):
看上去是不是特別復(fù)雜?沒錯(cuò),特別復(fù)雜,而且每個(gè)部分都可以使用不同的工具、框架,以不同的形式進(jìn)行實(shí)現(xiàn)。整個(gè)架構(gòu)甚至可以是語言、平臺(tái)異構(gòu)的,還可以跟外部系統(tǒng)進(jìn)行整合,實(shí)現(xiàn)大數(shù)據(jù)分析、呈現(xiàn)等等,玩法可謂之五花八門,這些統(tǒng)統(tǒng)都不在我們的討論范圍之內(nèi)。我們今天打算討論的,就是上圖右上部分“領(lǐng)域模型”框框里的主題:CQRS架構(gòu)中的聚合與聚合根。
說到聚合與聚合根,了解過領(lǐng)域驅(qū)動(dòng)設(shè)計(jì)(DDD)的讀者肯定對(duì)這兩個(gè)概念非常熟悉。通常情況下,具有相同生命周期,組合起來能夠共同表述一種領(lǐng)域概念的一組模型對(duì)象,就可以組成一個(gè)聚合。在每個(gè)聚合中,銜接各個(gè)領(lǐng)域模型對(duì)象,并向外提供統(tǒng)一訪問聚合的對(duì)象,就是聚合根。聚合中的所有對(duì)象,離開聚合根,就不能完整地表述一個(gè)領(lǐng)域概念。比如:收貨地址無法離開客戶,訂單詳情無法離開訂單,庫存無法離開貨品等等。所以從定義上來看,一個(gè)聚合大概就是這樣:
聚合中的對(duì)象可以是實(shí)體,也可以是值對(duì)象
聚合中所有對(duì)象具有相同的生命周期
外界通過聚合根訪問整個(gè)聚合,聚合根通過導(dǎo)航屬性(Navigation Properties)進(jìn)而訪問聚合中的其它實(shí)體和值對(duì)象
通過以上兩點(diǎn),可以得出:工廠和倉儲(chǔ)必須針對(duì)聚合根進(jìn)行操作
聚合根是一個(gè)實(shí)體
聚合中的對(duì)象是有狀態(tài)的,通常會(huì)通過C#的屬性(Properties)將狀態(tài)曝露給外界
好吧,對(duì)這些概念比較熟悉的讀者來說,我在此算是多啰嗦了幾句。接下來,讓我們結(jié)合CQRS架構(gòu)中命令處理器對(duì)領(lǐng)域模型的更改過程來看看,除了以上這些常規(guī)特征之外,聚合與聚合根還有哪些特殊之處。當(dāng)命令處理器接到操作命令時(shí),便開始對(duì)領(lǐng)域模型進(jìn)行更改,步驟如下:
首先,命令處理器通過倉儲(chǔ),獲取具有指定ID值的聚合(聚合的ID值就是聚合根的ID值)
然后,倉儲(chǔ)訪問事件存儲(chǔ)數(shù)據(jù)庫,根據(jù)需要獲取的聚合根的類型,以及ID值,獲取所有關(guān)聯(lián)的領(lǐng)域事件
其次,倉儲(chǔ)構(gòu)造聚合對(duì)象實(shí)例,并依照一定的順序,逐一將領(lǐng)域事件重新應(yīng)用在新構(gòu)建的聚合上
每當(dāng)有一個(gè)領(lǐng)域事件被應(yīng)用在聚合上時(shí),聚合本身的內(nèi)聯(lián)事件處理器會(huì)捕獲這個(gè)領(lǐng)域事件,并根據(jù)領(lǐng)域事件中的數(shù)據(jù),設(shè)置聚合中對(duì)象的狀態(tài)
當(dāng)所有的領(lǐng)域事件全部應(yīng)用在聚合上時(shí),聚合的狀態(tài)就是曾經(jīng)被保存時(shí)的狀態(tài)
然后,倉儲(chǔ)將已經(jīng)恢復(fù)了狀態(tài)的聚合返回給命令處理器,命令處理器調(diào)用聚合上的方法,對(duì)聚合進(jìn)行更改
在調(diào)用方法的時(shí)候,方法本身會(huì)產(chǎn)生一個(gè)領(lǐng)域事件,這個(gè)領(lǐng)域事件會(huì)立刻被聚合本身的內(nèi)聯(lián)事件處理器捕獲,并進(jìn)行處理。在處理的過程中,會(huì)更新聚合中對(duì)象的狀態(tài),同時(shí),這個(gè)領(lǐng)域事件還會(huì)被緩存在聚合中
命令處理器在完成對(duì)聚合的更改之后,便會(huì)調(diào)用倉儲(chǔ),將更改后的模型保存下來
接著,倉儲(chǔ)從聚合中獲得所有緩存的未曾保存的領(lǐng)域事件,并將所有這些領(lǐng)域事件逐個(gè)保存到事件存儲(chǔ)數(shù)據(jù)庫。在成功完成保存之后,會(huì)清空聚合中的事件緩存
最后,倉儲(chǔ)將所有的這些領(lǐng)域事件逐個(gè)地派發(fā)到事件消息總線
接下來在事件消息總線和事件處理器中將會(huì)發(fā)生的事情,我們今后還會(huì)討論,這里就不多說了。從這個(gè)過程,我們不難得出:
CQRS的聚合中,更改對(duì)象狀態(tài)必須通過領(lǐng)域事件,也就是說,不能向外界曝露直接訪問對(duì)象狀態(tài)的接口,更通俗地說,表示對(duì)象狀態(tài)的屬性(Property)不能有設(shè)置器(Setter)
CQRS聚合的聚合根中,會(huì)有一套內(nèi)聯(lián)的領(lǐng)域事件處理機(jī)制,用來捕獲并處理聚合中產(chǎn)生的領(lǐng)域事件
CQRS聚合的聚合根會(huì)有一個(gè)保存未提交領(lǐng)域事件的本地緩存,對(duì)該緩存的訪問應(yīng)該是線程安全的
CQRS的聚合需要能夠向倉儲(chǔ)提供必要的接口,比如清除事件緩存的方法等
此外,CQRS聚合是有版本號(hào)的,版本號(hào)通常是一個(gè)64位整型,表述歷史上發(fā)生在聚合上的領(lǐng)域事件一共有多少個(gè)。當(dāng)然,這個(gè)值在我們目前的討論中并非能夠真正用得上,但是,在倉儲(chǔ)重建聚合需要依賴快照時(shí),這個(gè)版本號(hào)就非常重要了。我會(huì)在后續(xù)文章中介紹
聽起來是不是非常復(fù)雜?確實(shí)如此。那我們就先從領(lǐng)域事件入手,逐步實(shí)現(xiàn)CQRS中的聚合與聚合根。
領(lǐng)域事件
領(lǐng)域事件,顧名思義,就是從領(lǐng)域模型中產(chǎn)生的事件消息。概念上很簡(jiǎn)單,比如,客戶登錄網(wǎng)站,就會(huì)由客戶登錄實(shí)體產(chǎn)生一個(gè)事件派發(fā)出去,例如CustomerLoggedOnEvent,表示客戶登錄這件事已經(jīng)發(fā)生了。雖然在DDD的實(shí)踐中,領(lǐng)域事件更多地在CQRS架構(gòu)中被討論,其實(shí)即便是非事件驅(qū)動(dòng)型架構(gòu),也可以通過領(lǐng)域模型來發(fā)布消息,達(dá)到系統(tǒng)解耦的目的。
延續(xù)之前的設(shè)計(jì),我們的領(lǐng)域事件繼承了IEvent接口,并增加了三個(gè)屬性/方法,此外,為了編程方便,我們實(shí)現(xiàn)了領(lǐng)域事件的抽象類,UML類圖如下:
圖中的綠色部分就是在之前我們的事件模型上新加的接口和類,用以表述領(lǐng)域事件的概念。其中:
aggregateRootId:發(fā)生該領(lǐng)域事件的聚合的聚合根的ID值
aggregateRootType:發(fā)生該領(lǐng)域事件的聚合的聚合根的類型
sequence:該領(lǐng)域事件的序列號(hào)
好了,如果說我們將發(fā)生在某聚合上的領(lǐng)域事件保存到關(guān)系型數(shù)據(jù)庫,那么,當(dāng)需要獲得該聚合的所有領(lǐng)域事件時(shí),只需要下面一句SQL就行了:
| 1 | SELECT?* FROM?[Events] WHERE?[AggregateRootId]=aggregateRootId AND?[AggregateRootType]=aggregateRootType ORDER?BY?[Sequence] ASC |
這就是最簡(jiǎn)單的事件存儲(chǔ)數(shù)據(jù)庫的實(shí)現(xiàn)了。不過,我們暫時(shí)不介紹這些內(nèi)容。
事實(shí)上,與標(biāo)準(zhǔn)的事件(IEvent接口)相比,除了上面三個(gè)主要的屬性之外,領(lǐng)域事件還可以包含更多的屬性和方法,這就要看具體的需求和設(shè)計(jì)了。不過目前為止,我們定義這三個(gè)屬性已經(jīng)夠用了,不要把問題搞得太復(fù)雜。
有了領(lǐng)域事件的基本模型,我們開始設(shè)計(jì)CQRS下的聚合。
聚合的設(shè)計(jì)與實(shí)現(xiàn)
由于外界訪問聚合都是通過聚合根來實(shí)現(xiàn)的,因此,針對(duì)聚合的操作都會(huì)被委托給聚合根來處理。比如,當(dāng)用戶地址發(fā)生變化時(shí),服務(wù)層會(huì)調(diào)用Customer.ChangeAddress方法,這個(gè)方法就會(huì)產(chǎn)生一個(gè)領(lǐng)域事件,并通過內(nèi)聯(lián)的事件處理機(jī)制更改聚合中Address值對(duì)象中的狀態(tài)。于是,從技術(shù)角度,聚合的設(shè)計(jì)也就是聚合根的實(shí)現(xiàn)。
接口與類之間的關(guān)系
首先需要設(shè)計(jì)的是與聚合相關(guān)的概念所表述的接口、類及其之間的關(guān)系。結(jié)合領(lǐng)域驅(qū)動(dòng)設(shè)計(jì)中的概念,我們得到下面的設(shè)計(jì):
其中,實(shí)體(IEntity)、聚合根(IAggregateRoot)都是大家耳熟能詳?shù)念I(lǐng)域驅(qū)動(dòng)設(shè)計(jì)的概念。由于實(shí)體都是通過Id進(jìn)行唯一標(biāo)識(shí),所以,IEntity會(huì)有一個(gè)id的屬性,為了簡(jiǎn)單起見,我們使用Guid作為它的類型。聚合根(IAggregateRoot)繼承于IEntity接口,有趣的是,在我們目前的場(chǎng)景中,IAggregateRoot并不包含任何成員,它僅僅是一個(gè)空接口,在整個(gè)框架代碼中,它僅作為泛型的類型約束。Note:這種做法其實(shí)也是非常常見的一種框架設(shè)計(jì)模式。具有事件溯源能力的聚合根(IAggregateRootWithEventSourcing)又繼承于IAggregateRoot接口,并且有如下三個(gè)成員:
uncommittedEvents:用于緩存發(fā)生在當(dāng)前聚合中的領(lǐng)域事件
version:表示當(dāng)前聚合的版本號(hào)
Replay:將指定的一系列領(lǐng)域事件“應(yīng)用”到當(dāng)前的聚合上,也就是所謂的事件回放
此外,你還發(fā)現(xiàn)我們還有兩個(gè)神奇的接口:IPurgable和IPersistedVersionSetter。這兩個(gè)接口的職責(zé)是:
IPurgable表示,實(shí)現(xiàn)了該接口的類型具有某種清空操作,比如清空某個(gè)隊(duì)列,或者將對(duì)象狀態(tài)恢復(fù)到初始狀態(tài)。讓IAggregateRootWithEventSourcing繼承于該接口是因?yàn)?#xff0c;當(dāng)倉儲(chǔ)完成了聚合中領(lǐng)域事件的保存和派發(fā)之后,需要清空聚合中緩存的事件,以保證在今后,發(fā)生在同一時(shí)間點(diǎn)的同樣的事件不會(huì)被再次保存和派發(fā)
IPersistedVersionSetter接口允許調(diào)用者對(duì)聚合的“保存版本號(hào)”進(jìn)行設(shè)置。這個(gè)版本號(hào)表示了在事件存儲(chǔ)中,屬于當(dāng)前聚合的所有事件的個(gè)數(shù)。試想,如果一個(gè)聚合的“保存版本號(hào)”為4(即在事件存儲(chǔ)中有4個(gè)事件是屬于該聚合的),那么,如果再有2個(gè)事件發(fā)生在這個(gè)聚合中,于是,該聚合的版本就是4+2=6.
Note:為什么不將這兩個(gè)接口中的方法直接放在IAggregateRootWithEventSourcing中呢?是因?yàn)閱我宦氊?zé)原則。聚合本身不應(yīng)該存在所謂之“清空緩存”或者“設(shè)置保存版本號(hào)”這樣的概念,這樣的概念對(duì)于技術(shù)人員來說比較容易理解,可是如果將這些技術(shù)細(xì)節(jié)加入領(lǐng)域模型中,就會(huì)污染領(lǐng)域模型,造成領(lǐng)域?qū)<覠o法理解領(lǐng)域模型,這是違背面向?qū)ο蠓治雠c設(shè)計(jì)的單一職責(zé)原則的,也違背了領(lǐng)域驅(qū)動(dòng)設(shè)計(jì)的原則。那么,即使把這些方法通過額外的接口獨(dú)立出去,實(shí)現(xiàn)了IAggregateRootWithEventSourcing接口的類型,不還是要實(shí)現(xiàn)這兩個(gè)接口中的方法嗎?這樣,聚合的訪問者不還是可以訪問這兩個(gè)額外的方法嗎?的確如此,這些接口是需要被實(shí)現(xiàn)的,但是我們可以使用C#中接口的顯式實(shí)現(xiàn),這樣的話,如果不將IAggregateRootWithEventSourcing強(qiáng)制轉(zhuǎn)換成IPurgable或者IPersistedVersionSetter的話,是無法直接通過聚合根對(duì)象本身來訪問這些方法的,這起到了非常好的保護(hù)作用。接口的顯式實(shí)現(xiàn)在軟件系統(tǒng)的框架設(shè)計(jì)中也是常用手段。
抽象類AggregateRootWithEventSourcing的實(shí)現(xiàn)
在上面的類圖中,IAggregateRootWithEventSourcing最終由AggregateRootWithEventSourcing抽象類實(shí)現(xiàn)。不要抱怨類的名字太長(zhǎng),它有助于我們理解這一類型在我們的領(lǐng)域模型中的角色和功能。下面的代碼列出了該抽象類的主要部分的實(shí)現(xiàn):
| public?abstract?class?AggregateRootWithEventSourcing : IAggregateRootWithEventSourcing{private?readonly?Lazy<Dictionary<string, MethodInfo>> registeredHandlers;private?readonly?Queue<IDomainEvent> uncommittedEvents = new?Queue<IDomainEvent>();private?Guid id;private?long?persistedVersion = 0;private?object?sync = new?object();protected?AggregateRootWithEventSourcing(): this(Guid.NewGuid()){ }protected?AggregateRootWithEventSourcing(Guid id){registeredHandlers = new?Lazy<Dictionary<string, MethodInfo>>(() =>{var?registry = new?Dictionary<string, MethodInfo>();var?methodInfoList = from?mi in?this.GetType().GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)let?returnType = mi.ReturnTypelet?parameters = mi.GetParameters()where?mi.IsDefined(typeof(HandlesInlineAttribute), false) &&returnType == typeof(void) &¶meters.Length == 1 &&typeof(IDomainEvent).IsAssignableFrom(parameters[0].ParameterType)select?new?{ EventName = parameters[0].ParameterType.FullName, MethodInfo = mi };foreach?(var?methodInfo in?methodInfoList){registry.Add(methodInfo.EventName, methodInfo.MethodInfo);}return?registry;});Raise(new?AggregateCreatedEvent(id));}public?Guid Id => id;long?IPersistedVersionSetter.PersistedVersion { set?=> Interlocked.Exchange(ref?this.persistedVersion, value); }public?IEnumerable<IDomainEvent> UncommittedEvents => uncommittedEvents;public?long?Version => this.uncommittedEvents.Count + this.persistedVersion;void?IPurgable.Purge(){lock?(sync){uncommittedEvents.Clear();}}public?void?Replay(IEnumerable<IDomainEvent> events){((IPurgable)this).Purge();events.OrderBy(e => e.Timestamp).ToList().ForEach(e =>{HandleEvent(e);Interlocked.Increment(ref?this.persistedVersion);});}[HandlesInline]protected?void?OnAggregateCreated(AggregateCreatedEvent @event){this.id = @event.NewId;}protected?void?Raise<TDomainEvent>(TDomainEvent domainEvent)where?TDomainEvent : IDomainEvent{lock?(sync){// 首先處理事件數(shù)據(jù)。this.HandleEvent(domainEvent);// 然后設(shè)置事件的元數(shù)據(jù),包括當(dāng)前事件所對(duì)應(yīng)的聚合根類型以及// 聚合的ID值。domainEvent.AggregateRootId = this.id;domainEvent.AggregateRootType = this.GetType().AssemblyQualifiedName;domainEvent.Sequence = this.Version + 1;// 最后將事件緩存在“未提交事件”列表中。this.uncommittedEvents.Enqueue(domainEvent);}}private?void?HandleEvent<TDomainEvent>(TDomainEvent domainEvent)where?TDomainEvent : IDomainEvent{var?key = domainEvent.GetType().FullName;if?(registeredHandlers.Value.ContainsKey(key)){registeredHandlers.Value[key].Invoke(this, new?object[] { domainEvent });}}} |
上面的代碼不算復(fù)雜,它根據(jù)上面的分析和描述,實(shí)現(xiàn)了IAggregateRootWithEventSourcing接口,篇幅原因,就不多做解釋了,不過有幾點(diǎn)還是可以鑒賞一下的:
使用Lazy類型來保證領(lǐng)域事件處理器的容器在整個(gè)聚合生命周期中只初始化一次
通過lock語句和Interlocked.Exchange來保證類型的線程安全和數(shù)值的原子操作
聚合根被構(gòu)造的時(shí)候,會(huì)找到當(dāng)前類型中所有標(biāo)記了HandlesInlineAttribute特性,并具有一定特征的函數(shù),將它們作為領(lǐng)域事件的內(nèi)聯(lián)處理器,注冊(cè)到容器中
每當(dāng)聚合中的某個(gè)業(yè)務(wù)操作(方法)需要更改聚合中的狀態(tài)時(shí),就調(diào)用Raise方法來產(chǎn)生領(lǐng)域事件,由對(duì)應(yīng)的內(nèi)聯(lián)處理器捕獲領(lǐng)域事件,并在處理器方法中設(shè)置聚合的狀態(tài)
Replay方法會(huì)遍歷所有給點(diǎn)的領(lǐng)域事件,調(diào)用HandleEvent方法,實(shí)現(xiàn)事件回放
現(xiàn)在,我們已經(jīng)實(shí)現(xiàn)了CQRS架構(gòu)下的聚合與聚合根,雖然實(shí)際上這個(gè)結(jié)構(gòu)有可能比我們的實(shí)現(xiàn)更為復(fù)雜,但是目前的這個(gè)設(shè)計(jì)已經(jīng)能夠滿足我們進(jìn)一步研究討論的需求了。下面,我們?cè)俑M(jìn)一步,看看CQRS中倉儲(chǔ)應(yīng)該如何實(shí)現(xiàn)。
倉儲(chǔ)實(shí)現(xiàn)初探
為什么說是“初探”?因?yàn)槲覀兡壳按蛩銓?shí)現(xiàn)的倉儲(chǔ)暫時(shí)不包含事件派發(fā)的邏輯,這部分內(nèi)容我會(huì)在后續(xù)文章中講解。首先看看,倉儲(chǔ)的接口是什么樣的。在CQRS架構(gòu)中,倉儲(chǔ)只具備兩種操作:
保存聚合
根據(jù)聚合ID(也就是聚合根的ID)值,獲取聚合對(duì)象
你或許會(huì)問,那根據(jù)某個(gè)條件查詢滿足該條件的所有聚合對(duì)象呢?注意,這是CQRS架構(gòu)中查詢部分的職責(zé),不屬于我們的討論范圍。
通常,倉儲(chǔ)的接口定義如下:
| public?interface?IRepository{Task SaveAsync<TAggregateRoot>(TAggregateRoot aggregateRoot)where?TAggregateRoot : class, IAggregateRootWithEventSourcing;Task<TAggregateRoot> GetByIdAsync<TAggregateRoot>(Guid id)where?TAggregateRoot : class, IAggregateRootWithEventSourcing;} |
| 1 |
與之前領(lǐng)域事件的設(shè)計(jì)類似,我們?yōu)閭}儲(chǔ)定義一個(gè)抽象類,所有倉儲(chǔ)的實(shí)現(xiàn)都應(yīng)該基于這個(gè)抽象類:
| public?abstract?class?Repository : IRepository{protected?Repository(){ }public?async Task<TAggregateRoot> GetByIdAsync<TAggregateRoot>(Guid id)where?TAggregateRoot : class, IAggregateRootWithEventSourcing{var?domainEvents = await LoadDomainEventsAsync(typeof(TAggregateRoot), id);var?aggregateRoot = ActivateAggregateRoot<TAggregateRoot>();aggregateRoot.Replay(domainEvents);return?aggregateRoot;}public?async Task SaveAsync<TAggregateRoot>(TAggregateRoot aggregateRoot)where?TAggregateRoot : class, IAggregateRootWithEventSourcing{var?domainEvents = aggregateRoot.UncommittedEvents;await this.PersistDomainEventsAsync(domainEvents);aggregateRoot.PersistedVersion = aggregateRoot.Version;aggregateRoot.Purge();}protected?abstract?Task<IEnumerable<IDomainEvent>> LoadDomainEventsAsync(Type aggregateRootType, Guid id);protected?abstract?Task PersistDomainEventsAsync(IEnumerable<IDomainEvent> domainEvents);private?TAggregateRoot ActivateAggregateRoot<TAggregateRoot>()where?TAggregateRoot : class, IAggregateRootWithEventSourcing{var?constructors = from?ctor in?typeof(TAggregateRoot).GetTypeInfo().GetConstructors()let?parameters = ctor.GetParameters()where?parameters.Length == 0 ||(parameters.Length == 1 && parameters[0].ParameterType == typeof(Guid))select?new?{ ConstructorInfo = ctor, ParameterCount = parameters.Length };if?(constructors.Count() > 0){TAggregateRoot aggregateRoot;var?constructorDefinition = constructors.First();if?(constructorDefinition.ParameterCount == 0){aggregateRoot = (TAggregateRoot)constructorDefinition.ConstructorInfo.Invoke(null);}else{aggregateRoot = (TAggregateRoot)constructorDefinition.ConstructorInfo.Invoke(new?object[] { Guid.NewGuid() });}// 將AggregateRoot下的所有事件清除。事實(shí)上,在AggregateRoot的構(gòu)造函數(shù)中,已經(jīng)產(chǎn)生了AggregateCreatedEvent。aggregateRoot.Purge();return?aggregateRoot;}return?null;}} |
代碼也是非常簡(jiǎn)單、容易理解的:GetByIdAsync方法根據(jù)給定的聚合根類型以及ID值,從后臺(tái)存儲(chǔ)中讀取所有屬于該聚合的領(lǐng)域事件,并在聚合上進(jìn)行回放,以便將聚合恢復(fù)到存儲(chǔ)前的狀態(tài);SaveAsync方法則從聚合根上獲得所有未被提交的領(lǐng)域事件,將這些事件保存到后臺(tái)存儲(chǔ),然后設(shè)置聚合的“已保存版本”,最后清空未提交事件的緩存。剩下的就是如何實(shí)現(xiàn)LoadDomainEventsAsync以及PersistDomainEventsAsync兩個(gè)方法了。而這兩個(gè)方法,原本就應(yīng)該是事件存儲(chǔ)對(duì)象的職責(zé)范圍了。
Note:你也許會(huì)問:如果某個(gè)聚合從開始到現(xiàn)在,已經(jīng)發(fā)生了大量的領(lǐng)域事件了,那么這樣一條條地將事件回放到聚合上,豈不是性能非常低下?沒錯(cuò),這個(gè)問題我們可以通過快照來解決。在后續(xù)文章中我會(huì)介紹。你還會(huì)問:日積月累,事件存儲(chǔ)系統(tǒng)中的事件數(shù)量豈不是會(huì)越來越多嗎?需要?jiǎng)h除嗎?答案是:不刪!不過可以對(duì)數(shù)據(jù)進(jìn)行歸檔,或者依賴一些第三方框架來處理這個(gè)問題,但是,從領(lǐng)域驅(qū)動(dòng)設(shè)計(jì)的角度,領(lǐng)域事件代表著整個(gè)領(lǐng)域模型系統(tǒng)中發(fā)生過的所有事情,事情既然已經(jīng)發(fā)生,就無法再被抹去,因此,刪除事件存儲(chǔ)系統(tǒng)中的事件是不合理的。那數(shù)據(jù)量越來越大怎么辦?答案是:或許,存儲(chǔ)硬件設(shè)備要比業(yè)務(wù)數(shù)據(jù)更便宜。
倉儲(chǔ)的實(shí)現(xiàn)我們暫且探索到這一步,目前我們只需要有一個(gè)正確的聚合保存、讀取(通過領(lǐng)域事件重塑)的邏輯就可以了,并不需要關(guān)心事件本身是如何被讀取被保存的。接下來,我們?cè)?NET Core的測(cè)試項(xiàng)目中,借助Moq框架,通過Mock一個(gè)假想的倉儲(chǔ),來驗(yàn)證整個(gè)系統(tǒng)從聚合、聚合根的實(shí)現(xiàn)到倉儲(chǔ)設(shè)計(jì)的正確性。
使用Moq框架,通過單元測(cè)試驗(yàn)證聚合、聚合根以及倉儲(chǔ)設(shè)計(jì)的正確性
Moq是一個(gè)很好的Mock框架,簡(jiǎn)單輕量,而且支持.NET Core,在單元測(cè)試的項(xiàng)目中使用Moq是一種很好的實(shí)踐。Moq上手非常簡(jiǎn)單,只需要在單元測(cè)試項(xiàng)目上添加Moq的NuGet依賴包就可以開始著手編寫測(cè)試用例了。為了測(cè)試我們的聚合根以及倉儲(chǔ)對(duì)聚合根保存、讀取的設(shè)計(jì),首先我們定義一個(gè)簡(jiǎn)單的聚合:
| public?class?Book : AggregateRootWithEventSourcing{public?void?ChangeTitle(string?newTitle){this.Raise(new?BookTitleChangedEvent(newTitle));}public?string?Title { get; private?set; }[HandlesInline]private?void?OnTitleChanged(BookTitleChangedEvent @event){this.Title = @event.NewTitle;}public?override?string?ToString(){return?Title;}} |
Book類是一個(gè)聚合根,它繼承AggregateRootWithEventSourcing抽象類,同時(shí)它有一個(gè)屬性,Title,表示書的名稱,而ChangeTitle方法(業(yè)務(wù)方法)會(huì)直接產(chǎn)生一個(gè)BookTitleChangedEvent領(lǐng)域事件,之后,OnTitleChanged成員函數(shù)會(huì)負(fù)責(zé)將領(lǐng)域事件中的NewTitle的值設(shè)置到Book聚合根的Title狀態(tài)上,完成書本標(biāo)題的更新。與之相關(guān)的BookTitleChangedEvent的定義如下:
| public?class?BookTitleChangedEvent : DomainEvent{public?BookTitleChangedEvent(string?newTitle){this.NewTitle = newTitle;}public?string?NewTitle { get; set; }public?override?string?ToString(){return?$"{Sequence} - {NewTitle}";}} |
首先,下面兩個(gè)測(cè)試用例用于測(cè)試Book聚合本身產(chǎn)生領(lǐng)域事件的過程是否正確,如果正確,那么當(dāng)Book本身本構(gòu)造時(shí),會(huì)產(chǎn)生一個(gè)AggregateCreatedEvent,如果更改書本的標(biāo)題,則又會(huì)產(chǎn)生一個(gè)BookTitleChangedEvent,所以,第一個(gè)測(cè)試中,book的版本應(yīng)該為1,而第二個(gè)則為2:
| [Fact]public?void?CreateBookTest(){// Arrange & Actvar?book = new?Book();// AssertAssert.NotEqual(Guid.Empty, book.Id);Assert.Equal(1, book.Version);}[Fact]public?void?ChangeBookTitleEventTest(){// Arrangevar?book = new?Book();// Actbook.ChangeTitle("Hit Refresh");// AssertAssert.Equal("Hit Refresh", book.Title);Assert.Equal(2, book.UncommittedEvents.Count());Assert.Equal(2, book.Version);} |
接下來,測(cè)試倉儲(chǔ)保存Book聚合的正確性,因?yàn)槲覀儧]有實(shí)現(xiàn)一個(gè)有效的倉儲(chǔ)實(shí)例,因此,這里借助Moq幫我們動(dòng)態(tài)生成。在下面的代碼中,讓Moq對(duì)倉儲(chǔ)抽象類的PersisDomainEventsAsync受保護(hù)成員進(jìn)行動(dòng)態(tài)生成,指定當(dāng)它被任何IEnumerable<IDomainEvent>作為參數(shù)調(diào)用時(shí),都將這些事件保存到一個(gè)本地的List中,于是,最后只需要檢查L(zhǎng)ist中的領(lǐng)域事件是否符合我們的要求就可以了。代碼如下:
| [Fact]public?async Task PersistBookTest(){// Arrangevar?domainEventsList = new?List<IDomainEvent>();var?mockRepository = new?Mock<Repository>();mockRepository.Protected().Setup<Task>("PersistDomainEventsAsync",ItExpr.IsAny<IEnumerable<IDomainEvent>>()).Callback<IEnumerable<IDomainEvent>>(evnts => domainEventsList.AddRange(evnts)).Returns(Task.CompletedTask);var?book = new?Book();// Actbook.ChangeTitle("Hit Refresh");await mockRepository.Object.SaveAsync(book);// AssertAssert.Equal(2, domainEventsList.Count);Assert.Empty(book.UncommittedEvents);Assert.Equal(2, book.Version);} |
同理,我們還可以測(cè)試倉儲(chǔ)讀取聚合并恢復(fù)聚合狀態(tài)的正確性,同樣還是使用Moq對(duì)倉儲(chǔ)的LoadDomainEventsAsync進(jìn)行Mock:
| [Fact]public?async Task RetrieveBookTest(){// Arrangevar?fakeId = Guid.NewGuid();var?domainEventsList = new?List<IDomainEvent>{new?AggregateCreatedEvent(fakeId),new?BookTitleChangedEvent("Hit Refresh")};var?mockRepository = new?Mock<Repository>();mockRepository.Protected().Setup<Task<IEnumerable<IDomainEvent>>>("LoadDomainEventsAsync",ItExpr.IsAny<Type>(),ItExpr.IsAny<Guid>()).Returns(Task.FromResult(domainEventsList.AsEnumerable()));// Actvar?book = await mockRepository.Object.GetByIdAsync<Book>(fakeId);// AssertAssert.Equal(fakeId, book.Id);Assert.Equal("Hit Refresh", book.Title);Assert.Equal(2, book.Version);Assert.Empty(book.UncommittedEvents);} |
好了,其它的幾個(gè)測(cè)試用例就不多做介紹了,使用Visual Studio運(yùn)行一下測(cè)試然后查看結(jié)果就可以了:
總結(jié)
本文又是一篇長(zhǎng)篇幅的文章,好吧,要介紹的東西太多,而且這些內(nèi)容又不能單獨(dú)割開成多個(gè)主題,所以也就很難控制篇幅了。文章主要介紹了基于CQRS架構(gòu)的聚合以及聚合根的設(shè)計(jì)與實(shí)現(xiàn),同時(shí)引出了倉儲(chǔ)的部分實(shí)現(xiàn),這些內(nèi)容也是為今后進(jìn)一步討論事件驅(qū)動(dòng)型架構(gòu)做準(zhǔn)備。本文介紹的內(nèi)容對(duì)于一個(gè)真實(shí)的CQRS系統(tǒng)實(shí)現(xiàn)來說還是有一定差距的,但總體結(jié)構(gòu)也大致如此。文中還提及了快照的概念,這部分內(nèi)容我今后在介紹事件存儲(chǔ)的實(shí)現(xiàn)部分還會(huì)詳細(xì)討論,下一章打算擴(kuò)展一下倉儲(chǔ)本身,了解一下倉儲(chǔ)對(duì)領(lǐng)域事件的派發(fā),以及事件處理器對(duì)領(lǐng)域事件的處理。
源代碼的使用
本系列文章的源代碼在https://github.com/daxnet/edasample這個(gè)Github Repo里,通過不同的release tag來區(qū)分針對(duì)不同章節(jié)的源代碼。本文的源代碼請(qǐng)參考chapter_4這個(gè)tag,如下:
相關(guān)文章:?
ASP.NET Core Web API下事件驅(qū)動(dòng)型架構(gòu)的實(shí)現(xiàn)(一):一個(gè)簡(jiǎn)單的實(shí)現(xiàn)
ASP.NET Core Web API下事件驅(qū)動(dòng)型架構(gòu)的實(shí)現(xiàn)(二):事件處理器中對(duì)象生命周期的管理
ASP.NET Core Web API下事件驅(qū)動(dòng)型架構(gòu)的實(shí)現(xiàn)(三):基于RabbitMQ的事件總線
原文地址:https://www.cnblogs.com/daxnet/p/8594287.html
.NET社區(qū)新聞,深度好文,歡迎訪問公眾號(hào)文章匯總 http://www.csharpkit.com
總結(jié)
以上是生活随笔為你收集整理的ASP.NET Core Web API下事件驱动型架构的实现(四):CQRS架构中聚合与聚合根的实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 部署用于生产的Exceptionlees
- 下一篇: 如何看待微软新开源的Service Fa