干货|kafka最佳实践
這里翻譯一篇關于 Kafka 實踐的文章,內(nèi)容來自 DataWorks Summit/Hadoop Summit?上一篇分享,PPT 參考?https://www.slideshare.net/HadoopSummit/apache-kafka-best-practices?里面講述了很多關于 Kafka 配置、監(jiān)控、優(yōu)化的內(nèi)容,絕對是在實踐中總結出的精華,有很大的借鑒參考意義,本文主要是根據(jù) PPT 的內(nèi)容進行翻譯及適當補充。
Kafka 的架構這里就不多做介紹了,直接不如正題。
Kafka 基本配置及性能優(yōu)化
這里主要是 Kafka 集群基本配置的相關內(nèi)容。
硬件要求
Kafka 集群基本硬件的保證
OS 調(diào)優(yōu)
-
OS page cache:應當可以緩存所有活躍的 Segment(Kafka 中最基本的數(shù)據(jù)存儲單位);
-
fd 限制:100k+;
-
禁用 swapping:簡單來說,swap 作用是當內(nèi)存的使用達到一個臨界值時就會將內(nèi)存中的數(shù)據(jù)移動到 swap 交換空間,但是此時,內(nèi)存可能還有很多空余資源,swap 走的是磁盤 IO,對于內(nèi)存讀寫很在意的系統(tǒng),最好禁止使用 swap 分區(qū);
-
TCP 調(diào)優(yōu);
-
JVM 配置
-
JDK 8 并且使用 G1 垃圾收集器;
-
至少要分配 6-8 GB 的堆內(nèi)存。
Kafka 磁盤存儲
-
使用多塊磁盤,并配置為 Kafka 專用的磁盤;
-
JBOD vs RAID10;
-
JBOD(Just a Bunch of Disks,簡單來說它表示一個沒有控制軟件提供協(xié)調(diào)控制的磁盤集合,它將多個物理磁盤串聯(lián)起來,提供一個巨大的邏輯磁盤,數(shù)據(jù)是按序存儲,它的性能與單塊磁盤類似)
-
JBOD 的一些缺陷:
-
任何磁盤的損壞都會導致異常關閉,并且需要較長的時間恢復;
-
數(shù)據(jù)不保證一致性;
-
多級目錄;
-
-
社區(qū)也正在解決這么問題,可以關注 KIP 112、113:
-
必要的工具用于管理 JBOD;
-
自動化的分區(qū)管理;
-
磁盤損壞時,Broker 可以將 replicas 遷移到好的磁盤上;
-
在同一個 Broker 的磁盤間 reassign replicas;
-
-
RAID 10 的特點:
-
可以允許單磁盤的損壞;
-
性能和保護;
-
不同磁盤間的負載均衡;
-
高命中來減少 space;
-
單一的 mount point;
-
-
文件系統(tǒng):
-
使用 EXT 或 XFS;
-
SSD;
-
基本的監(jiān)控
Kafka 集群需要監(jiān)控的一些指標,這些指標反應了集群的健康度。
-
CPU 負載;
-
Network Metrics;
-
File Handle 使用;
-
磁盤空間;
-
磁盤 IO 性能;
-
GC 信息;
-
ZooKeeper 監(jiān)控。
Kafka replica 相關配置及監(jiān)控
Kafka Replication
-
Partition 有兩種副本:Leader,Follower;
-
Leader 負責維護 in-sync-replicas(ISR)
-
replica.lag.time.max.ms:默認為10000,如果 follower 落后于 leader 的消息數(shù)超過這個數(shù)值時,leader 就將 follower 從 isr 列表中移除;
-
num.replica.fetchers,默認為1,用于從 leader 同步數(shù)據(jù)的 fetcher 線程數(shù);
-
min.insync.replica:Producer 端使用來用于保證 Durability(持久性);
-
Under Replicated Partitions
當發(fā)現(xiàn) replica 的配置與集群的不同時,一般情況都是集群上的 replica 少于配置數(shù)時,可以從以下幾個角度來排查問題:
-
JMX 監(jiān)控項:kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions;
-
可能的原因:
-
Broker 掛了?
-
Controller 的問題?
-
ZooKeeper 的問題?
-
Network 的問題?
-
-
解決辦法:
-
調(diào)整 ISR 的設置;
-
Broker 擴容。
-
Controller
-
負責管理 partition 生命周期;
-
避免 Controller’s ZK 會話超時:
-
ISR 抖動;
-
ZK Server 性能問題;
-
Broker 長時間的 GC;
-
網(wǎng)絡 IO 問題;
-
-
監(jiān)控:
-
kafka.controller:type=KafkaController,name=ActiveControllerCount,應該為1;
-
LeaderElectionRate。
-
Unclean leader 選舉
允許不在 isr 中 replica 被選舉為 leader。
-
這是 Availability 和 Correctness 之間選擇,Kafka 默認選擇了可用性;
-
unclean.leader.election.enable:默認為 true,即允許不在 isr 中 replica 選為 leader,這個配置可以全局配置,也可以在 topic 級別配置;
-
監(jiān)控:kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec。
Broker 配置
Broker 級別有幾個比較重要的配置,一般需要根據(jù)實際情況進行相應配置的:
-
log.retention.{ms, minutes, hours}?,?log.retention.bytes:數(shù)據(jù)保存時間;
-
message.max.bytes,?replica.fetch.max.bytes;
-
delete.topic.enable:默認為 false,是否允許通過 admin tool 來刪除 topic;
-
unclean.leader.election.enable?= false,參見上面;
-
min.insync.replicas?= 2:當 Producer 的 acks 設置為 all 或 -1 時,min.insync.replicas?代表了必須進行確認的最小 replica 數(shù),如果不夠的話 Producer 將會報?NotEnoughReplicas?或?NotEnoughReplicasAfterAppend?異常;
-
replica.lag.time.max.ms(超過這個時間沒有發(fā)送請求的話,follower 將從 isr 中移除), num.replica.fetchers;
-
replica.fetch.response.max.bytes;
-
zookeeper.session.timeout.ms?= 30s;
-
num.io.threads:默認為8,KafkaRequestHandlerPool 的大小。
Kafka 相關資源的評估
集群評估
-
Broker 評估
-
每個 Broker 的 Partition 數(shù)不應該超過2k;
-
控制 partition 大小(不要超過25GB);
-
-
集群評估(Broker 的數(shù)量根據(jù)以下條件配置)
-
數(shù)據(jù)保留時間;
-
集群的流量大小;
-
-
集群擴容:
-
磁盤使用率應該在 60% 以下;
-
網(wǎng)絡使用率應該在 75% 以下;
-
-
集群監(jiān)控
-
保持負載均衡;
-
確保 topic 的 partition 均勻分布在所有 Broker 上;
-
確保集群的階段沒有耗盡磁盤或帶寬。
-
Broker 監(jiān)控
-
Partition 數(shù):kafka.server:type=ReplicaManager,name=PartitionCount;
-
Leader 副本數(shù):kafka.server:type=ReplicaManager,name=LeaderCount;
-
ISR 擴容/縮容率:kafka.server:type=ReplicaManager,name=IsrExpandsPerSec;
-
讀寫速率:Message in rate/Byte in rate/Byte out rate;
-
網(wǎng)絡請求的平均空閑率:NetworkProcessorAvgIdlePercent;
-
請求處理平均空閑率:RequestHandlerAvgIdlePercent。
Topic 評估
-
partition 數(shù)
-
Partition 數(shù)應該至少與最大 consumer group 中 consumer 線程數(shù)一致;
-
對于使用頻繁的 topic,應該設置更多的 partition;
-
控制 partition 的大小(25GB 左右);
-
考慮應用未來的增長(可以使用一種機制進行自動擴容);
-
-
使用帶 key 的 topic;
-
partition 擴容:當 partition 的數(shù)據(jù)量超過一個閾值時應該自動擴容(實際上還應該考慮網(wǎng)絡流量)。
合理地設置 partition
-
根據(jù)吞吐量的要求設置 partition 數(shù):
-
假設 Producer 單 partition 的吞吐量為 P;
-
consumer 消費一個 partition 的吞吐量為 C;
-
而要求的吞吐量為 T;
-
那么 partition 數(shù)至少應該大于 T/P、T/c 的最大值;
-
-
更多的 partition,意味著:
-
更多的 fd;
-
可能增加 Unavailability(可能會增加不可用的時間);
-
可能增加端到端的延遲;
-
client 端將會使用更多的內(nèi)存。
-
關于 Partition 的設置可以參考這篇文章 https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster ,這里簡單講述一下,Partition 的增加將會帶來以下幾個優(yōu)點和缺點:
增加吞吐量:對于 consumer 來說,一個 partition 只能被一個 consumer 線程所消費,適當增加 partition 數(shù),可以增加 consumer 的并發(fā),進而增加系統(tǒng)的吞吐量;
需要更多的 fd:對于每一個 segment,在 broker 都會有一個對應的 index 和實際數(shù)據(jù)文件,而對于 Kafka Broker,它將會對于每個 segment 每個 index 和數(shù)據(jù)文件都會打開相應的 file handle(可以理解為 fd),因此,partition 越多,將會帶來更多的 fd;
可能會增加數(shù)據(jù)不可用性(主要是指增加不可用時間):主要是指 broker 宕機的情況,越多的 partition 將會意味著越多的 partition 需要 leader 選舉(leader 在宕機這臺 broker 的 partition 需要重新選舉),特別是如果剛好 controller 宕機,重新選舉的 controller 將會首先讀取所有 partition 的 metadata,然后才進行相應的 leader 選舉,這將會帶來更大不可用時間;
可能增加 End-to-end 延遲:一條消息只有其被同步到 isr 的所有 broker 上后,才能被消費,partition 越多,不同節(jié)點之間同步就越多,這可能會帶來毫秒級甚至數(shù)十毫秒級的延遲;
Client 將會需要更多的內(nèi)存:Producer 和 Consumer 都會按照 partition 去緩存數(shù)據(jù),每個 partition 都會帶來數(shù)十 KB 的消耗,partition 越多, Client 將會占用更多的內(nèi)存。
Producer 的相關配置、性能調(diào)優(yōu)及監(jiān)控
Quotas
-
避免被惡意 Client 攻擊,保證 SLA;
-
設置 produce 和 fetch 請求的字節(jié)速率閾值;
-
可以應用在 user、client-id、或者 user 和 client-id groups;
-
Broker 端的 metrics 監(jiān)控:throttle-rate、byte-rate;
-
replica.fetch.response.max.bytes:用于限制 replica 拉取請求的內(nèi)存使用;
-
進行數(shù)據(jù)遷移時限制貸款的使用,kafka-reassign-partitions.sh -- -throttle option。
Kafka Producer
-
使用 Java 版的 Client;
-
使用?kafka-producer-perf-test.sh?測試你的環(huán)境;
-
設置內(nèi)存、CPU、batch 壓縮;
-
batch.size:該值設置越大,吞吐越大,但延遲也會越大;
-
linger.ms:表示 batch 的超時時間,該值越大,吞吐越大、但延遲也會越大;
-
max.in.flight.requests.per.connection:默認為5,表示 client 在 blocking 之前向單個連接(broker)發(fā)送的未確認請求的最大數(shù),超過1時,將會影響數(shù)據(jù)的順序性;
-
compression.type:壓縮設置,會提高吞吐量;
-
acks:數(shù)據(jù) durability 的設置;
-
-
避免大消息
-
會使用更多的內(nèi)存;
-
降低 Broker 的處理速度;
-
性能調(diào)優(yōu)
-
如果吞吐量小于網(wǎng)絡帶寬
-
增加線程;
-
提高 batch.size;
-
增加更多 producer 實例;
-
增加 partition 數(shù);
-
-
設置 acks=-1 時,如果延遲增大:可以增大?num.replica.fetchers(follower 同步數(shù)據(jù)的線程數(shù))來調(diào)解;
-
跨數(shù)據(jù)中心的傳輸:增加 socket 緩沖區(qū)設置以及 OS tcp 緩沖區(qū)設置。
Prodcuer 監(jiān)控
-
batch-size-avg
-
compression-rate-avg
-
waiting-threads
-
buffer-available-bytes
-
record-queue-time-max
-
record-send-rate
-
records-per-request-avg
Kafka Consumer 配置、性能調(diào)優(yōu)及監(jiān)控
Kafka Consumer
-
使用?kafka-consumer-perf-test.sh?測試環(huán)境;
-
吞吐量問題:
-
partition 數(shù)太少;
-
OS page cache:分配足夠的內(nèi)存來緩存數(shù)據(jù);
-
應用的處理邏輯;
-
-
offset topic(__consumer_offsets)
-
offsets.topic.replication.factor:默認為3;
-
offsets.retention.minutes:默認為1440,即 1day;
– MonitorISR,topicsize;
-
-
offset commit較慢:異步 commit 或 手動 commit。
Consumer 配置
-
fetch.min.bytes?、fetch.max.wait.ms;
-
max.poll.interval.ms:調(diào)用?poll()?之后延遲的最大時間,超過這個時間沒有調(diào)用?poll()?的話,就會認為這個 consumer 掛掉了,將會進行 rebalance;
-
max.poll.records:當調(diào)用?poll()?之后返回最大的 record 數(shù),默認為500;
-
session.timeout.ms;
-
Consumer Rebalance
– check timeouts
– check processing times/logic
– GC Issues -
網(wǎng)絡配置;
Consumer 監(jiān)控
consumer 是否跟得上數(shù)據(jù)的發(fā)送速度。
-
Consumer Lag:consumer offset 與 the end of log(partition 可以消費的最大 offset) 的差值;
-
監(jiān)控
-
metric 監(jiān)控:records-lag-max;
-
通過?bin/kafka-consumer-groups.sh?查看;
-
用于 consumer 監(jiān)控的 LinkedIn’s Burrow;
-
-
減少 Lag
-
分析 consumer:是 GC 問題還是 Consumer hang 住了;
-
增加 Consumer 的線程;
-
增加分區(qū)數(shù)和 consumer 線程;
-
如何保證數(shù)據(jù)不丟
這個是常用的配置,
block.on.buffer.full:默認設置為 false,當達到內(nèi)存設置時,可能通過 block 停止接受新的 record 或者拋出一些錯誤,默認情況下,Producer 將不會拋出 BufferExhaustException,而是當達到?max.block.ms?這個時間后直接拋出 TimeoutException。設置為 true 的意義就是將?max.block.ms?設置為 Long.MAX_VALUE,未來版本中這個設置將被遺棄,推薦設置?max.block.ms。
《新程序員》:云原生和全面數(shù)字化實踐50位技術專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的干货|kafka最佳实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 假如我是面试官,我会这样虐你
- 下一篇: 面试|图解kafka的高可用机制