Kafka解析之失效副本
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-under-replicated-partitions/
簡介
Kafka從0.8.x版本開始引入副本機制,這樣可以極大的提高集群的可靠性和穩定性。不過這也使得Kafka變得更加復雜起來,失效副本就是所要面臨的一個難題。
通常情況下,Kafka中的每個分區(partition)都會分配多個副本(replica),具體的副本數量由Broker級別參數default.replication.factor(默認大小為1)指定,也可以在創建topic的時候通過 --replication-factor ${num} 顯式指定副本的數量(副本因子)。一般情況下,將前者default.replication.factor設置為大于1的值,這樣在參數auto.create.topic.enable為true的時候,自動創建的topic會根據default.replication.factor的值來創建副本數;或者更加通用的做法是使用后者而指定大于1的副本數。
每個分區的多個副本稱之為AR(assigned replicas),包含至多一個leader副本和多個follower副本。與AR對應的另一個重要的概念就是ISR(in-sync replicas),ISR是指與leader副本保持同步狀態的副本集合,當然leader副本本身也是這個集合中的一員。而ISR之外,也就是處于同步失敗或失效狀態的副本,副本對應的分區也就稱之為同步失效分區,即under-replicated分區。
失效副本的判定
怎么樣判定一個分區是否有副本是處于同步失效狀態的呢?從Kafka 0.9.x版本開始通過唯一的一個參數replica.lag.time.max.ms(默認大小為10,000)來控制,當ISR中的一個follower副本滯后leader副本的時間超過參數replica.lag.time.max.ms指定的值時即判定為副本失效,需要將此follower副本剔出除ISR之外。具體實現原理很簡單,當follower副本將leader副本的LEO(Log End Offset,每個分區最后一條消息的位置)之前的日志全部同步時,則認為該follower副本已經追趕上leader副本,此時更新該副本的lastCaughtUpTimeMs標識。Kafka的副本管理器(ReplicaManager)啟動時會啟動一個副本過期檢測的定時任務,而這個定時任務會定時檢查當前時間與副本的lastCaughtUpTimeMs差值是否大于參數replica.lag.time.max.ms指定的值。千萬不要錯誤的認為follower副本只要拉取leader副本的數據就會更新lastCaughtUpTimeMs,試想當leader副本的消息流入速度大于follower副本的拉取速度時,follower副本一直不斷的拉取leader副本的消息也不能與leader副本同步,如果還將此follower副本置于ISR中,那么當leader副本失效,而選取此follower副本為新的leader副本,那么就會有嚴重的消息丟失。
Kafka源碼注釋中說明了一般有兩種情況會導致副本失效:
這里筆者補充一點,如果通過工具增加了副本因子,那么新增加的副本在趕上leader副本之前也都是處于失效狀態的。如果一個follower副本由于某些原因(比如宕機)而下線,之后又上線,在追趕上leader副本之前也是出于失效狀態。
在Kafka 0.9.x版本之前還有另一個Broker級別的參數replica.lag.max.messages(默認大小為4000)也是用來判定失效副本的,當一個follower副本滯后leader副本的消息數超過replica.lag.max.messages的大小時則判定此follower副本為失效副本。它與replica.lag.time.max.ms參數判定出的失敗副本去并集組成一個失效副本的集合,從而進一步剝離出ISR。下面給出0.8.2.2版本的相關核心代碼以供參考:
def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {val leaderLogEndOffset = leaderReplica.logEndOffsetval candidateReplicas = inSyncReplicas - leaderReplica// Case 1: Stuck followersval stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)if(stuckReplicas.size > 0)debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))// Case 2: Slow followersval slowReplicas = candidateReplicas.filter(r =>r.logEndOffset.messageOffset >= 0 &&leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)if(slowReplicas.size > 0)debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))stuckReplicas ++ slowReplicas }不過這個replica.lag.max.messages參數很難給定一個合適的值,若設置的太大則這個參數本身就沒有太多意義,若設置的太小則會讓follower副本反復的處于同步、未同步、同步的死循環中,進而又會造成ISR的頻繁變動。而且這個參數是Broker級別的,也就是說對Broker中的所有topic都生效,就以默認的值4000來說,對于消息流入速度很低的topic來說,比如TPS=10,這個參數并無用武之地;而對于消息流入速度很高的topic來說,比如TPS=20,000,這個參數的取值又會引入ISR的頻繁變動,所以從0.9.x版本開始就徹底移除了這一參數,相關的資料還可以參考KIP16。
具有失效副本的分區可以從側面洞悉出Kafka集群的很多問題,毫不夸張的說:如果只能用一個指標來衡量Kafka,那么失效副本分區的個數必然是首選。Kafka本身也提供了一個相關的指標,即UnderReplicatedPartitions,這個可以通過JMX訪問:
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions來獲取其值,取值范圍是大于等于0的整數。如果獲取的UnderReplicatedPartitions值大于0,就需要對其進行告警,并進一步診斷其背后的真正原因,有可能是某個Broker的問題,也有可能引申到整個集群的問題,也許還要引入其他一些信息、指標等配合找出問題之所在。注意:如果Kafka集群正在做分區遷移(kafka-reassign-partitions.sh)的時候,這個值也會大于0。
優先副本的選舉
在診斷失效副本之前,可以先嘗試執行一次優先副本的選舉操作來看看問題是否迎刃而解,反之也能夠將排查的范圍縮小。
所謂的優先副本是指在Kafka的AR列表中的第一個副本。理想情況下,優先副本就是該分區的leader副本,所以也可以稱之為preferred leader。Kafka要確保所有主題的優先副本在Kafka集群中均勻分布,這樣就保證了所有分區的Leader均衡分布。保證Leader在集群中均衡分布很重要,因為所有的讀寫請求都由分區leader副本進行處理,如果leader分布過于集中,就會造成集群負載不均衡。試想一下,如果某分區的leader副本在某個很空閑的Broker上,而它的follower副本宿主于另一個很繁忙的Broker上,那么此follower副本很可能由于分配不到足夠的系統資源而無法完成副本同步的任務,進而造成副本失效。
所謂的優先副本的選舉是指通過自動或者手動的方式促使優先副本選舉為leader,也就是分區平衡,這樣可以促進集群的均衡負載,也就進一步的降低失效副本生存的幾率。需要注意的是分區平衡并不意味著Kafka集群的負載均衡,因為這還要考慮到集群中的分區分配是否均衡。更進一步每個分區的leader的負載也是各不相同,有些leader副本的負載很高,比如需要承受TPS為3W的負荷,而有些leader副本只需承載個位數的負荷,也就是說就算集群中的分區分配均衡,leader分配也均衡也并不能確保整個集群的負載就是均衡的,還需要其他一些硬性的指標來做進一步的衡量,這個會在下面的內容中涉及,本小節只探討優先副本的選舉。
隨著集群運行時間的推移,可能部分節點的變化導致leader進行了重新選舉,若優先副本的宿主Broker在發生故障后由其他副本代替而擔任了新的leader,就算優先副本的宿主Broker故障恢復而重新回到集群時若沒有自動平衡的功能,該副本也不會成為分區的leader。Kafka具備分區自動平衡的功能,且默認情況下此功能是開啟的,與此對應的參數是
auto.leader.rebalance.enable=true。如果開啟分區自動平衡,則Kafka的Controller會創建一個分區重分配檢查及分區重分配操作(onPartitionReassignment)的定時任務,這個定時任務會輪詢所有的Broker,計算每個Broker的分區不平衡率(Broker中的不平衡率=非優先副本的leader個數 / 分區總數)是否超過leader.imbalance.per.broker.percentage配置的比率,默認是10%,如果超過設定的比率則會自動執行優先副本的選舉動作以求分區平衡。默認執行周期是leader.imbalance.check.interval.seconds=300,即5分鐘。
不過在生產環境中不建議將auto.leader.rebalance.enable設置為默認的true,因為這可能會引起負面的性能問題,也有可能會引起客戶端一定時間的阻塞。因為執行的時間無法自主掌控,如果在關鍵時期(比如電商大促波峰期)執行關鍵任務的關卡擺上一道優先副本的自動選舉操作,勢必會有業務阻塞、頻繁超時之類的風險。前面也分析過分區的均衡也不能確保集群的均衡,而集群一定程度上的不均衡也是可以忍受的,為防關鍵時期掉鏈子的行為,筆者建議還是把這類的掌控權把控在自己的手中,可以針對此類相關的埋點指標設置相應的告警,在合適的時機執行合適的操作。
優先副本的選舉是一個安全的(Kafka客戶端可以自動感知分區leader的變更)并且也容易執行的一類操作。執行優先副本的選舉是通過$KAFKA_HOME/bin/路徑下的kafka-preferred-replica-election.sh腳本來實現的。舉例某Kafka集群有3個Broker,編號(broker.id)為[0,1,2],且創建了名稱為“topic-1”、副本數為3, 分區數為9的一個topic,細節如下(注意其中的IP地址是虛構的):
[root@zzh kafka_1.0.0]# bin/kafka-topics.sh --describe --zookeeper 192.168.0.2:2181,192.168.0.3:2181,192.168.0.3:2181/kafka --topic topic-1 Topic:topic-1 PartitionCount:9 ReplicationFactor:3 Configs:Topic: topic-1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1Topic: topic-1 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2Topic: topic-1 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0Topic: topic-1 Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0Topic: topic-1 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1Topic: topic-1 Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2Topic: topic-1 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1Topic: topic-1 Partition: 7 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2Topic: topic-1 Partition: 8 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0可以看到初始情況下,所有的leader都是AR中的第一個副本也就是優先副本。此時關閉再開啟broker.id=2那臺Broker,就可以使得topic-1中存在非優先副本的leader,細節如下:
Topic:topic-1 PartitionCount:9 ReplicationFactor:3 Configs:Topic: topic-1 Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2Topic: topic-1 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2Topic: topic-1 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2Topic: topic-1 Partition: 3 Leader: 1 Replicas: 2,1,0 Isr: 1,0,2Topic: topic-1 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2Topic: topic-1 Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2Topic: topic-1 Partition: 6 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2Topic: topic-1 Partition: 7 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2Topic: topic-1 Partition: 8 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2此時可以執行對應的kafka-preferred-replica-election.sh腳本來進行優先副本的選舉操作,相關細節如下:
[root@zzh kafka_1.0.0]# bin/kafka-preferred-replica-election.sh --zookeeper 192.168.0.2:2181,192.168.0.3:2181,192.168.0.3:2181/kafka Created preferred replica election path with {"version":1,"partitions":[{"topic":"topic-1","partition":6},{"topic":"topic-1","partition":0},{"topic":"topic-1","partition":7},{"topic":"topic-1","partition":3},{"topic":"topic-1","partition":8},{"topic":"topic-1","partition":2},{"topic":"topic-1","partition":5},{"topic":"topic-1","partition":4},{"topic":"topic-1","partition":1}]} Successfully started preferred replica election for partitions Set([topic-1,6]], [topic-1,5], [topic-1,4], [topic-1,3], [topic-1,2], [topic-1,7], [topic-1,1], [topic-1,8], [topic-1,0])最終的leader分配又回到初始情況下的狀態。不過上面的執行方法是針對Kafka集群中的所有topic都執行一次優先副本的選舉,如果集群中存有大量的分區,這一操作有可能會失效,因為這個請求的內容會寫入到Zookeeper的節點之中,如果這個請求的內容體過大而超過節點所能存儲的數據(默認為1MB)時請求會失敗。Kafka提供了更細粒度的優先副本的選舉操作,它可以細化到某個topic的某個分區這個層級,這樣在面對一次請求過大的問題時可以選擇性的進行細粒度拆分,也可以在實現自定義的個性化優先副本的選舉操作。
在實現細粒度的優先副本的選舉操作之前,首先要建立一個JSON文件,將所需要的topic以及對應的分區編號記錄于其中,比如針對topic-1的編號為0的分區進行優先副本的選舉操作,對應的JSON文件內容如下(假設此文件命名為partitions.json):
{"partitions":[{"partition":0,"topic":"topic-1"}] }之后再執行kafka-preferred-replica-election.sh腳本時通過–path-to-json-file參數來指定此
JSON文件,相關細節如下:
失效副本的診斷及預警
在第2小節“失效副本的判定”中提及了UnderReplicatedPartitions指標,這個UnderReplicatedPartitions是一個Broker級別的指標,指的是leader副本在當前Broker上且具有失效副本的分區的個數,也就是說這個指標可以讓我們感知失效副本的存在以及波及的分區數量。這一類分區也就是文中篇頭所說的同步失效分區,即under-replicated分區。
如果集群中有多個Broker的UnderReplicatedPartitions保持一個大于0的穩定值時,一般暗示著集群中有Broker已經處于下線狀態。這種情況下,這個Broker中的分區個數與集群中的所有UnderReplicatedPartitions(處于下線的Broker是不會上報任何指標值的)之和是相等的。通常這類問題是由于機器硬件原因引起的,但也有可能是由于操作系統或者JVM引起的,可以根據這個方向繼續做進一步的深入調查。
如果集群中存在Broker的UnderReplicatedPartitions頻繁變動,或者處于一個穩定的大于0的值(這里特指沒有Broker下線的情況)時,一般暗示著集群出現了性能問題,通常這類問題很難診斷,不過我們可以一步一步的將問題的范圍縮小,比如先嘗試確定這個性能問題是否只存在于集群的某個Broker中,還是整個集群之上。如果確定集群中所有的under-replicated分區都是在單個Broker上,那么可以看出這個Broker出現了問題,進而可以針對這單一的Broker做專項調查,比如:操作系統、GC、網絡狀態或者磁盤狀態(比如:iowait、ioutil等指標)。
如果多個Broker中都出現了under-replicated分區,這個一般是整個集群的問題,但也有可能是單個Broker出現了問題,前者可以理解,后者有作何解釋?想象這樣一種情況,如果某個Broker在同步消息方面出了問題,那么其上的follower副本就無法及時有效與其他Broker上的leader副本上進行同步,這樣一來就出現了多個Broker都存在under-replicated分區的現象。有一種方法可以查看是否是單個Broker問題已經是哪個Broker出現了問題,就是通過kafka-topic.sh工具來查看集群中所有的under-replicated分區。
舉例說明,假設集群中有4個Broker,編號為[0,1,2,3],相關的under-replicated分區信息如下:
[root@zzh kafka-1.0.0]# bin/kafka-topics.sh --describe --zookeeper 192.168.0.2:2181,192.168.0.3:2181,192.168.0.3:2181/kafka --under-replicatedTopic: topic-1 Partition: 7 Leader: 0 Replicas: 0,1 Isr: 0Topic: topic-1 Partition: 1 Leader: 2 Replicas: 1,2 Isr: 2Topic: topic-2 Partition: 3 Leader: 3 Replicas: 1,3 Isr: 3Topic: topic-2 Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0Topic: topic-3 Partition: 7 Leader: 0 Replicas: 0,1 Isr: 0Topic: topic-3 Partition: 5 Leader: 3 Replicas: 1,3 Isr: 3Topic: topic-4 Partition: 6 Leader: 2 Replicas: 1,2 Isr: 2Topic: topic-4 Partition: 2 Leader: 2 Replicas: 1,2 Isr: 2在這個案例中,我們可以看到所有的ISR列表中都出現編號為1的Broker的缺失,進而可以將調查的中心遷移到這個Broker上。如果通過上面的步驟沒有定位到某個獨立的Broker,那么就需要針對整個集群層面做進一步的探究。
集群層面的問題一般也就是兩個方面:資源瓶頸以及負載不均衡。資源瓶頸指的是Broker在某硬件資源的使用上遇到了瓶頸,比如網絡、CPU、IO等層面。就以IO而論,Kafka中的消息都是落日志存盤的,生產者線程將消息寫入leader副本的性能和IO有著直接的關聯,follower副本的同步線程以及消費者的消費線程又要通過IO從磁盤中拉取消息,如果IO層面出現了瓶頸,那么勢必會影響全局的走向,與此同時消息的流入流出又都需要和網絡打交道。筆者建議硬件層面的指標可以關注CPU的使用率、網絡流入/流出速率、磁盤的讀/寫速率、iowait、ioutil等,也可以適當的關注下文件句柄數、socket句柄數以及內存等方面。
前面在講述優先副本的時候就涉及到了負載均衡,負載不均衡會影響leader與follower之間的同步效率,進而引起失效副本的產生。集群層面的負載均衡所要考慮的就遠比leader副本的分布均衡要復雜的多,需要考慮負載層面的各個因素,將前面所提及的分區數量(partitions)、leader數量(leaders)、CPU占用率(cpuUsed)、網絡流入/流出速率(nwBytesIn/nwBytesOut)、磁盤讀寫速率(ioRead/ioWrite)、iowait、ioutil、文件句柄數(fd)、內存使用率(memUsed)整合考慮。(這些指標不全是必須的,可以自定義增加或者減少。)在資源瓶頸這一方面我們可以單方面的針對每一個單一資源的使用情況設置一個合理的額定閾值,超過額定閾值可以輸出告警,進而作出進一步的響應動作,而這里的集群層面的資源整合負載又作何分析?
首先對每一個負載指標做歸一化的處理,歸一化是一種無量綱的處理手段,把數據映射到0-1范圍之內,這樣更加方便處理。就以分區數量為例,這里記為MpartitionsM_{partitions}Mpartitions?,對于擁有n個Broker的Kafka集群來說:$ M_{partitions}(n) $代表broker.id=n的Broker中擁有的分區數,那么對應的歸一化計算公式為:
用字母P代表每個指標的權重,那么對應前面的所提及的指標分別有:PpartitionsP_{partitions}Ppartitions?、PleadersP_{leaders}Pleaders?、PcpuUsedP_{cpuUsed}PcpuUsed?、PnwBytesInP_{nwBytesIn}PnwBytesIn?、PnwBytesOutP_{nwBytesOut}PnwBytesOut?、PioReadP_{ioRead}PioRead?、PioWriteP_{ioWrite}PioWrite?、PiowaitP_{iowait}Piowait?、PioutilP_{ioutil}Pioutil?、PmemUsedP_{memUsed}PmemUsed?、PfdP_{fd}Pfd?。由此一個Broker(n)的負載值的計算公式為:
各個權重的取值就需要根據實踐檢驗去調節,不過也可以簡單的將各個指標的權重看的一致,那么計算公式也可以簡化為:
將Bn{B_{n}}Bn?進一步的再做歸一化處理:
如果將整個集群的負載量看做是1,那么這個Db(n)D_b(n)Db?(n) 代表每個Broker所占的負載比重,如果這里采用“餅圖”來做集群負載數據可視化的話,那么這個Db(n)D_b(n)Db?(n)就代表作每個扇區的比重值。在發現under-replicated分區的時候,可以按照Db(n)D_b(n)Db?(n) 值從大到小的順序逐一對各個Broker進行排查。
那么如何預警Kafka集群中有Broker負載過高或者過低的情況,這里可以引入均方差的概念,不過在計算均方差之前還需要來計算下Broker負載的平均值,這里用B\frac{}{B}B? 來表示:
這個B\frac{}{B}B?對應的歸一化值為:
對應的集群負載的均方差方差可表示為:
如果用rnr_nrn?表示某個Broker的負載偏離率,那么很明顯的有:
這個rnr_nrn?與前面優先副本的選舉中的leader.imbalance.per.broker.percentage參數有異曲同工之妙,而且比這個參數更加的精準,我們同樣可以設置Broker的負載偏離率的額定閾值r為10%,超過這個閾值可以發送告警。
假設集群中每個Broker的負載偏離率都無限接近r,那么對應的集群負載均方差也就最大:
比如對于一個具有4個Broker節點的Kafka集群來說,如果設置Broker的負載偏離率為10%,那么對應的集群負載均方差σ就不能超過0.025。針對集群負載均方差設置合理的告警可以提前預防失效副本的發生。
為了讓上面這段陳述變得不那么的生澀,這里舉一個簡單的示例來演示一下這些公式的具體用法。假設集群中有4(即n=4)個Broker節點,為了簡化說明只取MpartitionsM_{partitions}Mpartitions?、MleadersM_{leaders}Mleaders?、McpuUsedM_{cpuUsed}McpuUsed?、MnwBytesInM_{nwBytesIn}MnwBytesIn?、MnwBytesOutM_{nwBytesOut}MnwBytesOut?這幾個作為負載的考量指標,某一時刻集群中各個Broker的負載情況如下表所示:
首先計算Broker1的DpartitionsD_{partitions}Dpartitions?如下所示:
其余各個指標的歸一化值可以類推,具體如下表所示:
由上表看到經過簡單的歸一化處理就將有單位的各種類型的指標歸納為一個簡單的數值。進一步的我們省去各個指標權重的考慮,可以計算出此刻各個Broker的負載值:
同理可得:
如果把此刻的集群整體負載看成是1,也就是100%,各個Broker分攤這100%的負載,這樣可以將Broker的負載值做進一步的歸一化處理:
同理可得:
如果設置Broker的額定負載偏離率r為10%,那么我們進一步來計算下各個Broker的負載偏離率是否超過值,首先計算Broker1的負載偏離率:
同理可得:
可以看出這4個Broker都是相對均衡的,那么集群的負載均方差也就會在合理范圍之內(即小于0.025):
隨著集群運行時間的推移,某一時刻集群中各個Broker的負載情況發生了變化,具體如下表所示:
具體的計算過程就留給讀者自行驗算,最后集群的負載均方差為0.0595,大于0.025,所以可以看出發生了負載不均衡的現象。
寫在最后
失效副本會引起Kafka的多種異常發生,嚴重降低Kafka的可靠性,所以如何有效的預發以及在出現失效副本時如何精準的定位問題是至關重要的。本文盡量從Kafka本身的角度去剖析失效副本,篇幅限制這里并沒有針對操作系統、JVM以及集群硬件本身做更深層次的闡述。引起失效副本的原因也是千變萬化,希望這篇文章可以給讀者在解決相關問題時提供一定的思路。
PS: 吐槽下CSDN的公式編輯真是。。。。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-under-replicated-partitions/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的Kafka解析之失效副本的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka监控架构设计
- 下一篇: Linux IO磁盘篇整理小记