RabbitMQ之镜像队列
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-mirror-queue/
概述
如果RabbitMQ集群只有一個broker節點,那么該節點的失效將導致整個服務臨時性的不可用,并且可能會導致message的丟失(尤其是在非持久化message存儲于非持久化queue中的時候)。當然可以將所有的publish的message都設置為持久化的,并且使用持久化的queue,但是這樣仍然無法避免由于緩存導致的問題:因為message在發送之后和被寫入磁盤并執行fsync之間存在一個雖然短暫但是會產生問題的時間窗。通過publisher的confirm機制能夠確保客戶端知道哪些message已經存入磁盤,盡管如此,一般不希望遇到因單點故障導致的服務不可用。
如果RabbitMQ集群是由多個broker節點構成的,那么從服務的整體可用性上來講,該集群對于單點失效是有彈性的,但是同時也需要注意:盡管exchange和binding能夠在單點失效問題上幸免于難,但是queue和其上持有的message卻不行,這是因為queue及其內容僅僅存儲于單個節點之上,所以一個節點的失效表現為其對應的queue不可用。
引入RabbitMQ的鏡像隊列機制,將queue鏡像到cluster中其他的節點之上。在該實現下,如果集群中的一個節點失效了,queue能自動地切換到鏡像中的另一個節點以保證服務的可用性。在通常的用法中,針對每一個鏡像隊列都包含一個master和多個slave,分別對應于不同的節點。slave會準確地按照master執行命令的順序進行命令執行,故slave與master上維護的狀態應該是相同的。除了publish外所有動作都只會向master發送,然后由master將命令執行的結果廣播給slave們,故看似從鏡像隊列中的消費操作實際上是在master上執行的。
一旦完成了選中的slave被提升為master的動作,發送到鏡像隊列的message將不會再丟失:publish到鏡像隊列的所有消息總是被直接publish到master和所有的slave之上。這樣一旦master失效了,message仍然可以繼續發送到其他slave上。
RabbitMQ的鏡像隊列同時支持publisher confirm和事務兩種機制。在事務機制中,只有當前事務在全部鏡像queue中執行之后,客戶端才會收到Tx.CommitOk的消息。同樣的,在publisher confirm機制中,向publisher進行當前message確認的前提是該message被全部鏡像所接受了。
鏡像隊列的設置
鏡像隊列的配置通過添加policy完成,policy添加的命令為:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]-p Vhost: 可選參數,針對指定vhost下的queue進行設置 Name: policy的名稱 Pattern: queue的匹配模式(正則表達式) Definition:鏡像定義,包括三個部分ha-mode, ha-params, ha-sync-modeha-mode:指明鏡像隊列的模式,有效值為 all/exactly/nodesall:表示在集群中所有的節點上進行鏡像exactly:表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定nodes:表示在指定的節點上進行鏡像,節點名稱通過ha-params指定ha-params:ha-mode模式需要用到的參數ha-sync-mode:進行隊列中消息的同步方式,有效值為automatic和manual priority:可選參數,policy的優先級例如,對隊列名稱以“queue_”開頭的所有隊列進行鏡像,并在集群的兩個節點上完成進行,policy的設置命令為:
rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'也可以通過RabbitMQ的web管理界面設置:
或者通過HTTP API的方式,詳細可以參考官方文檔:Highly Available (Mirrored) Queues
鏡像隊列的原理
普通MQ的結構
通常隊列由兩部分組成:一部分是AMQQueue,負責AMQP協議相關的消息處理,即接收生產者發布的消息、向消費者投遞消息、處理消息confirm、acknowledge等等;另一部分是BackingQueue,它提供了相關的接口供AMQQueue調用,完成消息的存儲以及可能的持久化工作等。
在RabbitMQ中BackingQueue又由5個子隊列組成:Q1, Q2, Delta, Q3和Q4。RabbitMQ中的消息一旦進入隊列,不是固定不變的,它會隨著系統的負載在隊列中不斷流動,消息的不斷發生變化。與這5個子隊列對于,在BackingQueue中消息的生命周期分為4個狀態:
- Alpha:消息的內容和消息索引都在RAM中。Q1和Q4的狀態。
- Beta:消息的內容保存在DISK上,消息索引保存在RAM中。Q2和Q3的狀態。
- Gamma:消息內容保存在DISK上,消息索引在DISK和RAM都有。Q2和Q3的狀態。
- Delta:消息內容和索引都在DISK上。Delta的狀態。
注意:對于持久化的消息,消息內容和消息所有都必須先保存在DISK上,才會處于上述狀態中的一種,而Gamma狀態的消息是只有持久化的消息才會有的狀態。
上述就是RabbitMQ的多層隊列結構的設計,我們可以看出從Q1到Q4,基本經歷RAM->DISK->RAM這樣的過程。這樣設計的好處是:當隊列負載很高的情況下,能夠通過將一部分消息由磁盤保存來節省內存空間,當負載降低的時候,這部分消息又漸漸回到內存,被消費者獲取,使得整個隊列具有很好的彈性。下面我們就來看一下,整個消息隊列的工作流程。
引起消息流動主要有兩方面因素:其一是消費者獲取消息;其二是由于內存不足引起消息換出到磁盤。RabbitMQ在系統運行時會根據消息傳輸的速度計算一個當前內存中能夠保存的最大消息數量(Target_RAM_Count),當內存中的消息數量大于該值時,就會引起消息的流動。進入隊列的消息,一般會按照Q1->Q2->Delta->Q3->Q4的順序進行流動,但是并不是每條消息都一定會經歷所有的狀態,這個取決于當前系統的負載狀況。
當消費者獲取消息時,首先會從Q4隊列中獲取消息,如果Q4獲取成功,則返回。如果Q4為空,則嘗試從Q3獲取消息,首先系統會判斷Q3是否為空,如果為空則返回隊列為空,即此時隊列中無消息(后續會論證)。如果不為空,則取出Q3的消息,然后判斷此時Q3和Delta隊列的長度,如果都為空,則可認為Q2、Delta、Q3、Q4全部為空(后續會論證),此時將Q1中消息直接轉移到Q4中,下次直接從Q4中獲取消息。如果Q3為空,Delta不為空,則將Delta轉移到Q3中,如果Q3不為空,則直接下次從Q3中獲取消息。在將Delta轉移到Q3的過程中,RabbitMQ是按照索引分段讀取的,首先讀取某一段,直到讀到的消息非空為止,然后判斷讀取的消息個數與Delta中的消息個數是否相等,如果相等,則斷定此時Delta中已無消息,則直接將Q2和剛讀到的消息一并放入Q3中。如果不相等,則僅將此次讀取到的消息轉移到Q3。這就是消費者引起的消息流動過程。
消息換出的條件是內存中保存的消息數量+等待ACK的消息的數量>Target_RAM_Count。當條件出發時,系統首先會判斷如果當前進入等待ACK的消息的速度大于進入隊列的消息的速度時,會先處理等待ACK的消息。
最后我們來分析一下前面遺留的兩個問題,一個是為什么Q3隊列為空即可以認定整個隊列為空。試想如果Q3為空,Delta不空,則在Q3取出最后一條消息時,Delta上的消息就會被轉移到Q3上,Q3空矛盾。如果Q2不空,則在Q3取出最后一條消息,如果Delta為空,則會將Q2的消息并入到Q3,與Q3為空矛盾。如果Q1不為空,則在Q3取出最后一條消息,如果Delta和Q3均為空時,則將Q1的消息轉移到Q4中,與Q4為空矛盾。這也解釋了另外一個問題,即為什么Q3和Delta為空,Q2就為空。
通常在負載正常時,如果消息被消費的速度不小于接收新消息的速度,對于不需要保證可靠不丟的消息極可能只會有Alpha狀態。對于durable=true的消息,它一定會進入gamma狀態,若開啟publish confirm機制,只有到了這個階段才會確認該消息已經被接受,若消息消費速度足夠快,內存也充足,這些消息也不會繼續走到下一狀態。
通常在系統負載較高時,已接受到的消息若不能很快被消費掉,這些消息就會進入到很深的隊列中去,增加處理每個消息的平均開銷。因為要花更多的時間和資源處理“積壓”的消息,所以用于處理新來的消息的能力就會降低,使得后來的消息又被積壓進入很深的隊列,繼續加大處理每個消息的平均開銷,這樣情況就會越來越惡化,使得系統的處理能力大大降低。
根據官網資料,應對這一問題,有三個措施:
鏡像隊列的結構
鏡像隊列基本上就是一個特殊的BackingQueue,它內部包裹了一個普通的BackingQueue做本地消息持久化處理,在此基礎上增加了將消息和ack復制到所有鏡像的功能。所有對mirror_queue_master的操作,會通過組播GM(下面會講到)的方式同步到各slave節點。GM負責消息的廣播,mirror_queue_slave負責回調處理,而master上的回調處理是由coordinator負責完成。mirror_queue_slave中包含了普通的BackingQueue進行消息的存儲,master節點中BackingQueue包含在mirror_queue_master中由AMQQueue進行調用。
消息的發布(除了Basic.Publish之外)與消費都是通過master節點完成。master節點對消息進行處理的同時將消息的處理動作通過GM廣播給所有的slave節點,slave節點的GM收到消息后,通過回調交由mirror_queue_slave進行實際的處理。
對于Basic.Publish,消息同時發送到master和所有slave上,如果此時master宕掉了,消息還發送slave上,這樣當slave提升為master的時候消息也不會丟失。
GM, Guarenteed Multicast. GM模塊實現的一種可靠的組播通訊協議,該協議能夠保證組播消息的原子性,即保證組中活著的節點要么都收到消息要么都收不到。它的實現大致如下:
將所有的節點形成一個循環鏈表,每個節點都會監控位于自己左右兩邊的節點,當有節點新增時,相鄰的節點保證當前廣播的消息會復制到新的節點上;當有節點失效時,相鄰的節點會接管保證本次廣播的消息會復制到所有的節點。在master節點和slave節點上的這些gm形成一個group,group(gm_group)的信息會記錄在mnesia中。不同的鏡像隊列形成不同的group。消息從master節點對于的gm發出后,順著鏈表依次傳送到所有的節點,由于所有節點組成一個循環鏈表,master節點對應的gm最終會收到自己發送的消息,這個時候master節點就知道消息已經復制到所有的slave節點了。
新增節點
新節點的加入過程如下圖所示:
每當一個節點加入或者重新加入(例如從網絡分區中恢復過來)鏡像隊列,之前保存的隊列內容會被清空。
節點的失效
如果某個slave失效了,系統處理做些記錄外幾乎啥都不做:master依舊是master,客戶端不需要采取任何行動,或者被通知slave失效。
如果master失效了,那么slave中的一個必須被選中為master。被選中作為新的master的slave通常是最老的那個,因為最老的slave與前任master之間的同步狀態應該是最好的。然而,需要注意的是,如果存在沒有任何一個slave與master完全同步的情況,那么前任master中未被同步的消息將會丟失。
消息的同步
將新節點加入已存在的鏡像隊列是,默認情況下ha-sync-mode=manual,鏡像隊列中的消息不會主動同步到新節點,除非顯式調用同步命令。當調用同步命令后,隊列開始阻塞,無法對其進行操作,直到同步完畢。當ha-sync-mode=automatic時,新加入節點時會默認同步已知的鏡像隊列。由于同步過程的限制,所以不建議在生產的active隊列(有生產消費消息)中操作。
可以使用下面的命令來查看那些slaves已經完成同步:
rabbitmqctl list_queues name slave_pids synchronised_slave_pids可以通過手動的方式同步一個queue:
rabbitmqctl sync_queue name同樣也可以取消某個queue的同步功能:
rabbitmqctl cancel_sync_queue name當然這些都可以通過management插件來設置。
補充要點
鏡像隊列不能作為負載均衡使用,因為每個操作在所有節點都要做一遍。
ha-mode參數和durable declare對exclusive隊列都并不生效,因為exclusive隊列是連接獨占的,當連接斷開,隊列自動刪除。所以實際上這兩個參數對exclusive隊列沒有意義。
當所有slave都出在(與master)未同步狀態時,并且ha-promote-on-shutdown設置為when-synced(默認)時,如果master因為主動的原因停掉,比如是通過rabbitmqctl stop命令停止或者優雅關閉OS,那么slave不會接管master,也就是此時鏡像隊列不可用;但是如果master因為被動原因停掉,比如VM或者OS crash了,那么slave會接管master。這個配置項隱含的價值取向是保證消息可靠不丟失,放棄可用性。如果ha-promote-on-shutdown設置為always,那么不論master因為何種原因停止,slave都會接管master,優先保證可用性。
鏡像隊列中最后一個停止的節點會是master,啟動順序必須是master先啟動,如果slave先啟動,它會有30s的等待時間,等待master的啟動,然后加入cluster中(如果30s內master沒有啟動,slave會自動停止)。當所有節點因故(斷電等)同時離線時,每個節點都認為自己不是最后一個停止的節點。要恢復鏡像隊列,可以嘗試在30s之內啟動所有節點。
對于鏡像隊列,客戶端Basic.Publish操作會同步到所有節點(消息同時發送到master和所有slave上,如果此時master宕掉了,消息還發送slave上,這樣當slave提升為master的時候消息也不會丟失),而其他操作則是通過master中轉,再由master將操作作用于slave。比如一個Basic.Get操作,假如客戶端與slave建立了TCP連接,首先是slave將Basic.Get請求發送至master,由master備好數據,返回至slave,投遞給消費者。
當slave宕掉了,除了與slave相連的客戶端連接全部斷開之外,沒有其他影響。
當master宕掉時,會有以下連鎖反應:
鏡像隊列的恢復
前提:兩個節點A和B組成以鏡像隊列。
場景1:A先停,B后停
該場景下B是master,只要先啟動B,再啟動A即可。或者先啟動A,再在30s之內啟動B即可恢復鏡像隊列。(如果沒有在30s內回復B,那么A自己就停掉自己)
場景2:A,B同時停
該場景下可能是由掉電等原因造成,只需在30s內聯系啟動A和B即可恢復鏡像隊列。
場景3:A先停,B后停,且A無法恢復。
因為B是master,所以等B起來后,在B節點上調用rabbitmqctl forget_cluster_node A以接觸A的cluster關系,再將新的slave節點加入B即可重新恢復鏡像隊列。
場景4:A先停,B后停,且B無法恢復
該場景比較難處理,舊版本的RabbitMQ沒有有效的解決辦法,在現在的版本中,因為B是master,所以直接啟動A是不行的,當A無法啟動時,也就沒版本在A節點上調用rabbitmqctl forget_cluster_node B了,新版本中forget_cluster_node支持-offline參數,offline參數允許rabbitmqctl在離線節點上執行forget_cluster_node命令,迫使RabbitMQ在未啟動的slave節點中選擇一個作為master。當在A節點執行rabbitmqctl forget_cluster_node -offline B時,RabbitMQ會mock一個節點代表A,執行forget_cluster_node命令將B提出cluster,然后A就能正常啟動了。最后將新的slave節點加入A即可重新恢復鏡像隊列
場景5:A先停,B后停,且A和B均無法恢復,但是能得到A或B的磁盤文件
這個場景更加難以處理。將A或B的數據庫文件($RabbitMQ_HOME/var/lib目錄中)copy至新節點C的目錄下,再將C的hostname改成A或者B的hostname。如果copy過來的是A節點磁盤文件,按場景4處理,如果拷貝過來的是B節點的磁盤文件,按場景3處理。最后將新的slave節點加入C即可重新恢復鏡像隊列。
場景6:A先停,B后停,且A和B均無法恢復,且無法得到A和B的磁盤文件
無解。
參考資料
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-mirror-queue/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的RabbitMQ之镜像队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [Conclusion]RabbitMQ
- 下一篇: (RabbitMQ) Java Clie