消息中间件核心实体(1)
接上一篇《消息中間件核心實體(0)》,這一篇繼續介紹消息中間件中的一些實體。
上一篇主要是Message、Topic、TopicMeta和Queue這樣最基礎的實體,這幾篇介紹一些發送和消費的過程中會涉及到的實體和組件。
1. 發送
1.1 增強Message屬性
Message一般只包含topic、tag、content這些屬性,這些屬性也是使用方在發送時會涉及到的內容。但是光有這些屬性往往是不夠的,比如我們會需要記錄產生這條消息的Producer的信息;記錄消息的產生時間和產生的IP信息等等。這些信息都是在Client中給消息附加上去的,對發送方來說是透明的,所以不會在Message實體中暴露,而是我們會增加一個實體:EnhancedMessage。
EnhancedMessage繼承自Message,并會增加一些如下的屬性:
-
bornTime
-
bornAddress
-
producer
-
etc
引申一點,Producer發送消息的大致過程如下:
增強Message屬性,得到EnhancedMessage的實例
獲取可以寫入的隊列(也可以理解成獲取分區)
向隊列寫入消息(可以是隊列暴露寫入接口或者由專門的寫入工具寫入到隊列中)
偽代碼:
EnhancedMessage msg = enhance(message); // 根據消息選擇一個可以寫入的目標隊列 WritableQueue queue = router.select(msg); // 寫入消息(queue實現write方法進行寫入) Result result = queue.write(msg);// write過程 // 將消息序列化成自定義協議的網絡包 Packet messagePacket = Serializer.encode(msg); // 發送網絡包 bootstrap.write(messagePacket);上面的WritableQueue暴露了API去寫入,具體實現可以是寫入到網絡,即遠端的一個Partition。而在做單元測試或者本地測試的時候,可以覆蓋write的實現,而不用真正寫入到網絡中,這會使代碼更容易測試測試。
上面兩幅圖是Rocket開源版本中發送相關的一些代碼,私以為這段代碼非常的不優雅,讀起來特別累,特別是requestHeader的各種屬性設置。
這段是Rocket開源版本中真正將消息寫入到網絡的實現,看起來總是非常臃腫,另外不知道是如何mock這些實現以達到在本地做測試的目的的。
1.2 Queue的路由選擇
發送過程中會涉及到隊列的選擇(分區的選擇),一條消息最終會根據一定的策略落到一個分區中,這里需要一個組件來完成選擇(把這個組件單獨抽象出來,這樣便于控制寫入的目標來進行測試,抽象出來也可以由使用方來實現,這樣可以按照使用方自己的場景做特定的路由)。
路由組件非常的簡單,一般是Router會根據topic獲取到topic的元數據(元數據包含了多有分區的信息),然后根據消息的屬性或者用戶的參數計算出落到哪個分區,比如可以根據用戶的參數對分區總數取模來選擇分區,這樣可以做到將某一類消息發送到一個分區,比如同一個用戶的消息或同一筆訂單的不同消息。
這個組件會比較簡單,但是在集成的時候需要注意一點,這個組件用戶可以自己注入到Producer中來達到控制分區選擇策略的目的。
RocketMQ在TopicPublishInfo中實現分區的選擇,TopicPublishInfo包含了隊列信息(List<MessageQueue> messageQueueList屬性),筆者更傾向于抽象出獨立的路由組件,以便在特定的場景用戶可以自己實現路由,或者在測試時可以做到使用特定路由規則。
2. 消費
消費可以分為多種方式,從獲取消息的方式上可以分為Pull和Push兩種類型的Consumer;從消費消息的方式上可以分為集群消費和廣播消費。這里不展開討論各種模式的實現(以后單獨會討論Consumer該實現那些內容),會以Push模式&集群消費的Consumer為例,把消費流程中涉及到的一些組件進行介紹。
2.1 分配分區
集群消費中需要保證每個分區有且只有一個Consumer在進行消費。如果某個分區沒有Consumer消費,那么使用方拿不到完整的數據;如果某個分區被兩個Consumer消費,那么會產生大量的重復消息。所以這里需要實現一個分區分配策略,使在分布式環境中,每個Consumer拿到屬于自己的分區,且相互交叉。下面是四個分區兩個Consumer默認情況下的分配結果。
實現的策略一般是:
拿到一個Topic所有的分區,對這個列表進行排序
拿到當前所有的Consumer,對Consumer列表進行排序
根據自己所處的Consumer列表的位置和Consumer總數,從分區列表中獲取對應的一部分
每個分區和Consumer都有唯一的ID,這樣各自按照排序后的結果進行分配,可以達到相互不交叉且不遺漏的目的。(在Consumer總數或分區數發生變化的過程中可能分配結果不正確,這個過程是短暫的,且在消費時還會結合鎖去保證分區只有一個Consumer消費,所以不會對實際消費產生影響)。
同樣記住一點,這個分配策略是需要暴露出去的,系統可以默認實現集群消費和廣播消費的基礎策略,用戶可以實現自己的分配策略注入到系統中。
2.2 消息緩存
消費端一個重要的組件是消息緩存。為了提升性能,在消費端消息的獲取和消息的消費是異步的。Consumer內部有線程專門從服務端獲取消息寫入到消息緩存中,另外有線程從緩存中獲取消息調用用戶的回調接口來執行業務操作。
消息緩存除了提供基礎的put和take來實現存入消息和取出消息,還需要自身容量,水位控制等配置。
本身Buffer不是很復雜的部分,但是需要考慮一些流控策略,比如Buffer使用率到多少時降低從服務端獲取數據的頻率。
RocketMQ中實現消息緩存由ProcessQueue實現,筆者傾向于獨立出Buffer模塊,另外Buffer需要提供鎖,以實現順序消費。
2.3 消費進度
還有一個重要的實體是消費進度,系統需要記錄“每個”Consumer的消費進度,且這個數據需要被持久化。
消費進度需要記錄某個Group對某個Topic的某個分區的消費位點。進度是按照Topic維度去組織的(持久化在服務端),結構如下:
topicgroup0cursor0、cursor1、cursor2...group1...實現的對象應該是: class Cursors {String topic;Cursor cursor;class Cursor {String group;// 用數組來存儲一個group消費的一個topic的所有分區的進度// 分區數一般情況下不會變更(變更場景很少),用數據就可以long[] cursors;} }Consumer可以在每一次獲取消息時將消費進度提交到服務端,在服務端來更新Cursors內部的數據。
3. 結語
最近兩篇內容將一些基礎實體和組件簡單的介紹了一下,下一篇討論一下消息應該由Server Push給Consumer還是Consumer主動來Pull消息。
往期文章:
消息中間件核心實體(0)
消息的寫入和讀取流程
NameServer模塊劃分
Client模塊劃分
Broker模塊劃分
消息中間件架構討論
業務方對消息中間件的需求
消息中間件中的一些概念
什么是分布式消息中間件?
歡迎關注公眾號來交流MQ相關問題。
?
轉載于:https://www.cnblogs.com/hzmark/p/mq_entity_1.html
總結
以上是生活随笔為你收集整理的消息中间件核心实体(1)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: html 嵌入编辑excel 开源_网页
- 下一篇: WinUSB