漫游Kafka实现篇之消息和日志
原文地址:http://blog.csdn.net/honglei915/article/details/37760631
Kafka視頻教程同步首發(fā),歡迎觀看!
消息格式
日志
一個(gè)叫做“my_topic”且有兩個(gè)分區(qū)的的topic,它的日志有兩個(gè)文件夾組成,my_topic_0和my_topic_1,每個(gè)文件夾里放著具體的數(shù)據(jù)文件,每個(gè)數(shù)據(jù)文件都是一系列的日志實(shí)體,每個(gè)日志實(shí)體有一個(gè)4個(gè)字節(jié)的整數(shù)N標(biāo)注消息的長(zhǎng)度,后邊跟著N個(gè)字節(jié)的消息。每個(gè)消息都可以由一個(gè)64位的整數(shù)offset標(biāo)注,offset標(biāo)注了這條消息在發(fā)送到這個(gè)分區(qū)的消息流中的起始位置。每個(gè)日志文件的名稱都是這個(gè)文件第一條日志的offset.所以第一個(gè)日志文件的名字就是00000000000.kafka.所以每相鄰的兩個(gè)文件名字的差就是一個(gè)數(shù)字S,S差不多就是配置文件中指定的日志文件的最大容量。
消息的格式都由一個(gè)統(tǒng)一的接口維護(hù),所以消息可以在producer,broker和consumer之間無縫的傳遞。存儲(chǔ)在硬盤上的消息格式如下所示:
消息長(zhǎng)度: 4 bytes (value: 1+4+n) 版本號(hào): 1 byte CRC校驗(yàn)碼: 4 bytes 具體的消息: n bytes
寫操作
消息被不斷的追加到最后一個(gè)日志的末尾,當(dāng)日志的大小達(dá)到一個(gè)指定的值時(shí)就會(huì)產(chǎn)生一個(gè)新的文件。對(duì)于寫操作有兩個(gè)參數(shù),一個(gè)規(guī)定了消息的數(shù)量達(dá)到這個(gè)值時(shí)必須將數(shù)據(jù)刷新到硬盤上,另外一個(gè)規(guī)定了刷新到硬盤的時(shí)間間隔,這對(duì)數(shù)據(jù)的持久性是個(gè)保證,在系統(tǒng)崩潰的時(shí)候只會(huì)丟失一定數(shù)量的消息或者一個(gè)時(shí)間段的消息。
讀操作
讀操作需要兩個(gè)參數(shù):一個(gè)64位的offset和一個(gè)S字節(jié)的最大讀取量。S通常比單個(gè)消息的大小要大,但在一些個(gè)別消息比較大的情況下,S會(huì)小于單個(gè)消息的大小。這種情況下讀操作會(huì)不斷重試,每次重試都會(huì)將讀取量加倍,直到讀取到一個(gè)完整的消息??梢耘渲脝蝹€(gè)消息的最大值,這樣服務(wù)器就會(huì)拒絕大小超過這個(gè)值的消息。也可以給客戶端指定一個(gè)嘗試讀取的最大上限,避免為了讀到一個(gè)完整的消息而無限次的重試。
在實(shí)際執(zhí)行讀取操縱時(shí),首先需要定位數(shù)據(jù)所在的日志文件,然后根據(jù)offset計(jì)算出在這個(gè)日志中的offset(前面的的offset是整個(gè)分區(qū)的offset),然后在這個(gè)offset的位置進(jìn)行讀取。定位操作是由二分查找法完成的,Kafka在內(nèi)存中為每個(gè)文件維護(hù)了offset的范圍。
下面是發(fā)送給consumer的結(jié)果的格式:
刪除
日志管理器允許定制刪除策略。目前的策略是刪除修改時(shí)間在N天之前的日志(按時(shí)間刪除),也可以使用另外一個(gè)策略:保留最后的N GB數(shù)據(jù)的策略(按大小刪除)。為了避免在刪除時(shí)阻塞讀操作,采用了copy-on-write形式的實(shí)現(xiàn),刪除操作進(jìn)行時(shí),讀取操作的二分查找功能實(shí)際是在一個(gè)靜態(tài)的快照副本上進(jìn)行的,這類似于Java的CopyOnWriteArrayList。
可靠性保證
日志文件有一個(gè)可配置的參數(shù)M,緩存超過這個(gè)數(shù)量的消息將被強(qiáng)行刷新到硬盤。一個(gè)日志矯正線程將循環(huán)檢查最新的日志文件中的消息確認(rèn)每個(gè)消息都是合法的。合法的標(biāo)準(zhǔn)為:所有文件的大小的和最大的offset小于日志文件的大小,并且消息的CRC32校驗(yàn)碼與存儲(chǔ)在消息實(shí)體中的校驗(yàn)碼一致。如果在某個(gè)offset發(fā)現(xiàn)不合法的消息,從這個(gè)offset到下一個(gè)合法的offset之間的內(nèi)容將被移除。
有兩種情況必須考慮:1,當(dāng)發(fā)生崩潰時(shí)有些數(shù)據(jù)塊未能寫入。2,寫入了一些空白數(shù)據(jù)塊。第二種情況的原因是,對(duì)于每個(gè)文件,操作系統(tǒng)都有一個(gè)inode(inode是指在許多“類Unix文件系統(tǒng)”中的一種數(shù)據(jù)結(jié)構(gòu)。每個(gè)inode保存了文件系統(tǒng)中的一個(gè)文件系統(tǒng)對(duì)象,包括文件、目錄、大小、設(shè)備文件、socket、管道, 等等),但無法保證更新inode和寫入數(shù)據(jù)的順序,當(dāng)inode保存的大小信息被更新了,但寫入數(shù)據(jù)時(shí)發(fā)生了崩潰,就產(chǎn)生了空白數(shù)據(jù)塊。CRC校驗(yàn)碼可以檢查這些塊并移除,當(dāng)然因?yàn)楸罎⒍磳懭氲臄?shù)據(jù)塊也就丟失了。
總結(jié)
以上是生活随笔為你收集整理的漫游Kafka实现篇之消息和日志的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 漫游Kafka实战篇之客户端编程实例
- 下一篇: 漫游Kafka实现篇之分布式