使用MongoDB进行事件流
MongoDB是一個(gè)非常出色的“ NoSQL”數(shù)據(jù)庫,具有廣泛的應(yīng)用程序。 在SoftwareMill開發(fā)的一個(gè)項(xiàng)目中,我們將其用作復(fù)制的事件存儲(chǔ),然后將事件從事件流傳輸?shù)狡渌M件。
介紹
基本思想非常簡(jiǎn)單(另請(qǐng)參閱Martin Fowler關(guān)于Event Sourcing的文章)。 我們的系統(tǒng)生成一系列事件。 這些事件將保留在事件存儲(chǔ)中。 系統(tǒng)中的其他組件遵循事件流并對(duì)其進(jìn)行“處理”。 例如,可以將它們匯總并寫入報(bào)告數(shù)據(jù)庫(另一方面,它類似于CQRS )。 這種方法有很多優(yōu)點(diǎn):
- 事件的讀取和寫入是解耦的(異步的)
- 鑒于它沒有死得太久,任何后續(xù)組件都可能死亡,然后“追趕”
- 可能有多個(gè)關(guān)注者。 跟隨者可以從從屬副本讀取數(shù)據(jù),以獲得更好的可伸縮性
- 事件活動(dòng)的爆發(fā)對(duì)事件接收器的影響減少; 最壞的情況下,報(bào)告生成速度會(huì)變慢
這里的關(guān)鍵組件當(dāng)然是快速可靠的事件存儲(chǔ)。 我們用來實(shí)現(xiàn)一個(gè)的MongoDB的三個(gè)關(guān)鍵功能是:
- 上限集合和尾部游標(biāo)
- 快速收集附件
- 復(fù)制集
采集
作為基礎(chǔ),我們使用有上限的集合 ,根據(jù)定義,該集合受大小限制。 如果編寫新事件將導(dǎo)致集合超出大小限制,則最早的事件將被覆蓋。 這給了我們類似于事件的循環(huán)緩沖區(qū)的功能。 (此外,我們也很安全地避免了磁盤空間不足錯(cuò)誤。)
在2.2版之前,默認(rèn)情況下,上限集合沒有_id字段(因此沒有索引)。 但是,由于我們希望事件能夠在整個(gè)副本集上可靠地寫入,因此_id字段及其上的索引都是必需的。
寫作活動(dòng)
編寫事件是一個(gè)簡(jiǎn)單的Mongo插入操作; 插入也可以分批完成。 根據(jù)我們對(duì)事件丟失的容忍度,我們可能會(huì)使用各種Mongo 寫入問題 (例如,等待來自單節(jié)點(diǎn)或多個(gè)節(jié)點(diǎn)的寫入確認(rèn))。
所有事件都是不可變的。 除了更好的,線程安全的Java代碼外,這是事件流的必要條件。 如果事件是可變的,事件接收器將如何知道更新的內(nèi)容? 而且,這對(duì)Mongo的性能有很好的影響。 由于永遠(yuǎn)不會(huì)更改數(shù)據(jù),因此寫入磁盤的文檔永遠(yuǎn)不會(huì)縮小或擴(kuò)展,因此無需在磁盤上移動(dòng)塊。 實(shí)際上,在具有上限的集合中,Mongo不允許增長(zhǎng)曾經(jīng)編寫的文檔。
閱讀活動(dòng)
讀取事件流要復(fù)雜一些。 首先,可能有多個(gè)閱讀器,每個(gè)閱讀器在流中具有不同的進(jìn)度。 其次,如果流中沒有事件,我們希望讀者等待一些事件可用,并避免主動(dòng)輪詢。 最后,我們想分批處理事件,以提高性能。
有尾游標(biāo)可以解決這些問題。 要?jiǎng)?chuàng)建這樣的游標(biāo),我們必須提供一個(gè)起點(diǎn)–事件的ID,我們將從該事件開始讀取; 如果未提供ID,則光標(biāo)將返回最早的可用事件。 因此,每個(gè)讀取器必須存儲(chǔ)它已讀取和處理的最后一個(gè)事件。
更重要的是,如果沒有新數(shù)據(jù)可用,可尾光標(biāo)可以有選擇地阻塞一段時(shí)間,從而解決了主動(dòng)輪詢問題。
(順便說一下,mongo用于在副本集之間復(fù)制數(shù)據(jù)的oplog集合也是一個(gè)有上限的集合。從屬M(fèi)ongo實(shí)例在該集合后面尾隨,流式傳輸“事件”(即數(shù)據(jù)庫操作),并按順序在本地應(yīng)用它們。 )
讀取Java中的事件
使用Mongo Java驅(qū)動(dòng)程序時(shí) ,有一些“問題”。 首先,您需要初始化游標(biāo)。 為此,我們需要提供(1)最后一個(gè)事件ID(如果存在); (2)我們要讀取事件的順序(此處為自然順序,即插入順序); (3)兩個(gè)關(guān)鍵的游標(biāo)選項(xiàng),我們希望游標(biāo)是可拖尾的,并且如果沒有新數(shù)據(jù),我們希望將其阻止:
DBObject query = lastReceivedEventId.isPresent()? BasicDBObjectBuilder.start('_id', BasicDBObjectBuilder.start('$gte', lastReceivedEventId.get()).get()).get(): null;DBObject sortBy = BasicDBObjectBuilder.start('$natural', 1).get();DBCollection collection = ... // must be a capped collection DBCursor cursor = collection.find(query).sort(sortBy).addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);您可能想知道為什么我們使用>= last_id而不是> 。 由于生成Mongo ObjectId的方式在這里需要。 如果使用一個(gè)簡(jiǎn)單的> last_id我們可能會(huì)錯(cuò)過一些與last_id事件在同一秒之后但之后發(fā)生的事件。 這也意味著我們的Java代碼必須處理這一事實(shí),并丟棄收到的第一個(gè)事件。
游標(biāo)的類擴(kuò)展了基本的Java Iterator接口,因此非常易于使用。 因此,現(xiàn)在我們可以進(jìn)行批處理了。 在游標(biāo)上進(jìn)行迭代時(shí),驅(qū)動(dòng)程序?qū)⑴繌腗ongo服務(wù)器接收數(shù)據(jù); 因此我們可以像調(diào)用其他迭代器一樣簡(jiǎn)單地調(diào)用hasNext()和next()來接收后續(xù)元素,并且只有某些調(diào)用會(huì)真正導(dǎo)致與服務(wù)器的網(wǎng)絡(luò)通信。
在Mongo Java驅(qū)動(dòng)程序中,實(shí)際上可能阻塞的hasNext()是hasNext() 。 如果我們要分批處理事件,我們需要(a)只要有可用的元素就讀取它們,并且(b)在被阻止沒有更多事件之前有某種了解的方式,并且我們可以處理事件已經(jīng)批處理。 由于hasNext()可以阻止,因此我們無法直接執(zhí)行此操作。
這就是為什么我們引入了中間隊(duì)列( LinkedBlockingQueue )的原因。 在單獨(dú)的線程中,從游標(biāo)讀取的事件在到達(dá)時(shí)即被放入隊(duì)列中。 如果沒有事件,則線程將在cursor.hasNext()上cursor.hasNext() 。 阻塞隊(duì)列有一個(gè)可選的大小限制,因此,如果隊(duì)列已滿,則放置一個(gè)元素也將阻塞,直到有可用空間為止。 在事件消費(fèi)者線程中,我們首先嘗試以阻塞方式(使用.poll從隊(duì)列中讀取單個(gè)元素,因此我們?cè)谶@里等待所有事件可用。 然后,我們嘗試將隊(duì)列的全部?jī)?nèi)容消耗到一個(gè)臨時(shí)集合中(使用.drainTo ,構(gòu)建批處理,并可能獲取0個(gè)元素,但我們始終擁有第一個(gè))。
值得一提的是,如果集合為空,則Mongo不會(huì)阻止,因此我們必須回到主動(dòng)輪詢。 我們還必須考慮到游標(biāo)可能會(huì)在等待期間死亡的事實(shí)。 要對(duì)此進(jìn)行檢查,我們應(yīng)該驗(yàn)證cursor.getCursorId() != 0 ,其中0是“死光標(biāo)”的ID。 在這種情況下,我們只需要重新實(shí)例化游標(biāo)即可。
加起來
綜上所述,我們得到了一個(gè)非常快速的事件源/流解決方案。 從某種意義上說,這是“自我調(diào)節(jié)”,即如果事件活動(dòng)達(dá)到高峰,則事件接收器將大批量讀取這些事件。 如果事件活動(dòng)少,則將分批快速處理它們。
我們還將同一個(gè)Mongo實(shí)例用于其他目的。 從操作角度來看,擁有一個(gè)數(shù)據(jù)庫系統(tǒng)來聚簇和維護(hù)常規(guī)數(shù)據(jù)和事件肯定是一件好事。
參考: Adam Warski博客的Blog中來自我們的JCG合作伙伴 Adam Warski的MongoDB事件流 。
翻譯自: https://www.javacodegeeks.com/2012/11/event-streaming-with-mongodb.html
總結(jié)
以上是生活随笔為你收集整理的使用MongoDB进行事件流的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 转诊备案手续怎么办理(转诊备案)
- 下一篇: JavaOne 2012:在JVM上诊断