kafka消费组与重平衡机制详解
1.消費者組
1.1 介紹
消費者組,即 Consumer Group,應該算是 Kafka 比較有亮點的設計了。
那么何謂 Consumer Group 呢?
Consumer Group 是 Kafka 提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內必然可以有多個消費者或消費者實例(Consumer Instance),它們共享一個公共的 ID,這個 ID 被稱為 Group ID。組內的所有消費者協調在一起來消費訂閱主題(Subscribed Topics)的所有分區(Partition)。當然,每個分區只能由同一個消費者組內的一個 Consumer 實例來消費。
大概可以總結為以下三點:
-
Consumer Group 下可以有一個或多個 Consumer 實例。這里的實例可以是一個單獨的進程,也可以是同一進程下的線程。在實際場景中,使用進程更為常見一些。
-
Group ID 是一個字符串,在一個 Kafka 集群中,它標識唯一的一個 Consumer Group。
-
Consumer Group 下所有實例訂閱的主題的單個分區,只能分配給組內的某個 Consumer 實例消費。這個分區當然也可以被其他的 Group 消費。
Consumer Group 之間彼此獨立,互不影響,它們能夠訂閱相同的一組主題而互不干涉。kafka可以利用這一機制,同時實現消息引擎的兩大模型:點對點模型和發布/訂閱模型:如果所有實例都屬于同一個 Group,那么它實現的就是消息隊列模型;如果所有實例分別屬于不同的 Group,那么它實現的就是發布 / 訂閱模型。
1.2 實例數量
在實際使用場景中,我怎么知道一個 Group 下該有多少個 Consumer 實例呢?理想情況下,Consumer 實例的數量應該等于該 Group 訂閱主題的分區總數。
舉個簡單的例子,假設一個 Consumer Group 訂閱了 3 個主題,分別是 A、B、C,它們的分區數依次是 1、2、3,那么通常情況下,為該 Group 設置 6 個 Consumer 實例是比較理想的情形,因為它能最大限度地實現高伸縮性。
如果你有 3 個實例,那么平均下來每個實例大約消費 2 個分區(6 / 3 = 2);如果你設置了 8 個實例,那么很遺憾,有 2 個實例(8 – 6 = 2)將不會被分配任何分區,它們永遠處于空閑狀態。因此,在實際使用過程中一般不推薦設置大于總分區數的 Consumer 實例。設置多余的實例只會浪費資源,而沒有任何好處。
2.重平衡機制
2.1 介紹
Rebalance 本質上是一種協議,規定了一個 Consumer Group 下的所有 Consumer 如何達成一致,來分配訂閱 Topic 的每個分區。比如某個 Group 下有 20 個 Consumer 實例,它訂閱了一個具有 100 個分區的 Topic。正常情況下,Kafka 平均會為每個 Consumer 分配 5 個分區。這個分配的過程就叫 Rebalance。
那么 Consumer Group 何時進行 Rebalance 呢?Rebalance 的觸發條件有 3 個。
組成員數發生變更。比如有新的 Consumer 實例加入組或者離開組,抑或是有 Consumer 實例崩潰被“踢出”組。
訂閱主題數發生變更。Consumer Group 可以使用正則表達式的方式訂閱主題,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明該 Group 訂閱所有以字母 t 開頭、字母 c 結尾的主題。在 Consumer Group 的運行過程中,你新創建了一個滿足這樣條件的主題,那么該 Group 就會發生 Rebalance。
訂閱主題的分區數發生變更。Kafka 當前只能允許增加一個主題的分區數。當分區數增加時,就會觸發訂閱該主題的所有 Group 開啟 Rebalance。
Rebalance 發生時,Group 下所有的 Consumer 實例都會協調在一起共同參與。你可能會問,每個 Consumer 實例怎么知道應該消費訂閱主題的哪些分區呢?這就需要分配策略的協助了。
當前 Kafka 默認提供了 3 種分配策略,每種策略都有一定的優勢和劣勢。
三種策略具體介紹:https://blog.csdn.net/fy_java1995/article/details/106405169
2.2 注意點
首先,Rebalance 過程對 Consumer Group 消費過程有極大的影響。如果你了解 JVM 的垃圾回收機制,你一定聽過萬物靜止的收集方式,即著名的 stop the world,簡稱 STW。
Java中Stop-The-World機制簡稱STW,是在執行垃圾收集算法時,Java應用程序的其他所有線程都被掛起(除了垃圾收集幫助器之外)。Java中一種全局暫?,F象,全局停頓,所有Java代碼停止,native代碼可以執行,但不能與JVM交互;這些現象多半是由于gc引起。
在 STW 期間,所有應用線程都會停止工作,表現為整個應用程序僵在那邊一動不動。Rebalance 過程也和這個類似,在 Rebalance 過程中,所有 Consumer 實例都會停止消費,等待 Rebalance 完成。這是 Rebalance 為人詬病的一個方面。
所以,我們應該盡量避免ReBalance。
在實際情況中,大部分情況下,都是由于Consumer實例的增加或減少導致的ReBalance。
當 Consumer Group 完成 Rebalance 之后,每個 Consumer 實例都會定期地向 Coordinator 發送心跳請求,表明它還存活著。如果某個 Consumer 實例不能及時地發送這些心跳請求,Coordinator 就會認為該 Consumer 已經“死”了,從而將其從 Group 中移除,然后開啟新一輪 Rebalance。Consumer 端有個參數,叫 session.timeout.ms,就是被用來表征此事的。該參數的默認值是 10 秒,即如果 Coordinator 在 10 秒之內沒有收到 Group 下某 Consumer 實例的心跳,它就會認為這個 Consumer 實例已經掛了。可以這么說,session.timeout.ms 決定了 Consumer 存活性的時間間隔。
除了這個參數,Consumer 還提供了一個允許你控制發送心跳請求頻率的參數,就是 heartbeat.interval.ms。這個值設置得越小,Consumer 實例發送心跳請求的頻率就越高。頻繁地發送心跳請求會額外消耗帶寬資源,但好處是能夠更加快速地知曉當前是否開啟 Rebalance,因為,目前 Coordinator 通知各個 Consumer 實例開啟 Rebalance 的方法,就是將 REBALANCE_NEEDED 標志封裝進心跳請求的響應體中。
除了以上兩個參數,Consumer 端還有一個參數,用于控制 Consumer 實際消費能力對 Rebalance 的影響,即 max.poll.interval.ms 參數。它限定了 Consumer 端應用程序兩次調用 poll 方法的最大時間間隔。它的默認值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內無法消費完 poll 方法返回的消息,那么 Consumer 會主動發起“離開組”的請求,Coordinator 也會開啟新一輪 Rebalance。
2.3 如何通知到其它消費者
重平衡過程是如何通知到其他消費者實例的?答案就是,靠消費者端的心跳線程(Heartbeat Thread)。
Kafka Java 消費者需要定期地發送心跳請求(Heartbeat Request)到 Broker 端的協調者,以表明它還存活著。在 Kafka 0.10.1.0 版本之前,發送心跳請求是在消費者主線程完成的,也就是你寫代碼調用 KafkaConsumer.poll 方法的那個線程。
這樣做有諸多弊病,最大的問題在于,消息處理邏輯也是在這個線程中完成的。因此,一旦消息處理消耗了過長的時間,心跳請求將無法及時發到協調者那里,導致協調者“錯誤地”認為該消費者已“死”。自 0.10.1.0 版本開始,社區引入了一個單獨的心跳線程來專門執行心跳請求發送,避免了這個問題。
但這和重平衡又有什么關系呢?其實,重平衡的通知機制正是通過心跳線程來完成的。當協調者決定開啟新一輪重平衡后,它會將“REBALANCE_IN_PROGRESS”封裝進心跳請求的響應中,發還給消費者實例。當消費者實例發現心跳響應中包含了“REBALANCE_IN_PROGRESS”,就能立馬知道重平衡又開始了,這就是重平衡的通知機制。
重平衡一旦開啟,Broker 端的協調者組件就要開始忙了,主要涉及到控制消費者組的狀態流轉。當前,Kafka 設計了一套消費者組狀態機(State Machine),來幫助協調者完成整個重平衡流程。嚴格來說,這套狀態機屬于非常底層的設計,Kafka 官網上壓根就沒有提到過,但你最好還是了解一下,因為它能夠幫助你搞懂消費者組的設計原理,比如消費者組的過期位移(Expired Offsets)刪除等。
目前,Kafka 為消費者組定義了 5 種狀態,它們分別是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。那么,這 5 種狀態的含義是什么呢?我們一起來看看下面這張表格。
?
狀態流轉圖如下:
?
一個消費者組最開始是 Empty 狀態,當重平衡過程開啟后,它會被置于 PreparingRebalance 狀態等待成員加入,之后變更到 CompletingRebalance 狀態等待分配方案,最后流轉到 Stable 狀態完成重平衡。
當有新成員加入或已有成員退出時,消費者組的狀態從 Stable 直接跳到 PreparingRebalance 狀態,此時,所有現存成員就必須重新申請加入組。當所有成員都退出組后,消費者組狀態變更為 Empty。Kafka 定期自動刪除過期位移的條件就是,組要處于 Empty 狀態。因此,如果你的消費者組停掉了很長時間(超過 7 天),那么 Kafka 很可能就把該組的位移數據刪除了。
重平衡的完整流程需要消費者端和協調者組件共同參與才能完成。我們先從消費者的視角來審視一下重平衡的流程。
2.4 消費者端重平衡流程
在消費者端,重平衡分為兩個步驟:分別是加入組和等待領導者消費者(Leader Consumer)分配方案。這兩個步驟分別對應兩類特定的請求:JoinGroup 請求和 SyncGroup 請求。
當組內成員加入組時,它會向協調者發送 JoinGroup 請求。在該請求中,每個成員都要將自己訂閱的主題上報,這樣協調者就能收集到所有成員的訂閱信息。一旦收集了全部成員的 JoinGroup 請求后,協調者會從這些成員中選擇一個擔任這個消費者組的領導者。
通常情況下,第一個發送 JoinGroup 請求的成員自動成為領導者。你一定要注意區分這里的領導者和之前我們介紹的領導者副本,它們不是一個概念。這里的領導者是具體的消費者實例,它既不是副本,也不是協調者。領導者消費者的任務是收集所有成員的訂閱信息,然后根據這些信息,制定具體的分區消費分配方案。
選出領導者之后,協調者會把消費者組訂閱信息封裝進 JoinGroup 請求的響應體中,然后發給領導者,由領導者統一做出分配方案后,進入到下一步:發送 SyncGroup 請求。
在這一步中,領導者向協調者發送 SyncGroup 請求,將剛剛做出的分配方案發給協調者。值得注意的是,其他成員也會向協調者發送 SyncGroup 請求,只不過請求體中并沒有實際的內容。這一步的主要目的是讓協調者接收分配方案,然后統一以 SyncGroup 響應的方式分發給所有成員,這樣組內所有成員就都知道自己該消費哪些分區了。
接下來,我用一張圖來形象地說明一下 JoinGroup 請求的處理過程。
?
就像前面說的,JoinGroup 請求的主要作用是將組成員訂閱信息發送給領導者消費者,待領導者制定好分配方案后,重平衡流程進入到 SyncGroup 請求階段。
下面這張圖描述的是 SyncGroup 請求的處理流程。
?
SyncGroup 請求的主要目的,就是讓協調者把領導者制定的分配方案下發給各個組內成員。當所有成員都成功接收到分配方案后,消費者組進入到 Stable 狀態,即開始正常的消費工作。
2.5 Broker端重平衡流程
要剖析協調者端處理重平衡的全流程,我們必須要分幾個場景來討論。這幾個場景分別是新成員加入組、組成員主動離組、組成員崩潰離組、組成員提交位移。
場景一:新成員入組
新成員入組是指組處于 Stable 狀態后,有新成員加入。如果是全新啟動一個消費者組,Kafka 是有一些自己的小優化的,流程上會有些許的不同。我們這里討論的是,組穩定了之后有新成員加入的情形。
當協調者收到新的 JoinGroup 請求后,它會通過心跳請求響應的方式通知組內現有的所有成員,強制它們開啟新一輪的重平衡。具體的過程和之前的客戶端重平衡流程是一樣的?,F在,我用一張時序圖來說明協調者一端是如何處理新成員入組的。
?
場景二:組成員主動離組。
何謂主動離組?就是指消費者實例所在線程或進程調用 close() 方法主動通知協調者它要退出。這個場景就涉及到了第三類請求:LeaveGroup 請求。協調者收到 LeaveGroup 請求后,依然會以心跳響應的方式通知其他成員,因此我就不再贅述了,還是直接用一張圖來說明。
?
場景三:組成員奔潰離組。
崩潰離組是指消費者實例出現嚴重故障,突然宕機導致的離組。它和主動離組是有區別的,因為后者是主動發起的離組,協調者能馬上感知并處理。但崩潰離組是被動的,協調者通常需要等待一段時間才能感知到,這段時間一般是由消費者端參數 session.timeout.ms 控制的。也就是說,Kafka 一般不會超過 session.timeout.ms 就能感知到這個崩潰。當然,后面處理崩潰離組的流程與之前是一樣的,我們來看看下面這張圖。
?
場景四:重平衡時協調者對組內成員提交位移的處理。
正常情況下,每個組內成員都會定期匯報位移給協調者。當重平衡開啟時,協調者會給予成員一段緩沖時間,要求每個成員必須在這段時間內快速地上報自己的位移信息,然后再開啟正常的 JoinGroup/SyncGroup 請求發送。還是老辦法,我們使用一張圖來說明。
?
總結
以上是生活随笔為你收集整理的kafka消费组与重平衡机制详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka数据存储详解
- 下一篇: Kafka在Spring项目中的实战演练