logstash的lumberjack协议解析
最近在實(shí)現(xiàn)一個agent采集服務(wù)器日志,設(shè)計(jì)agent需要能夠?qū)?shù)據(jù)發(fā)送給logstash。當(dāng)然logstash支持很多輸入?yún)f(xié)議,其中,logstash技術(shù)棧(包括ElasticSearch)內(nèi)有一種叫做lumberjack的協(xié)議,可能是專門為傳輸日志數(shù)據(jù)設(shè)計(jì)的。不過網(wǎng)上對于lumberjack協(xié)議沒有公開的資料,而且實(shí)現(xiàn)上,只有java、ruby、golang版。筆者通過參考golang版(elastic/go-lumber)和java版(logstash-forwarder-java),用C實(shí)現(xiàn)了agent對lumberjack的支持。本文總結(jié)一下lumberjack協(xié)議的協(xié)議報(bào)文格式。
lumberjack版本
lumberjack總共有兩個版本,logstash-forwarder-java只實(shí)現(xiàn)了第一版,elastic/go-lumber兩個版本都實(shí)現(xiàn)了。新版的logstash作為服務(wù)端同時支持兩個版本。相比而言,V2在格式定義上支持json,因此比V1簡化很多,而且冗余信息略少于V1。由于json的引入,使得V2版本支持json支持的所有類型,而V1卻甚至無法表達(dá)一個整型類型(只能全部用字符串表達(dá))。
日志對象
一個日志對象是一個map<string,byte[]>,所以可以用json形式來序列化表達(dá)(V2才支持json)。當(dāng)然這種數(shù)據(jù)結(jié)構(gòu)不限于傳輸日志數(shù)據(jù)。一個完整的lumberjack報(bào)文可以包含多個日志對象,即可以支持批量發(fā)送日志。在同一個報(bào)文中,每個日志對象用sequence(uint32_t)來區(qū)分,類似數(shù)組的索引(index)。
V1版本格式
Window頭
Window頭包含有協(xié)議的版本、W標(biāo)志、日志對象數(shù)量 這3個信息,下面是Window頭的報(bào)文格式:
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 +---------------+---------------+---------------------------------------------------------------+ | '1'(0x31) | 'W'(0x57) | window size(uint32 bigendian) | +---------------+---------------+---------------------------------------------------------------+'1'(0x31)是一個ASCII字符1占用一個字節(jié),表示V1版本
'W'(0x57)是W標(biāo)識,表示是Window頭
window size是一個uint32_t的整型(大端存儲),這個值表示報(bào)文的日志對象有多少個
日志對象格式
一個日志對象包含對象頭和對象體
下面是日志對象頭
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 +---------------+---------------+---------------------------------------------------------------+ | '1'(0x31) | 'D'(0x44) | seq(uint32 bigendian) | +---------------------------------------------------------------+-------------------------------+ | count of key value pairs(uint32 bigendian) | +---------------------------------------------------------------+'1'(0x31)是一個ASCII字符1占用一個字節(jié),表示V1版本
'D'(0x44)是D標(biāo)識,表示是Data
seq即為日志對象的序號,一般可以從1開始累加,用來在后面的確認(rèn)報(bào)文中會帶上序號,發(fā)送端就可以知道接收端究竟確認(rèn)了哪些對象了
count of key value pairs,由于日志對象的基本數(shù)據(jù)類型是map<string,bytes[]>所以構(gòu)成了一系列的key value pair,這個值就是標(biāo)識一個日志對象中,究竟包含多少個key value pair。
下面是日志對象體
既然日志對象是map<string,bytes[]>結(jié)構(gòu),那么數(shù)據(jù)體就是要存儲這個結(jié)構(gòu),我們來繼續(xù)看日志體的格式
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 +---------------------------------------------------------------+ | 1st key size(in bytes,big endian) | + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | | | 1st key payload | | | + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | 1st payload size(in bytes,big endian) | + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | | | 1st payload payload | | | + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | 2nd key size(in bytes,big endian) | + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | | | 2nd key payload | | | + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | 2nd payload size(in bytes,big endian) | + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | | | 2nd payload payload | | | : :可以看到,每個<string,byte[]>分為key和payload兩部分,string自然是key,而byte[]自然是payload。
在組包時,先填入key的長度,然后填入key,再填入payload長度,再填入payload,如此往復(fù),就可以完成一個map<string,byte[]>結(jié)構(gòu)的表達(dá)。
最終,一個完整的lumberjack請求包,包含:
Window頭 + N x 日志對象(日志對象頭+日志對象體)ACK
lumberjack設(shè)計(jì)了應(yīng)用層ACK機(jī)制,即接收端可以在一次請求過程中,隨時確認(rèn)收到的每一個日志對象(通過seq標(biāo)示)。當(dāng)然也可以全部收完以后,只確認(rèn)最后一個日志對象。客戶端一般可以設(shè)計(jì)成等待服務(wù)端確認(rèn)最后一個日志對象的ACK后,放心的認(rèn)為服務(wù)端接收全部收完了。
ACK的格式如下:
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 +---------------+---------------+---------------------------------------------------------------+ | '1'(0x31) | 'A'(0x41) | record seq(uint32 bigendian) | +---------------+---------------+---------------------------------------------------------------+壓縮報(bào)文
lumberjack支持將日志對象數(shù)據(jù)壓縮后發(fā)送,不過在一個完成的報(bào)文中,不能同時包含壓縮和非壓縮數(shù)據(jù),即要么對所有的日志對象一起壓縮,要么都不壓縮。壓縮形態(tài)的報(bào)文格式為:
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 +---------------+---------------+---------------------------------------------------------------+ | '1'(0x31) | 'C'(0x43) | payload length(in bytes,uint32 bigendian) | +-----------------------------------------------------------------------------------------------+ | | | compressed record segment(s) | | | +-----------------------------------------------------------------------------------------------+'C'(0x43)表示Compress數(shù)據(jù)
payload length表示壓縮數(shù)據(jù)的長度
compressed record segment(s)是對一個或多個日志對象(包括日志頭和日志體)進(jìn)行壓縮以后的字節(jié)數(shù)據(jù)
所以一個完整的壓縮形態(tài)的數(shù)據(jù)報(bào)文應(yīng)該是:
Window頭 + 壓縮報(bào)文值得注意的是,盡管使用了壓縮報(bào)文,Window頭中的日志對象數(shù)量還是需要指明日志對象的數(shù)量的。
關(guān)于壓縮,是基于deflate進(jìn)行的,使用默認(rèn)的壓縮級別即可(level6)。各個語言都可以使用zlib進(jìn)行壓縮和解壓,以C語言為例,zlib Usage Example。
下面的代碼用于壓縮一段內(nèi)存數(shù)據(jù),代碼大致如下:
#include <zlib.h>z_stream strm; int ret; unsigned char output[CHUNK]; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; strm.opaque = Z_NULL; //Z_DEFAULT_COMPRESSION是-1,是默認(rèn)的壓縮級別,相當(dāng)于壓縮級別6 deflateInit(&strm, Z_DEFAULT_COMPRESSION);strm.next_in = (unsigned char *)input; strm.avail_in = len; strm.next_out = output;do {strm.avail_out = CHUNK;ret = deflate(&strm, Z_FINISH);//寫如結(jié)果memcpy(result,output,CHUNK-strm.avail_out); } while(strm.avail_out == 0);deflateEnd(&strm);V2版本格式
V2版本就沒有V1版本那么啰嗦了,直接上圖:
window header與v1的協(xié)議一致,只是version部分填入2。
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 +---------------+---------------+---------------------------------------------------------------+ | '2'(0x31) | 'W'(0x57) | window size(uint32 bigendian) | +---------------+---------------+---------------------------------------------------------------+日志對象的頭部用J代替D,表示是json格式的數(shù)據(jù)
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 +---------------+---------------+---------------------------------------------------------------+ | '2'(0x32) | 'J'(0x4A) | seq(uint32 bigendian) | +---------------------------------------------------------------+-------------------------------+ | payload length(uint32 bigendian) | +---------------------------------------------------------------+ | | | json encode string | | | + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +除此之外,由于payload部分不需要通過協(xié)議本身來表達(dá)map<string,byte[]>,直接借助json格式,所以只需增加json字符串的串長就可以了(圖中的payload length)。
V2版本的ACK與V1版本完全相同。
總結(jié)
以上是生活随笔為你收集整理的logstash的lumberjack协议解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: IOS 系统振动调用
- 下一篇: error C2872: “ACCESS