Rocketmq学习2——Rocketmq消息过滤&事务消息&延迟消息原理源码浅析
系列文章目錄和關于我
零丶引入
在《Rocketmq學習1——Rocketmq架構&消息存儲&刷盤機制》中我們學習了rocketmq的架構,以及消息存儲設計,在此消息存儲設計之上,rocketmq提供了諸如:延時消息、事務消息、消息過濾、消息回溯等高級特性。這一篇將對這些高級特性的原理進行淺顯地學習。
這一篇不會展示這些高級特性怎么使用,如何使用可用查看rocketmq-example源碼
一丶消息過濾
RocketMQ分布式消息隊列的消息過濾方式有別于其它MQ中間件,在kafka中,如果想實現消息過濾,需要消費者拿到消息后,反序列化消息識別其中的tag進行過濾。
但是RocketMQ是在Consumer端訂閱消息時再做消息過濾的。RocketMQ這么做是在于其Producer端寫入消息和Consumer端訂閱消息采用分離存儲的機制來實現的,Consumer端訂閱消息是需要通過ConsumeQueue這個消息消費的邏輯隊列拿到一個索引,然后再從CommitLog里面讀取真正的消息實體內容,所以說到底也是還繞不開其存儲結構。其ConsumeQueue的存儲結構如下,可以看到其中有8個字節存儲的Message Tag的哈希值,基于Tag的消息過濾正是基于這個字段值的。
主要支持如下2種的過濾方式
(1) Tag過濾方式:Consumer端在訂閱消息時除了指定Topic還可以指定TAG,如果一個消息有多個TAG,可以用||分隔。其中,Consumer端會將這個訂閱請求構建成一個 SubscriptionData,發送一個Pull消息的請求給Broker端。Broker端從RocketMQ的文件存儲層—Store讀取數據之前,會用這些數據先構建一個MessageFilter,然后傳給Store。Store從 ConsumeQueue讀取到一條記錄后,會用它記錄的消息tag hash值去做過濾,由于在服務端只是根據hashcode進行判斷,無法精確對tag原始字符串進行過濾,故在消息消費端拉取到消息后,還需要對消息的原始tag字符串進行比對,如果不同,則丟棄該消息,不進行消息消費。
如上是tag消息過濾的大致邏輯,可用看到最終還是從commitLog中根據偏移量獲取消息,那么為什么rocketmq不解析一下消息內容,再次根據tag字符串進行比較昵?
這是因為這里使用了MappedByteBuffer避免將整個CommitLog讀取到內存中,如果試圖將消息讀取到內存中,比較tag的話,maybe出現磁盤IO和內核態和用戶態的切換(如果這個消息沒有被預先加載到物理內存中,操作系統會觸發一個缺頁中斷,這時候會從用戶態切換到內核態,從磁盤上讀取消息,然后加載到物理內容,然后再從內核態切換到用戶態)
(2) SQL92的過濾方式:這種方式的大致做法和上面的Tag過濾方式一樣,只是在Store層的具體過濾過程不太一樣,真正的 SQL expression 的構建和執行由rocketmq-filter模塊負責的。每次過濾都去執行SQL表達式會影響效率,所以RocketMQ使用了BloomFilter避免了每次都去執行。SQL92的表達式上下文為消息的屬性。
大致原理是,根據消息屬性中獲取序列化的布隆過濾器數據,如果布隆過濾器表示不符合那么肯定是不符合,如果符合那么需要進一步進行過濾。
二丶事務消息
1.事務消息大致流程
RocketMQ采用了2PC的思想來實現了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,如下圖所示。
上圖說明了事務消息的大致方案,其中分為兩個流程:正常事務消息的發送及提交、事務消息的補償流程。
-
事務消息發送及提交:
-
發送消息(half消息):這一階段的消息對消費者來說是不可見的,RocketMQ事務消息是這樣實現half消息不可見的:
如果消息是half消息,將備份原消息的主題與消息消費隊列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由于消費組未訂閱該主題,故消費端無法消費half類型的消息,然后RocketMQ會開啟一個定時任務,從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進行消費,根據
生產者組獲取一個服務提供者發送回查事務狀態請求,根據事務狀態來決定是提交或回滾消息。這里可看到生產者組的作用:如果生產者服務器A和B是一個生產者組,生產者A掛了,rocketmq會請求生產者B來回程事務提交狀態 -
服務端響應消息寫入結果。
-
根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行)。
-
根據本地事務狀態執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)
-
-
補償流程:補償階段用于解決消息Commit或者Rollback發生超時或者失敗的情況
- 對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起一次“回查”
- Producer收到回查消息,檢查回查消息對應的本地事務的狀態
- 根據本地事務狀態,重新Commit或者Rollback
可用看到rocketmq通過主動會查實現最終一致性,但是不會無限制的重試下去,默認回查15次,如果15次回查還 是無法得知事務狀態,rocketmq默認回滾該消息。
如下如果發送事務消息,那么會在消息中標記是一個事務消息
在Broker端,如果根據此字段可得知是否時事務消息,如果是,那么會有存儲為half消息
如上,可看到如果是事務消息會備份原topic,然后替換為事務topic,然后使用Store進行存儲。
2.Commit和Rollback操作以及Op消息的引入
在完成一階段寫入一條對用戶不可見的消息后,二階段如果是Commit操作,則需要讓消息對用戶可見;如果是Rollback則需要撤銷一階段的消息。先說Rollback的情況。對于Rollback,本身一階段的消息對用戶是不可見的,其實不需要真正撤銷消息(實際上RocketMQ也無法去真正的刪除一條消息,因為是順序寫文件的)。但是區別于這條消息沒有確定狀態(Pending狀態,事務懸而未決),需要一個操作來標識這條消息的最終狀態。RocketMQ事務消息方案中引入了Op消息的概念,用Op消息標識事務消息已經確定的狀態(Commit或者Rollback)。如果一條事務消息沒有對應的Op消息,說明這個事務的狀態還無法確定(可能是二階段失敗了)。引入Op消息后,事務消息無論是Commit或者Rollback都會記錄一個Op操作。Commit相對于Rollback只是在寫入Op消息前創建Half消息的索引。
3.Op消息的存儲和對應關系
RocketMQ將Op消息寫入到全局一個特定的Topic中通過源碼中的方法—TransactionalMessageUtil.buildOpTopic();這個Topic是一個內部的Topic(像Half消息的Topic一樣),不會被用戶消費。Op消息的內容為對應的Half消息的存儲的Offset,這樣通過Op消息能索引到Half消息進行后續的回查操作。
4.Half消息的索引構建
在執行二階段Commit操作時,需要構建出Half消息的索引。一階段的Half消息由于是寫到一個特殊的Topic,所以二階段構建索引時需要讀取出Half消息,并將Topic和Queue替換成真正的目標的Topic和Queue,之后通過一次普通消息的寫入操作來生成一條對用戶可見的消息。所以RocketMQ事務消息二階段其實是利用了一階段存儲的消息的內容,在二階段時恢復出一條完整的普通消息,然后走一遍消息寫入流程。
5.如何處理二階段失敗的消息?
如果在RocketMQ事務消息的二階段過程中失敗了,例如在做Commit操作時,出現網絡問題導致Commit失敗,那么需要通過一定的策略使這條消息最終被Commit。RocketMQ采用了一種補償機制,稱為“回查”。Broker端對未確定狀態的消息發起回查,將消息發送到對應的Producer端(同一個Group的Producer),由Producer根據消息來檢查本地事務的狀態,進而執行Commit或者Rollback。Broker端通過對比Half消息和Op消息進行事務消息的回查并且推進CheckPoint(記錄那些事務消息的狀態是確定的)。
值得注意的是,rocketmq并不會無休止的的信息事務狀態回查,默認回查15次,如果15次回查還是無法得知事務狀態,rocketmq默認回滾該消息。
三丶延遲消息
定時消息(延遲隊列)是指消息發送到broker后,不會立即被消費,等待特定時間投遞給真正的topic。基本實現方式和事務消息類似
broker有配置項messageDelayLevel,默認值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個level。可以配置自定義messageDelayLevel。注意, messageDelayLevel是broker的屬性,不屬于某個topic。發消息時,設置delayLevel等級即可: msg.setDelayLevel(level)。level有以下三種情況:
- level == 0,消息為非延遲消息
- 1<=level<=maxLevel,消息延遲特定時間,例如level==1,延遲1s
- level > maxLevel,則level== maxLevel,例如level==20,延遲2h
定時消息會暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個queue只存相同延遲的消息,保證具有相同發送延遲 的消息能夠順序消費。broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。
如下是rocketmq基于調度線程池,實現定時任務處理延遲消息
總結
以上是生活随笔為你收集整理的Rocketmq学习2——Rocketmq消息过滤&事务消息&延迟消息原理源码浅析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MVCC多版本并发控制和幻读问题的解决
- 下一篇: JVM学习-自动内存管理