Kafka的优化建议
Kafka的優化建議
producer端:
-
設計上保證數據的可靠安全性,依據分區數做好數據備份,設立副本數等。
push數據的方式:同步異步推送數據:權衡安全性和速度性的要求,選擇相應的同步推送還是異步推送方式,當發現數據有問題時,可以改為同步來查找問題。 -
flush是kafka的內部機制,kafka優先在內存中完成數據的交換,然后將數據持久化到磁盤.kafka首先會把數據緩存(緩存到內存中)起來再批量flush.
可以通過log.flush.interval.messages和log.flush.interval.ms來配置flush間隔 -
可以通過replica機制保證數據不丟.
代價就是需要更多資源,尤其是磁盤資源,kafka當前支持GZip和Snappy壓縮,來緩解這個問題
是否使用replica(副本)取決于在可靠性和資源代價之間的balance(平衡) - broker到 Consumer kafka的consumer提供兩種接口.
high-level版本已經封裝了對partition和offset的管理,默認是會定期自動commit offset,這樣可能會丟數據的
low-level版本自己管理spout線程和partition之間的對應關系和每個partition上的已消費的offset(定期寫到zk)
并且只有當這個offset被ack后,即成功處理后,才會被更新到zk,所以基本是可以保證數據不丟的即使spout線程crash(崩潰),重啟后還是可以從zk中讀到對應的offset
- 異步要考慮到partition leader在未完成副本數follows的備份時就宕機的情況,即使選舉出了新的leader但是已經push的數據因為未備份就丟失了!
- 設置合適的方式,增大batch 大小來減小網絡IO和磁盤IO的請求,這是對于kafka效率的思考。
kafka不像hadoop更致力于處理大量級數據,kafka的消息隊列更擅長于處理小數據。針對具體業務而言,若是源源不斷的push大量的數據(eg:網絡爬蟲),可以考慮消息壓縮。但是這也一定程度上對CPU造成了壓力,還是得結合業務數據進行測試選擇
- 結合上游的producer架構,
broker端:
topic設置多分區,分區自適應所在機器,為了讓各分區均勻分布在所在的broker中,分區數要大于broker數。分區是kafka進行并行讀寫的單位,是提升kafka速度的關鍵。
broker能接收消息的最大字節數的設置一定要比消費端能消費的最大字節數要小,否則broker就會因為消費端無法使用這個消息而掛起。
broker可賦值的消息的最大字節數設置一定要比能接受的最大字節數大,否則broker就會因為數據量的問題無法復制副本,導致數據丟失
comsumer端:
關閉自動更新offset,等到數據被處理后再手動跟新offset。
在消費前做驗證前拿取的數據是否是接著上回消費的數據,不正確則return先行處理排錯。
一般來說zookeeper只要穩定的情況下記錄的offset是沒有問題,除非是多個consumer group 同時消費一個分區的數據,其中一個先提交了,另一個就丟失了。
問題:
kafka的數據一開始就是存儲在PageCache上的,定期flush到磁盤上的,也就是說,不是每個消息都被存儲在磁盤了,如果出現斷電或者機器故障等,PageCache上的數據就丟失了。
這個是總結出的到目前為止沒有發生丟失數據的情況
//producer用于壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。壓縮最好用于批量處理,批量處理消息越多,壓縮性能越好props.put("compression.type", "gzip");//增加延遲props.put("linger.ms", "50");//這意味著leader需要等待所有備份都成功寫入日志,這種策略會保證只要有一個備份存活就不會丟失數據。這是最強的保證。,props.put("acks", "all");//無限重試,直到你意識到出現了問題,設置大于0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。props.put("retries ", MAX_VALUE);props.put("reconnect.backoff.ms ", 20000);props.put("retry.backoff.ms", 20000);//關閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader,以避免數據丟失props.put("unclean.leader.election.enable", false);//關閉自動提交offsetprops.put("enable.auto.commit", false);限制客戶端在單個連接上能夠發送的未響應請求的個數。設置此值是1表示kafka broker在響應請求之前client不能再向同一個broker發送請求。注意:設置此參數是為了避免消息亂序props.put("max.in.flight.requests.per.connection", 1);Kafka重復消費原因
強行kill線程,導致消費后的數據,offset沒有提交,partition就斷開連接。比如,通常會遇到消費的數據,處理很耗時,導致超過了Kafka的session timeout時間(0.10.x版本默認是30秒),那么就會re-blance重平衡,此時有一定幾率offset沒提交,會導致重平衡后重復消費。
如果在close之前調用了consumer.unsubscribe()則有可能部分offset沒提交,下次重啟會重復消費
kafka數據重復 kafka設計的時候是設計了(at-least once)至少一次的邏輯,這樣就決定了數據可能是重復的,kafka采用基于時間的SLA(服務水平保證),消息保存一定時間(通常為7天)后會被刪除
kafka的數據重復一般情況下應該在消費者端,這時log.cleanup.policy = delete使用定期刪除機制
總結
以上是生活随笔為你收集整理的Kafka的优化建议的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kafka丢消息
- 下一篇: JSR94(Java Rule Engi