kafka查看topic中的数据_实战!Kafka Manager能统计出Topic中的记录条数吗?
問題描述
今天現場實施同事說Kafka Manager上顯示有3500w條記錄,但使用我們的平臺落地后,一統計發現只有2200w條記錄,這是不是說明我們的平臺存在丟數據的可能。
經了解,對接方是通過如下界面來判斷topic中的記錄條數的。
?????上圖是Kafka Manager的其中一個界面,該界面顯示了Kafka Topic的分區數,Broker的分布情況,以及每個Topic中Recent Offset之和。(在各個分區中,Offset值都是從0開始往后遞增的)
在很久以前,我們團隊其實已經考慮到了數據丟失的可能,于是,利用StreamingListener接口去監聽StreamingListenerBatchCompleted事件,只有監聽到該事件,才會去提交offset。理論上來說,我們的平臺是可以保證“至少一次”的語義。竟然敢懷疑我維護了四年的大數據平臺!不能忍,我得證明不是我的問題啊~
在開啟甩鍋大法之前,首先介紹一下與本題有一定關聯度的知識點——Kafka的老化機制。
kafka老化機制
有兩種情況,會觸發Kafka的老化機制
一、根據設定時間觸發Kafka老化。
相關的配置項有
log.retention.hourslog.retention.minuteslog.retention.ms這三個參數都是用來設置老化時間的,只是時間單位不太一樣。這些參數都是topic級別的,既可以通過命令的方式對特定的topic設置老化時間,也可以在server.properties文件里配置。
命令方式設置老化時間如下:
./kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name aypayp1 --alter --add-config retention.ms=300000默認情況下,配置文件中設置了log.retention.hours為168,另外兩個沒有給初始值,所以默認情況下,kafka老化時間是7天。
二、根據每個分區保存數據大小。?
log.retention.bytes?該參數指定了每個分區保留多少字節數據,數據量超過該值,便可觸發老化的動作。該參數值默認為-1,是沒有限制的。
????另外,還要介紹一個配置項
log.retention.check.interval.ms默認值300000????該參數表示kafka進程中起了一個定時線程,該線程5分鐘掃描本機器管理的所有partition數據,是否有分區滿足老化條件,從而觸發真正的老化動作。
????綜上:
????????默認情況下,kafka僅僅根據時間來觸發老化動作。
Kafka?Manager介紹
Kafka Manager是Yahoo開源的項目,也是為了更方便去維護Yahoo的kafka集群。在我的平常使用中,我可以用它來做如下的工作:
1、觀察kafka topic 吞吐量,即每秒進入多少條數據。根據這個信息,我們就可以判斷Spark Streaming要達到什么樣的處理速度,才能夠及時處理數據。
2、觀察kafka中最新的offset值之和,一般可以大致知道topic中的數據量。
3、觀察消費組的消費情況,消費組消費了各個分區消費了多少條,滯留了多少條。這個信息,也可以判斷自己系統的處理能力。
4、觀察各個分區的數據是否均衡。
甩鍋過程
????下面過程用來證明Kafka Manager列出的summed recent offset值,在老化的情況下,并不能代表topic總條數。
1、向topic aypayp1 發送10條數據,并在kafka Manager中確定summed recent offset值也是10。
2、用測試代碼,重頭消費aypayp1 ?topic中的數據。
結果如下:
證明數據也是進去了。
3、對該topic設置老化時間,命令如下:
./kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name aypayp1 --alter --add-config retention.ms=300000查看是否生效
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic aypayp1 --describe4、五分鐘后,再往該topic中發送10條消息,并用Kafka Manager確認summed recent offset值為20。
5、再過五分鐘,重頭消費該topic,消費結果如下:
????通過上述過程,可以證明對接方的觀點是錯誤的。
總結
1、kafka manager上顯示的summed recent offset值,只是表征該topic接收到多少條消息,并不能代表去消費時,就能消費出這么多的消息。
2、Kafka提供了kafka-run-class.sh腳本,利用該腳本也可以查看topic每個partition的offset值區間范圍,該腳本的使用方式
# 查看各個partition的最小offset值。sh kafka-run-class.sh kafka.tools.GetOffsetShell --topic mytopic --time -2 --broker-list host1:9092,host2:9092,host3:9092#?查看各個partition的最新offset值。sh kafka-run-class.sh kafka.tools.GetOffsetShell --topic mytopic--time?-1?--broker-list?host1:9092,host2:9092,host3:9092????查看GetOffsetShell類可知,--time設為-1時,表示使用latest方式;--time設為-2時,表示使用earliest方式。通過?該類的底層實現可知,其實就是用latest或earliest方式去消費該topic,從而拿到這個區間范圍。
總結
以上是生活随笔為你收集整理的kafka查看topic中的数据_实战!Kafka Manager能统计出Topic中的记录条数吗?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 支付宝支付回调异常_支付宝崩了是怎么回事
- 下一篇: 与基础事务管理器的通信失败 存货申请_金