深入理解分布式消息队列
一、消息隊列的演進
分布式消息隊列中間件是是大型分布式系統中常見的中間件。消息隊列主要解決應用耦合、異步消息、流量削鋒等問題,具有高性能、高可用、可伸縮和最終一致性等特點。消息隊列已經逐漸成為企業應用系統內部通信的核心手段,使用較多的消息隊列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等,此外,利用數據庫(如 Redis、MySQL 等)也可實現消息隊列的部分基本功能。
1.基于 OS 的 MQ
單機消息隊列可以通過操作系統原生的進程間通信機制來實現,如消息隊列、共享內存等。比如我們可以在共享內存中維護一個雙端隊列:
消息產出進程不停地往隊列里添加消息,同時消息消費進程不斷地從隊尾有序地取出這些消息。添加消息的任務我們稱為 producer,而取出并使用消息的任務,我們稱之為 consumer。這種模式在早期單機多進程模式中比較常見, 比如 IO 進程把收到的網絡請求存入本機 MQ,任務處理進程從本機 MQ 中讀取任務并進行處理。
單機 MQ 易于實現,但是缺點也很明顯:因為依賴于單機 OS 的 IPC 機制,所以無法實現分布式的消息傳遞,并且消息隊列的容量也受限于單機資源。
2.基于 DB 的 MQ
即使用存儲組件(如 Mysql 、 Redis 等)存儲消息, 然后在消息的生產側和消費側實現消息的生產消費邏輯,從而實現 MQ 功能。以 Redis 為例, 可以使用 Redis 自帶的 list 實現。Redis list 使用 lpush 命令,從隊列左邊插入數據;使用 rpop 命令,從隊列右邊取出數據。與單機 MQ 相比, 該方案至少滿足了分布式, 但是仍然帶有很多無法接受的缺陷。
熱 key 性能問題:不論是用 codis 還是 twemproxy 這種集群方案,對某個隊列的讀寫請求最終都會落到同一臺 redis 實例上,并且無法通過擴容來解決問題。如果對某個 list 的并發讀寫非常高,就產生了無法解決的熱 key,嚴重可能導致系統崩潰
沒有消費確認機制:每當執行 rpop 消費一條數據,那條消息就被從 list 中永久刪除了。如果消費者消費失敗,這條消息也沒法找回了。
不支持多訂閱者:一條消息只能被一個消費者消費,rpop 之后就沒了。如果隊列中存儲的是應用的日志,對于同一條消息,監控系統需要消費它來進行可能的報警,BI 系統需要消費它來繪制報表,鏈路追蹤需要消費它來繪制調用關系……這種場景 redis list 就沒辦法支持了
不支持二次消費:一條消息 rpop 之后就沒了。如果消費者程序運行到一半發現代碼有 bug,修復之后想從頭再消費一次就不行了。
針對上述缺點,redis 5.0 開始引入 stream 數據類型,它是專門設計成為消息隊列的數據結構,借鑒了很多 kafka 的設計,但是隨著很多分布式 MQ 組件的出現,仍然顯得不夠友好, 畢竟 Redis 天生就不是用來做消息轉發的。
3. 專用分布式 MQ 中間件
隨著時代的發展,一個真正的消息隊列,已經不僅僅是一個隊列那么簡單了,業務對 MQ 的吞吐量、擴展性、穩定性、可靠性等都提出了嚴苛的要求。因此,專用的分布式消息中間件開始大量出現。常見的有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等等。
二、消息隊列設計要點
消息隊列本質上是一個消息的轉發系統, 把一次 RPC 就可以直接完成的消息投遞,轉換成多次 RPC 間接完成,這其中包含兩個關鍵環節:
1.消息轉儲;
2.消息投遞:時機和對象;
基于此,消息隊列的整體設計思路是:
確定整體的數據流向:如 producer 發送給 MQ,MQ 轉發給 consumer,consumer 回復消費確認,消息刪除、消息備份等。
利用 RPC 將數據流串起來,最好基于現有的 RPC 框架,盡量做到無狀態,方便水平擴展。
存儲選型,綜合考慮性能、可靠性和開發維護成本等諸多因素。
消息投遞,消費模式 push、pull。
消費關系維護,單播、多播等,可以利用 zk、config server 等保存消費關系。
高級特性,如可靠投遞,重復消息,順序消息等, 很多高級特性之間是相互制約的關系,這里要充分結合應用場景做出取舍。
1.MQ 基本特性
RPC 通信
MQ 組件要實現和生產者以及消費者進行通信功能, 這里涉及到 RPC 通信問題。消息隊列的 RPC,和普通的 RPC 沒有本質區別。對于負載均衡、服務發現、序列化協議等等問題都可以借助現有 RPC 框架來實現,避免重復造輪子。
存儲系統
存儲可以做成很多方式。比如存儲在內存里,存儲在分布式 KV 里,存儲在磁盤里,存儲在數據庫里等等。但歸結起來,主要有持久化和非持久化兩種。
持久化的形式能更大程度地保證消息的可靠性(如斷電等不可抗外力),并且理論上能承載更大限度的消息堆積(外存的空間遠大于內存)。但并不是每種消息都需要持久化存儲。很多消息對于投遞性能的要求大于可靠性的要求,且數量極大(如日志)。這時候,消息不落地直接暫存內存,嘗試幾次 failover,最終投遞出去也未嘗不可。常見的消息隊列普遍兩種形式都支持。
從速度來看,理論上,文件系統>分布式 KV(持久化)>分布式文件系統>數據庫,而可靠性卻相反。還是要從支持的業務場景出發作出最合理的選擇。
高可用
MQ 的高可用,依賴于 RPC 和存儲的高可用。通常 RPC 服務自身都具有服務自動發現,負載均衡等功能,保證了其高可用。存儲的高可用, 例如 Kafka,使用分區加主備模式,保證每一個分區內的高可用性,也就是每一個分區至少要有一個備份且需要做數據的同步。
推拉模型
push 和 pull 模型各有利弊,兩種模式也都有被市面上成熟的消息中間件選用。
1.慢消費
慢消費是 push 模型最大的致命傷,如果消費者的速度比發送者的速度慢很多,會出現兩種惡劣的情況:
1.消息在 broker 的堆積。假設這些消息都是有用的無法丟棄的,消息就要一直在 broker 端保存。
2.broker 推送給 consumer 的消息 consumer 無法處理,此時 consumer 只能拒絕或者返回錯誤。
而 pull 模式下,consumer 可以按需消費,不用擔心自己處理不了的消息來騷擾自己,而 broker 堆積消息也會相對簡單,無需記錄每一個要發送消息的狀態,只需要維護所有消息的隊列和偏移量就可以了。所以對于慢消費,消息量有限且到來的速度不均勻的情況,pull 模式比較合適。
2.消息延遲與忙等
這是 pull 模式最大的短板。由于主動權在消費方,消費方無法準確地決定何時去拉取最新的消息。如果一次 pull 取到消息了還可以繼續去 pull,如果沒有 pull 取到則需要等待一段時間重新 pull。
消息投放時機
即消費者應該在什么時機消費消息。一般有以下三種方式:
攢夠了一定數量才投放。
到達了一定時間就投放。
有新的數據到來就投放。
至于如何選擇,也要結合具體的業務場景來決定。比如,對及時性要求高的數據,可用采用方式 3 來完成。
消息投放對象
不管是 JMS 規范中的 Topic/Queue,Kafka 里面的 Topic/Partition/ConsumerGroup,還是 AMQP(如 RabbitMQ)的 Exchange 等等, 都是為了維護消息的消費關系而抽象出來的概念。本質上,消息的消費無外乎點到點的一對一單播,或一對多廣播。另外比較特殊的情況是組間廣播、組內單播。比較通用的設計是,不同的組注冊不同的訂閱,支持組間廣播。組內不同的機器,如果注冊一個相同的 ID,則單播;如果注冊不同的 ID(如 IP 地址+端口),則廣播。
例如 pulsar 支持的訂閱模型有:
Exclusive:獨占型,一個訂閱只能有一個消息者消費消息。
Failover:災備型,一個訂閱同時只有一個消費者,可以有多個備份消費者。一旦主消費者故障則備份消費者接管。不會出現同時有兩個活躍的消費者。
Shared:共享型,一個訂閱中同時可以有多個消費者,多個消費者共享 Topic 中的消息。
Key_Shared:鍵共享型,多個消費者各取一部分消息。
通常會在公共存儲上維護廣播關系,如 config server、zookeeper 等。
2.隊列高級特性
常見的高級特性有可靠投遞、消息丟失、消息重復、事務等等,他們并非是 MQ 必備的特性。由于這些特性可能是相互制約的,所以不可能完全兼顧。所以要依照業務的需求,來仔細衡量各種特性實現的成本、利弊,最終做出最為合理的設計。
可靠投遞
如何保證消息完全不丟失?
直觀的方案是,在任何不可靠操作之前,先將消息落地,然后操作。當失敗或者不知道結果(比如超時)時,消息狀態是待發送,定時任務不停輪詢所有待發送消息,最終一定可以送達。但是,這樣必然導致消息可能會重復,并且在異常情況下,消息延遲較大。
例如:
producer 往 broker 發送消息之前,需要做一次落地。
請求到 server 后,server 確保數據落地后再告訴客戶端發送成功。
支持廣播的消息隊列需要對每個接收者,持久化一個發送狀態,直到所有接收者都確認收到,才可刪除消息。
即對于任何不能確認消息已送達的情況,都要重推消息。但是,隨著而來的問題就是消息重復。在消息重復和消息丟失之間,無法兼顧,要結合應用場景做出取舍。
消費確認
當 broker 把消息投遞給消費者后,消費者可以立即確認收到了消息。但是,有些情況消費者可能需要再次接收該消息(比如收到消息、但是處理失敗),即消費者主動要求重發消息。所以,要允許消費者主動進行消費確認。
順序消息
對于 push 模式,要求支持分區且單分區只支持一個消費者消費,并且消費者只有確認一個消息消費后才能 push 另外一個消息,還要發送者保證發送順序唯一。
對于 pull 模式,比如 kafka 的做法:
producer 對應 partition,并且單線程。
consumer 對應 partition,消費確認(或批量確認),單線程消費。
但是這樣也只是實現了消息的分區有序性,并不一定全局有序。總體而言,要求消息有序的 MQ 場景還是比較少的。
三、Kafka
Kafka 是一個分布式發布訂閱消息系統。它以高吞吐、可持久化、可水平擴展、支持流數據處理等多種特性而被廣泛使用(如 Storm、Spark、Flink)。在大數據系統中,數據需要在各個子系統中高性能、低延遲的不停流轉。傳統的企業消息系統并不是非常適合大規模的數據處理,但 Kafka 出現了,它可以高效的處理實時消息和離線消息,降低編程復雜度,使得各個子系統可以快速高效的進行數據流轉,Kafka 承擔高速數據總線的作用。
kafka 基礎概念
BrokerKafka 集群包含一個或多個服務器,這種服務器被稱為 broker。
TopicTopic 在邏輯上可以被認為是一個 queue,每條消費都必須指定它的 Topic,可以簡單理解為必須指明把這條消息放進哪個 queue 里。為了使得 Kafka 的吞吐率可以線性提高,物理上把 Topic 分成一個或多個 Partition,每個 Partition 在物理上對應一個文件夾,該文件夾下存儲這個 Partition 的所有消息和索引文件。
PartitionParition 是物理上的概念,每個 Topic 包含一個或多個 Partition。
Producer負責發布消息到 Kafka broker。
Consumer消息消費者,向 Kafka broker 讀取消息的客戶端。
Consumer Group每個 Consumer 屬于一個特定的 Consumer Group(可為每個 Consumer 指定 group name,若不指定 group name 則屬于默認的 group)。
一個典型的 kafka 集群包含若干 Producer,若干個 Broker(kafka 支持水平擴展)、若干個 Consumer Group,以及一個 zookeeper 集群。Producer 使用 push 模式將消息發布到 broker。consumer 使用 pull 模式從 broker 訂閱并消費消息。多個 broker 協同工作,producer 和 consumer 部署在各個業務邏輯中。kafka 通過 zookeeper 管理集群配置及服務協同。
這樣就組成了一個高性能的分布式消息發布和訂閱系統。Kafka 有一個細節是和其他 mq 中間件不同的點,producer 發送消息到 broker 的過程是 push,而 consumer 從 broker 消費消息的過程是 pull,主動去拉數據。而不是 broker 把數據主動發送給 consumer。
Producer 發送消息到 broker 時,會根據 Paritition 機制選擇將其存儲到哪一個 Partition。如果 Partition 機制設置合理,所有消息可以均勻分布到不同的 Partition 里,這樣就實現了負載均衡。如果一個 Topic 對應一個文件,那這個文件所在的機器 I/O 將會成為這個 Topic 的性能瓶頸,而有了 Partition 后,不同的消息可以并行寫入不同 broker 的不同 Partition 里,極大的提高了吞吐率。
Kafka 特點
優點:
高性能:單機測試能達到 100w tps
低延時:生產和消費的延時都很低,e2e 的延時在正常的 cluster 中也很低
可用性高:replicate+ isr + 選舉 機制保證
工具鏈成熟:監控 運維 管理 方案齊全
生態成熟:大數據場景必不可少 kafka stream
不足:
無法彈性擴容:對 partition 的讀寫都在 partition leader 所在的 broker,如果該 broker 壓力過大,也無法通過新增 broker 來解決問題
擴容成本高:集群中新增的 broker 只會處理新 topic,如果要分擔老 topic-partition 的壓力,需要手動遷移 partition,這時會占用大量集群帶寬
消費者新加入和退出會造成整個消費組 rebalance:導致數據重復消費,影響消費速度,增加延遲
partition 過多會使得性能顯著下降:ZK 壓力大,broker 上 partition 過多讓磁盤順序寫幾乎退化成隨機寫
高吞吐機制
順序存取
如果把消息以隨機的方式寫入到磁盤,那么磁盤首先要做的就是尋址,也就是定位到數據所在的物理地址,在磁盤上就要找到對應柱面、磁頭以及對應的扇區;這個過程相對內存來說會消耗大量時間,為了規避隨機讀寫帶來的時間消耗,kafka 采用順序寫的方式存儲數據。
頁緩存
即使是順序存取,但是頻繁的 I/O 操作仍然會造成磁盤的性能瓶頸,所以 kafka 使用了頁緩存和零拷貝技術。當進程準備讀取磁盤上的文件內容時, 操作系統會先查看待讀取的數據是否在頁緩存中,如果存在則直接返回數據, 從而避免了對物理磁盤的 I/O 操作;
如果沒有命中, 則操作系統會向磁盤發起讀取請求并將讀取的數據頁存入頁緩存, 之后再將數據返回給進程。一個進程需要將數據寫入磁盤, 那么操作系統也會檢測數據對應的頁是否在頁緩存中,如果不存在, 則會先在頁緩存中添加相應的頁, 最后將數據寫入對應的頁。被修改過后的頁也就變成了臟頁, 操作系統會在合適的時間把臟頁中的數據寫入磁盤, 以保持數據的 一 致性。
Kafka 中大量使用了頁緩存, 這是 Kafka 實現高吞吐的重要因素之 一 。雖然消息都是先被寫入頁緩存,然后由操作系統負責具體的刷盤任務的, 但在 Kafka 中同樣提供了同步刷盤及間斷性強制刷盤(fsync),可以通過參數來控制。
同步刷盤能夠保證消息的可靠性,避免因為宕機導致頁緩存數據還未完成同步時造成的數據丟失。但是實際使用上,我們沒必要去考慮這樣的因素以及這種問題帶來的損失,消息可靠性可以由多副本來解決,同步刷盤會帶來性能的影響。
頁緩存的好處:
I/O Scheduler 會將連續的小塊寫組裝成大塊的物理寫從而提高性能;
I/O Scheduler 會嘗試將一些寫操作重新按順序排好,從而減少磁頭移動時間;
充分利用所有空閑內存(非 JVM 內存);
讀操作可以直接在 Page Cache 內進行,如果消費和生產速度相當,甚至不需要通過物理磁盤交換數據;
如果進程重啟,JVM 內的 Cache 會失效,但 Page Cache 仍然可用。
零拷貝
零拷貝技術可以減少 CPU 的上下文切換和數據拷貝次數。
常規方式
應用程序一次常規的數據請求過程,發生了 4 次拷貝,2 次 DMA 和 2 次 CPU,而 CPU 發生了 4 次的切換。(DMA 簡單理解就是,在進行 I/O 設備和內存的數據傳輸的時候,數據搬運的工作全部交給 DMA 控制器,而 CPU 不再參與任何與數據搬運相關的事情)
零拷貝的方式
通過零拷貝優化,CPU 只發生了 2 次的上下文切換和 3 次數據拷貝。
批量發送
Kafka 允許進行批量發送消息,先將消息緩存在內存中,然后一次請求批量發送出去,這種策略將大大減少服務端的 I/O 次數。
數據壓縮
Kafka 還支持對消息集合進行壓縮,Producer 可以通過 GZIP 或 Snappy 格式對消息集合進行壓縮,Producer 壓縮之后,在 Consumer 需進行解壓,雖然增加了 CPU 的工作,但在對大數據處理上,瓶頸在網絡上而不是 CPU,所以這個成本很值得。
高可用機制
副本
Producer 在發布消息到某個 Partition 時,先通過 ZooKeeper 找到該 Partition 的 Leader,然后無論該 Topic 的 Replication Factor 為多少,Producer 只將該消息發送到該 Partition 的 Leader。Leader 會將該消息寫入其本地 Log。
每個 Follower 都從 Leader pull 數據。這種方式上,Follower 存儲的數據順序與 Leader 保持一致。Follower 在收到該消息后,向 Leader 發送 ACK, 并把消息寫入其 Log。一旦 Leader 收到了 ISR 中的所有 Replica 的 ACK,該消息就被認為已經 commit 了,Leader 將增加 HW 并且向 Producer 發送 ACK。
為了提高性能,每個 Follower 在接收到數據后就立馬向 Leader 發送 ACK,而非等到數據寫入 Log 中。因此,對于已經 commit 的消息,Kafka 只能保證它被存于多個 Replica 的內存中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發生后該條消息一定能被 Consumer 消費。Consumer 讀消息也是從 Leader 讀取,只有被 commit 過的消息才會暴露給 Consumer。Kafka Replication 的數據流如下圖所示:
對于 Kafka 而言,定義一個 Broker 是否“活著”包含兩個條件:
一是它必須維護與 ZooKeeper 的 session(這個通過 ZooKeeper 的 Heartbeat 機制來實現)。
二是 Follower 必須能夠及時將 Leader 的消息復制過來,不能“落后太多”。
Leader 會跟蹤與其保持同步的 Replica 列表,該列表稱為 ISR(即 in-sync Replica)。如果一個 Follower 宕機,或者落后太多,Leader 將把它從 ISR 中移除。這里所描述的“落后太多”指 Follower 復制的消息落后于 Leader 后的條數超過預定值或者 Follower 超過一定時間未向 Leader 發送 fetch 請求。Kafka 的復制機制既不是完全的同步復制,也不是單純的異步復制。
完全同步復制要求所有能工作的 Follower 都復制完,這條消息才會被認為 commit,這種復制方式極大的影響了吞吐率(高吞吐率是 Kafka 非常重要的一個特性)。異步復制方式下,Follower 異步的從 Leader 復制數據,數據只要被 Leader 寫入 log 就被認為已經 commit,這種情況下如果 Follower 都復制完都落后于 Leader,而如果 Leader 突然宕機,則會丟失數據。而 Kafka 的這種使用 ISR 的方式則很好的均衡了確保數據不丟失以及吞吐率。Follower 可以批量的從 Leader 復制數據,這樣極大的提高復制性能(批量寫磁盤),極大減少了 Follower 與 Leader 的差距。
一條消息只有被 ISR 里的所有 Follower 都從 Leader 復制過去才會被認為已提交。這樣就避免了部分數據被寫進了 Leader,還沒來得及被任何 Follower 復制就宕機了,而造成數據丟失(Consumer 無法消費這些數據)。而對于 Producer 而言,它可以選擇是否等待消息 commit。這種機制確保了只要 ISR 有一個或以上的 Follower,一條被 commit 的消息就不會丟失。
故障恢復
Leader 故障
leader 發生故障后,會從 ISR 中選出一個新的 leader,之后,為保證多個副本之間的數據一致性,其余的 follower 會先將各自的 log 文件高于 HW 的部分截掉,然后從新的 leader 同步數據。注意:這只能保證副本之間的數據一致性,并不能保證數據不丟失或者不重復。
Kafka 在 ZooKeeper 中動態維護了一個 ISR(in-sync replicas),這個 ISR 里的所有 Replica 都跟上了 leader,只有 ISR 里的成員才有被選為 Leader 的可能。在這種模式下,對于 f+1 個 Replica,一個 Partition 能在保證不丟失已經 commit 的消息的前提下容忍 f 個 Replica 的失敗。
LEO:每個副本最大的 offset。
HW:消費者能見到的最大的 offset,ISR 隊列中最小的 LEO。
Follower 故障
follower 發生故障后會被臨時踢出 ISR 集合,待該 follower 恢復后,follower 會 讀取本地磁盤記錄的上次的 HW,并將 log 文件高于 HW 的部分截取掉,從 HW 開始向 leader 進行同步數據操作。等該 follower 的 LEO 大于等于該 partition 的 HW,即 follower 追上 leader 后,就可以重新加入 ISR 了。
擴展性
由于 Broker 存儲著特定分區的數據, 因此,不管是 Broker 還是分區的擴縮容,都是比較復雜的,屬于典型的“有狀態服務”擴縮容問題。接下來,我們看一下 Pulsar 是怎么針對 kafka 的不足進行優化的。
四、Pulsar
Apache Pulsar 是 Apache 軟件基金會頂級項目,是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數式計算為一體。采用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區域數據復制,具有強一致性、高吞吐、低延時及高可擴展性等流數據存儲特性。在消息領域,Pulsar 是第一個將存儲計算分離云原生架構落地的開源項目。
服務和存儲分離
在 kafka 的基礎上,把數據存儲功能從 Broker 中分離出來,Broker 僅面向生產者、消費者提供數據讀寫能力,但其自身并不存儲數據。而在 Broker 層下面使用 Bookie 作為存儲層,承擔具體的數據存儲職責。在 Pulsar 中,broker 的含義和 kafka 中的 broker 是一致的,就是一個運行的 Pulsar 實例, 提供多個分區的讀寫服務。由于 broker 層不在承擔數據存儲職責,使得 Broker 層成為無狀態服務。這樣一來,Broker 的擴縮容就變得非常簡單。
相比之下,服務存儲集于一體的 Kafka 就非常難以擴容。
Broker 和 Bookie 相互獨立,方便實現獨立的擴展以及獨立的容錯
Broker 無狀態,便于快速上、下線,更加適合于云原生場景
分區存儲不受限于單個節點存儲容量
Bookie 數據分布均勻
分片存儲
1.在 Kafka 分區(Partition)概念的基礎上,按照時間或大小,把分區切分成分片(Segment)。
2.同一個分區的分片,分散存儲在集群中所有的 Bookie 節點上。
3.同一個分片,擁有多個副本,副本數量可以指定,存儲于不同的 Bookie 節點。
Pulsar 性能
和 Kafka 一樣,Pulsar 也使用了順序讀寫和零拷貝等技術來提高系統的性能。
此外,Pulsar 還設計了分層緩存機制,在服務層和存儲層都做了分層緩存,來提高性能。
生產者發送消息時,調用 Bookie 層寫入消息時,同時將消息寫入 broker 緩存中。
實時消費時(追尾讀),首先從 broker 緩存中讀取數據,避免從持久層 bookie 中讀取,從而降低投遞延遲。
讀取歷史消息(追趕讀)場景中,bookie 會將磁盤消息讀入 bookie 讀緩存中,從而避免每次都讀取磁盤數據,降低讀取延時。
Pulsar 擴展性
分片存儲解決了分區容量受單節點存儲空間限制的問題,當容量不夠時,可以通過擴容 Bookie 節點的方式支撐更多的分區數據,也解決了分區數據傾斜問題,數據可以均勻的分配在 Bookie 節點上。
Broker 和 Bookie 靈活的容錯以及無縫的擴容能力讓 Apache Pulsar 具備非常高的可用性,實現了無限制的分區存儲。
Broker 擴展
在 Pulsar 中 Broker 是無狀態的,可以通過增加節點的方式實現快速擴容。當需要支持更多的消費者或生產者時,可以簡單地添加更多的 Broker 節點來滿足業務需求。Pulsar 支持自動的分區負載均衡,在 Broker 節點的資源使用率達到閾值時,會將負載遷移到負載較低的 Broker 節點。新增 Broker 節點時,分區也將在 Brokers 中做平衡遷移,一些分區的所有權會轉移到新的 Broker 節點。
Bookie 擴展
存儲層的擴容,通過增加 Bookie 節點來實現。通過資源感知和數據放置策略,流量將自動切換到新的 Apache Bookie 中,整個過程不會涉及到不必要的數據搬遷。即擴容時,不會將舊數據從現有存儲節點重新復制到新存儲節點。
如圖所示,起始狀態有四個存儲節點,Bookie1, Bookie2, Bookie3, Bookie4,以 Topic1-Part2 為例,當這個分區的最新的存儲分片是 SegmentX 時,對存儲層擴容,添加了新的 Bookie 節點,BookieX,BookieY,那么當存儲分片滾動之后,新生成的存儲分片, SegmentX+1,SegmentX+2,會優先選擇新的 Bookie 節點(BookieX,BookieY)來保存數據。
Pulsar 可用性
Broker 容錯
如下圖,假設當存儲分片滾動到 SegmentX 時,Broker2 節點失敗。此時生產者和消費者向其他的 Broker 發起請求,這個過程會觸發分區的所有權轉移,即將 Broker2 擁有的分區 Topic1-Part2 的所有權轉移到其他的 Broker(Broker3)。
由于數據存儲和數據服務分離,所以新 Broker 接管分區的所有權時,它不需要復制 Partiton 的數據。新的分區 Owner(Broker3)會產生一個新的分片 SegmentX+1, 如果有新數據到來,會存儲在新的分片 Segment x+1 上,不會影響分區的可用性。
即當某個 Broker 實例故障時,整個集群的消息存儲能力仍然完好。此時,集群只是喪失了特定分區的消息服務,只需要把這些分區的服務權限分配給其他 Broker 即可。
注意,和 Kafka 一樣, Pulsar 的一個分區仍然只能由一個 Broker 提供服務,否則無法保證消息的分區有序性。
Bookie 容錯
如下圖,假設 Bookie 2 上的 Segment 4 損壞。Bookie Auditor 會檢測到這個錯誤并進行復制修復。Bookie 中的副本修復是 Segment 級別的多對多快速修復,BookKeeper 可以從 Bookie 3 和 Bookie 4 讀取 Segment 4 中的消息,并在 Bookie 1 處修復 Segment 4。如果是 Bookie 節點故障,這個 Bookie 節點上所有的 Segment 會按照上述方式復制到其他的 Bookie 節點。
所有的副本修復都在后臺進行,對 Broker 和應用透明,Broker 會產生新的 Segment 來處理寫入請求,不會影響分區的可用性。
Pulsar 其他特性
基于上述的設計特點,Pulsar 提供了很多特性。
讀寫分離
Pulsar 另外一個有吸引力的特性是提供了讀寫分離的能力,讀寫分離保證了在有大量滯后消費(磁盤 IO 會增加)時,不會影響服務的正常運行,尤其是不會影響到數據的寫入。讀寫分離的能力由 Bookie 提供,簡單說一下 Bookie 存儲涉及到的概念:
Journals:Journal 文件包含了 Bookie 事務日志,在 Ledger (可以認為是分片的一部分) 更新之前,Journal 保證描述更新的事務寫入到 Non-volatile 的存儲介質上;
Entry logger:Entry 日志文件管理寫入的 Entry,來自不同 ledger 的 entry 會被聚合然后順序寫入;
Index files:每個 Ledger 都有一個對應的索引文件,記錄數據在 Entry 日志文件中的 Offset 信息。
Entry 的讀寫入過程下圖所示,數據的寫入流程:
數據首先會寫入 Journal,寫入 Journal 的數據會實時落到磁盤;
然后,數據寫入到 Memtable ,Memtable 是讀寫緩存;
寫入 Memtable 之后,對寫入請求進行響應;
Memtable 寫滿之后,會 Flush 到 Entry Logger 和 Index cache,Entry Logger 中保存了數據,Index cache 保存了數據的索引信息,然后由后臺線程將 Entry Logger 和 Index cache 數據落到磁盤。
數據的讀取流程:
如果是 Tailing read 請求,直接從 Memtable 中讀取 Entry;
如果是 Catch-up read(滯后消費)請求,先讀取 Index 信息,然后索引從 Entry Logger 文件讀取 Entry。
一般在進行 Bookie 的配置時,會將 Journal 和 Ledger 存儲磁盤進行隔離,減少 Ledger 對于 Journal 寫入的影響,并且推薦 Journal 使用性能較好的 SSD 磁盤,讀寫分離主要體現在:
寫入 Entry 時,Journal 中的數據需要實時寫到磁盤,Ledger 的數據不需要實時落盤,通過后臺線程批量落盤,因此寫入的性能主要受到 Journal 磁盤的影響;
讀取 Entry 時,首先從 Memtable 讀取,命中則返回;如果不命中,再從 Ledger 磁盤中讀取,所以對于 Catch-up read 的場景,讀取數據會影響 Ledger 磁盤的 IO,對 Journal 磁盤沒有影響,也就不會影響到數據的寫入。
所以,數據寫入是主要是受 Journal 磁盤的負載影響,不會受 Ledger 磁盤的影響。另外,Segment 存儲的多個副本都可以提供讀取服務,相比于主從副本的設計,Apache Pulsar 可以提供更好的數據讀取能力。
通過以上分析,Apache Pulsar 使用 Apache BookKeeper 作為數據存儲,可以帶來下列的收益:
支持將多個 Ledger 的數據寫入到同一個 Entry logger 文件,可以避免分區膨脹帶來的性能下降問題
支持讀寫分離,可以在滯后消費場景導致磁盤 IO 上升時,保證數據寫入的不受影響
支持全副本讀取,可以充分利用存儲副本的數據讀取能力
多種消費模型
Pulsar 提供了多種訂閱方式來消費消息,分為三種類型:獨占(Exclusive),故障切換(Failover)或共享(Share)。
Exclusive 獨占訂閱 :在任何時間,一個消費者組(訂閱)中有且只有一個消費者來消費 Topic 中的消息。
Failover 故障切換:多個消費者(Consumer)可以附加到同一訂閱。但是,一個訂閱中的所有消費者,只會有一個消費者被選為該訂閱的主消費者。其他消費者將被指定為故障轉移消費者。當主消費者斷開連接時,分區將被重新分配給其中一個故障轉移消費者,而新分配的消費者將成為新的主消費者。發生這種情況時,所有未確認(ack)的消息都將傳遞給新的主消費者。
Share 共享訂閱:使用共享訂閱,在同一個訂閱背后,用戶按照應用的需求掛載任意多的消費者。訂閱中的所有消息以循環分發形式發送給訂閱背后的多個消費者,并且一個消息僅傳遞給一個消費者。
當消費者斷開連接時,所有傳遞給它但是未被確認(ack)的消息將被重新分配和組織,以便發送給該訂閱上剩余的剩余消費者。
多種 ACK 模型
消息確認(ACK)的目的就是保證當發生故障后,消費者能夠從上一次停止的地方恢復消費,保證既不會丟失消息,也不會重復處理已經確認(ACK)的消息。在 Pulsar 中,每個訂閱中都使用一個專門的數據結構–游標(Cursor)來跟蹤訂閱中的每條消息的確認(ACK)狀態。每當消費者在分區上確認消息時,游標都會更新。
Pulsar 提供兩種消息確認方法:
單條確認(Individual Ack),單獨確認一條消息。被確認后的消息將不會被重新傳遞
累積確認(Cumulative Ack),通過累積確認,消費者只需要確認它收到的最后一條消息
上圖說明了單條確認和累積確認的差異(灰色框中的消息被確認并且不會被重新傳遞)。對于累計確認,M12 之前的消息被標記為 Acked。對于單獨進行 ACK,僅確認消息 M7 和 M12, 在消費者失敗的情況下,除了 M7 和 M12 之外,其他所有消息將被重新傳送。
直播精彩分享
- END -
看完一鍵三連在看,轉發,點贊
是對文章最大的贊賞,極客重生感謝你
推薦閱讀
深入理解虛擬化
深入理解零拷貝技術
后端技術趨勢指南|如何選擇自己的技術方向
總結
以上是生活随笔為你收集整理的深入理解分布式消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深入理解虚拟化
- 下一篇: 今年你参与开源了吗?