关于kafka中的timestamp与offset的对应关系
關于kafka中的timestamp與offset的對應關系
@(KAFKA)[storm, kafka, 大數據]
- 關于kafka中的timestamp與offset的對應關系
- 獲取單個分區的情況
- 同時從所有分區獲取消息的情況
- 結論
- 如何指定時間
- 出現UpdateOffsetException時的處理方法
- 相關源碼略讀
- 1入口
- 2處理邏輯
- 1建立offset與timestamp的對應關系并保存到數據中
- 2找到最近的最后一個滿足 timestamp target_timestamp 的 index
- 3找到滿足該條件的offset數組
- 3注意事項
獲取單個分區的情況
kafka通過offset記錄每條日志的偏移量,詳見《Kafka文件存儲機制那些事》。但是當用戶想讀取之前的信息時,他是不可能知道這些消息對應的offset的,用戶只能指定時間,比如說我從昨天的12點開始讀取消息。
這就有個問題了,怎么樣將用戶定義的時間轉化為集群內部的offset呢?
先簡單重溫一下kafka的物理存儲機制:每個topic分成多個分區,而一個分區對應磁盤中的一個目錄,目錄中會有多個文件,比如:
00000000000000000000.index 00000000000000000000.log 00000000000001145974.index 00000000000001145974.log可以看出來,每個segment file其實有2部分,一個index文件,一個log文件。文件名是這個文件內的第一個消息的offset。log文件記錄的是實際的消息內容。而index對log文件作了索引,當需要查看某個消息時,如果指定offset,很容易就定位到log文件中的具體位置。詳見上面說的文章。
但正如剛才所說,用戶不知道offset,而只知道時間,所以就需要轉換了。
kafka用了一個很直觀很簡單的方法:將文件名中的offset與文件的最后修改時間放入一個map中,然后再查找。詳細步驟如下:
(1)將文件名及文件的最后時間放入一個map中,時間使用的是13位的unix時間戳
(2)當用戶指定一個時間t0時,在map中找到最后一個時間t1早于t0的時間,然后返回這個文件名,即這個文件的第一個offset。
(3)這里只返回了一個分區的offset,而事實上需要返回所有分區的offset,所以對所有分區采取上述步驟。
(4)使用取到的消息,開始消費消息。
舉個例子:
w-r--r-- 1 hadoop hadoop 1073181076 8?? 11 10:20 00000000000066427499.log -rw-r--r-- 1 hadoop hadoop 14832 8?? 11 10:20 00000000000066427499.index -rw-r--r-- 1 hadoop hadoop 1073187364 8?? 11 10:40 00000000000067642947.log -rw-r--r-- 1 hadoop hadoop 14872 8?? 11 10:40 00000000000067642947.index -rw-r--r-- 1 hadoop hadoop 1073486959 8?? 11 11:04 00000000000068857698.log -rw-r--r-- 1 hadoop hadoop 14928 8?? 11 11:04 00000000000068857698.index -rw-r--r-- 1 hadoop hadoop 1073511817 8?? 11 11:25 00000000000070069880.log -rw-r--r-- 1 hadoop hadoop 14920 8?? 11 11:25 00000000000070069880.index -rw-r--r-- 1 hadoop hadoop 10485760 8?? 11 11:28 00000000000071279203.index -rw-r--r-- 1 hadoop hadoop 148277228 8?? 11 11:28 00000000000071279203.log我們有上述幾個文件
(1)當我需要消費從8月11日11:00開始的數據時,它會返回最后修改時間早于8月11日11:00的文件名,此外是修改時間第10:40的文件,offset為67642947.其實由于它的最后修改時間在10:40,我們需要的數據不可能在它里面,它直接返回11:40的文件即可,但可能是出于更保險的考慮,它返回了上一個文件。
(2)其它類似,當我消費11:20的數據,返回的offset為68857698.
(3)而當我消費的數據早于10:20的話,則返回的offset為空,如果是通過數組保存offset的,則提取第一個offset時會出現 java.lang.ArrayIndexOutOfBoundsException 異常。如在kafka編程指南中的SimpleConsumer中的代碼:
當然,也可以合理處理,當返回為空時,直接返回最早的offset即可。
(4)當消費的數據晚于最晚時刻,返回最新的消息。
注意:
(1)這里對kafka集群本身沒有任何的負擔,kafka消息也不需要記錄時間點這個字段,只有在需要定位的時候,才臨時構建一個map,然后將offset與時間讀入這個map中。
(2)冗余很多消息。這種方法粒度非常粗,是以文件作為粒度的,因此冗余的消息數據和文件的大小有關系,默認為1G。如果這個topic的數據非常少,則這1G的數據可以就是幾天前的數據了。
(3)有2個特殊的時間點:
需要查找的 timestamp 是 -1 或者 -2時,特殊處理
同時從所有分區獲取消息的情況
1、當同時從多個分區讀取消息時,只要有其中一個分區,它的所有文件的修改時間均晚于你指定的時間,就會出錯,因為這個分區返回的offset為空,除非你作了合理的處理。
2、storm!!!
storm0.9x版本遇到上述問題時,同樣會出錯,出現以下異常
而從0.10版本開始,改為了從最早時間開始消費消息。
3、還有個問題,如何將消息均勻的分布但各個分區中。比如在我們一個topic中,其中一個分區已經有60G數據,而另一個分區還不足2G,如果指定時間的話,由于小的那個分區的修改時間肯定是在近期的,所以當指定一個較前的時間點就會出錯。而且即使不出錯,從不同分區返回的消息也可能時間相差很遠。
如何將數據均勻的分布到各個分區,請參考kafka編程指南的partitioner介紹。
只要出現這個問題,都是由于數據不存在,有可能是:
(1)數據真的丟失了
(2)數據傾斜嚴重
結論
如何指定時間
如果需要指定從某個時間點開始處理日志,則:
(1)就指定那個時間即可,不需要提前,因此返回的消息一定是在這個時間點之前的。
(2)如果這個時間點是繁忙時段,它返回的消息時間可能只是這個時間點之前的一小段時間。
(3)如果這個時間點是個空閑時間,它返回的日志時間可能是很長一段時間的日志。
(4)但不管是繁忙時間還是空閑時間,它都是多讀一個日志文件,所以冗余的日志數量是相同的。
舉個例子:
如果需要處理2015-08-15 15:00:00后的日志,則
(1)直接指定這個時間即可,不需要指定它之前的時間,如1:00, 2:00之類的,因為返回的日志時間決不會是3:00前的。同時,3:00進入kafka的數據有可能是3:00前的數據,決不會是3:00后的數據,所以也不需要考慮指定提前時間。
(2)由于這個時間日志一般較多,它返回的日志可能是2:30左右開始的。相反,如果是凌晨3:00,由于這個時間點日志較少,它返回的日志有可以是2、3小時前的。
出現UpdateOffsetException時的處理方法
(1)若是小項目,由于數據量不多,建議同頭開始處理,并通知SA檢查。
(2)若是大項目,一般是出現了數據傾斜,通知SA檢查數據情況。
相關源碼略讀
1、入口
Kafka Server 處理 Client 發送來的請求的入口在
文件夾: core/src/main/scala/kafka/server
類:kafka.server.KafkaApis
方法: handle
處理offset請求的函數: handleOffsetRequest
2、處理邏輯
處理邏輯主要分為四步
獲取partition
從partition中獲取offset
high water mark 處理(這一段的資料太少了)
異常處理
由于request中包含查詢多個partition的offset的請求。所以最終會返回一個map,保存有每個partition對應的offset
這里主要介紹從某一個partition中獲取offset的邏輯,代碼位置
kafka.log.Log#getOffsetsBefore(timestamp, maxNumOffsets)
從一個partition中獲取offset
(1)建立offset與timestamp的對應關系,并保存到數據中
//每個Partition由多個segment file組成。獲取當前partition中的segment列表 val segsArray = segments.view// 創建數組 var offsetTimeArray: Array[(Long, Long)] =null if(segsArray.last.size >0)offsetTimeArray =newArray[(Long, Long)](segsArray.length +1) elseoffsetTimeArray =newArray[(Long, Long)](segsArray.length)// 將 offset 與 timestamp 的對應關系添加到數組中 for(i <-0until segsArray.length)// 數據中的每個元素是一個二元組,(segment file 的起始 offset,segment file的最近修改時間)offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified) if(segsArray.last.size >0)// 如果最近一個 segment file 不為空,將(最近的 offset, 當前之間)也添加到該數組中offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds) 通過這段邏輯,獲的一個數據 offsetTimeArray,每個元素是一個二元組,二元組內容是(offset, timestamp)(2)找到最近的最后一個滿足 timestamp < target_timestamp 的 index
var startIndex = -1 timestamp match {// 需要查找的 timestamp 是 -1 或者 -2時,特殊處理caseOffsetRequest.LatestTime => // OffsetRequest.LatestTime = -1startIndex = offsetTimeArray.length -1caseOffsetRequest.EarliestTime => // OffsetRequest.EarliestTime = -2startIndex =0case_ =>var isFound =falsedebug("Offset time array = "+ offsetTimeArray.foreach(o =>"%d, %d".format(o._1, o._2)))startIndex = offsetTimeArray.length -1 // 從最后一個元素反向找while(startIndex >=0&& !isFound) { // 找到滿足條件或者if(offsetTimeArray(startIndex)._2 <= timestamp) // offsetTimeArray 的每個元素是二元組,第二個位置是 timestampisFound =trueelsestartIndex -=1} }通過這段邏輯,實際找到的是 “最近修改時間早于目標timestamp的最近修改的segment file的起始offset”
但是獲取offset的邏輯并沒有結束,后續仍有處理
(3)找到滿足該條件的offset數組
實際上這個函數的功能是找到一組offset,而不是一個offset。第二個參數 maxNumOffsets 指定最多找幾個滿足條件的 offset。
獲取一組offset的邏輯 // 返回的數據的長度 = min(maxNumOffsets, startIndex + 1),startIndex是邏輯2中找到的index val retSize = maxNumOffsets.min(startIndex +1) val ret = newArray[Long](retSize)// 逐個將滿足條件的offset添加到返回的數據中 for(j <-0until retSize) {ret(j) = offsetTimeArray(startIndex)._1startIndex -=1 }// 降序排序返回。offset 越大數據越新。 // ensure that the returned seq is in descending order of offsets ret.toSeq.sortBy(- _)最終返回這個數組
3、注意事項
實際找到的offset并不是從目標timestamp開始的第一個offset。需要注意
當 timestamp 小于最老的數據文件的最近修改時間時,返回值是一個空數組。可能會導致使用時的問題。
調整segment file文件拆分策略的配置時,需要注意可能會造成的影響。
總結
以上是生活随笔為你收集整理的关于kafka中的timestamp与offset的对应关系的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kafka存储机制
- 下一篇: kafka集群编程指南