skynet源码分析之网络层——网关服务器
在上一篇文章里介紹Lua層通過lualib/skynet/socket.lua這個庫與網(wǎng)絡(luò)底層交互(http://www.cnblogs.com/RainRill/p/8707328.html)。除此之外,skynet還提供一個通用模板lualib/snax/gateserver來啟動一個網(wǎng)關(guān)服務(wù)器,通過TCP連接和客戶端交換數(shù)據(jù),這個庫不能與socket.lua共用,因為這個庫接管了底層傳來的socket類消息,具體用法參考官方wikihttps://github.com/cloudwu/skynet/wiki/GateServer。
1. 概述
gateserver注冊接收網(wǎng)絡(luò)底層傳過來的socket消息,通過netpack.filter解析消息包(第6行),稍后會著重分析如何解析,解析完返回的type有6中類型,每種類型指定特定的回調(diào)函數(shù)。注:當(dāng)一個包不完整時,type為nil,這種情況不需要處理。
"open":新連接建立;"close":關(guān)閉連接;"warning":當(dāng)fd上待發(fā)送的數(shù)據(jù)累積超過1M時,會收到這個消息;’"error":發(fā)生錯誤,關(guān)閉fd;“data”:表示收到一個完整的tcp包,回調(diào)函數(shù)把這個包傳給邏輯層去處理;
1 -- lualib/snax/gateserver.lua
2 skynet.register_protocol {
3 name = "socket",
4 id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
5 unpack = function ( msg, sz )
6 return netpack.filter( queue, msg, sz)
7 end,
8 dispatch = function (_, _, q, type, ...)
9 queue = q
10 if type then
11 MSG[type](...)
12 end
13 end
14 }
“more”:表示收到的數(shù)據(jù)不止一個tcp包,netpack.filter會把包依次放到隊列里,然后回調(diào)函數(shù)一個個從隊列中pop出來(第11行)
1 -- lualib/snax/gateserver.lua 2 local function dispatch_queue() 3 local fd, msg, sz = netpack.pop(queue) 4 if fd then 5 -- may dispatch even the handler.message blocked 6 -- If the handler.message never block, the queue should be -- 7 -- empty, so only fork once and then exit. 8 skynet.fork(dispatch_queue) 9 dispatch_msg(fd, msg, sz) 10 11 for fd, msg, sz in netpack.pop, queue do 12 dispatch_msg(fd, msg, sz) 13 end 14 end 15 end 16 17 MSG.more = dispatch_queue
2. 如何解析TCP數(shù)據(jù)包
為了說明如何解析TCP數(shù)據(jù)包,先了解下網(wǎng)絡(luò)底層是采用什么策略接收數(shù)據(jù)的。單個socket每次從內(nèi)核嘗試讀取的數(shù)據(jù)字節(jié)數(shù)為sz(第6行),這個值保存在s->p.size中,初始是MIN_READ_BUFFER(64b),當(dāng)實際讀到的數(shù)據(jù)等于sz時,sz擴大一倍(8-9行);如果小于sz的一半,則設(shè)置sz為原來的一半(10-11行)。
比如,客戶端發(fā)了一個1kb的數(shù)據(jù),socket線程會從內(nèi)核里依次讀取64b,128b,256b,512b,64b數(shù)據(jù),總共需讀取5次,即會向gateserver服務(wù)發(fā)5條消息,一個TCP包被切割成5個數(shù)據(jù)塊。第5次嘗試讀取1024b數(shù)據(jù),所以可能會讀到其他TCP包的數(shù)據(jù)(只要客戶端有發(fā)送其他數(shù)據(jù))。接下來,客戶端再發(fā)一個1kb的數(shù)據(jù),socket線程只需從內(nèi)核讀取一次即可。
1 // skynet-src/socket_server.c
2 static int
3 forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
4 int sz = s->p.size;
5 char * buffer = MALLOC(sz);
6 int n = (int)read(s->fd, buffer, sz);
7 ...
8 if (n == sz) {
9 s->p.size *= 2;
10 } else if (sz > MIN_READ_BUFFER && n*2 < sz) {
11 s->p.size /= 2;
12 }
13 }
netpack做的工作就是把這些數(shù)據(jù)塊組裝成一個完整的TCP包,再交給gateserver去處理。注:netpack約定,tcp包頭兩字節(jié)(大端方式)表示數(shù)據(jù)包長度。如果采用sproto打包方式,需附加4字節(jié)(32位)的session值。所以客戶端傳過來1kb數(shù)據(jù),實際數(shù)據(jù)只有1024-2-4=1018字節(jié)。
數(shù)據(jù)結(jié)構(gòu):
16-19行,用數(shù)組實現(xiàn)的隊列。當(dāng)客戶端連續(xù)發(fā)了幾個小的tcp包,gateserver收到的一條消息包可能包含多個tcp包,存放到這個隊列里
第20行,存放不完整的tcp包的指針數(shù)組,每一項是指向一個鏈表,fd hash值相同的組成一個鏈表。
1 // lualib-src/lua-netpack.c
2 struct netpack {
3 int id; //socket id
4 int size; //數(shù)據(jù)塊長度
5 void * buffer; //數(shù)據(jù)塊
6 };
7
8 struct uncomplete { //不完整tcp包結(jié)構(gòu)
9 struct netpack pack; //數(shù)據(jù)塊信息
10 struct uncomplete * next; //鏈表,指向下一個
11 int read; //已讀的字節(jié)數(shù)
12 int header; //第一個字節(jié)(代表數(shù)據(jù)長度的高8位)
13 };
14
15 struct queue {
16 int cap;
17 int head;
18 int tail;
19 struct netpack queue[QUEUESIZE]; //一次從內(nèi)核讀取多個tcp包時放入該隊列里
20 struct uncomplete * hash[HASHSIZE]; //指針數(shù)組,數(shù)組里每個位置指向一個不完整的tcp包鏈表,fd hash值相同的組成一個鏈表
21 };
解析流程:
最終會調(diào)用filter_data_這個接口解析,下面著重介紹:
參數(shù):fd socket; buffer從內(nèi)核中讀到的數(shù)據(jù)塊;size數(shù)據(jù)塊大小
先看后半段46-82行,當(dāng)queue里并沒有該socket剩余的數(shù)據(jù)塊,執(zhí)行46行分支。
46-51行,是一個不完整的tcp包,只有一個字節(jié)數(shù)據(jù),說明表示長度的頭部兩字節(jié)數(shù)據(jù)都還差一個字節(jié),構(gòu)造一個uncomplete結(jié)構(gòu)(簡稱uc),然后存在queue->hash里。uc->read設(shè)置為-1,uc->header存放這一個字節(jié)。返回給Lua層的type是nil,Lua層不需要處理。
52-54行,通過頭部兩字節(jié)計算tcp包的長度read_size,接下來比較收到的數(shù)據(jù)size與真正需要的數(shù)據(jù)pack_size。
56-63行,size<pack_size,說明tcp包還有未讀到的數(shù)據(jù),將已讀到的數(shù)據(jù)構(gòu)造一個uc結(jié)構(gòu),保存在queue->hash里,返回給Lua層的type是nil,Lua層不需要處理。uc->read已讀到字節(jié),uc->pack.size目標(biāo)字節(jié)數(shù)
64-73行,size=pack_size,說明是一個完整的tcp包,大部分是這種情況,把tcp包返回給Lua層即可,此時返回的type是“data”(第66行)。
74-82行,size>pack_size,說明不止一個tcp包的數(shù)據(jù),則先通過push_data保存第一個完整的tcp包(76行),接著通過push_more處理余下的數(shù)據(jù)(79行)。返回的type是"more"。
push_data做的工作是將tcp包保存在隊列里,供Lua層pop出使用。
push_more是一個遞歸操作,流程跟上面一樣,對比讀到的數(shù)據(jù)和需要的數(shù)據(jù)做對應(yīng)的處理。
接著看6-44行,之前收到了tcp包的部分?jǐn)?shù)據(jù)塊。
8-18行,說明之前只讀到一個字節(jié),加上該數(shù)據(jù)塊的第一個字節(jié),組成兩個字節(jié)計算出整個包的長度(12行)
第19行,目標(biāo)字節(jié)-已讀字節(jié)=需要的字節(jié)need。
20-27行,如果size<need,說明仍然還差數(shù)據(jù)塊沒收到,此時將數(shù)據(jù)附加到之前的uc->pack.buffer里。
28-44行,其他兩種情況跟上面處理流程一樣。
1 // lualib-src/lua-netpack.c
2 static int
3 filter_data_(lua_State *L, int fd, uint8_t * buffer, int size) {
4 struct queue *q = lua_touserdata(L,1);
5 struct uncomplete * uc = find_uncomplete(q, fd);
6 if (uc) { //之前收到該包的部分?jǐn)?shù)據(jù)塊,
7 // fill uncomplete
8 if (uc->read < 0) {//之前只收到一個字節(jié),加上該數(shù)據(jù)塊的第一個字節(jié),表示整個包的長度
9 // read size
10 assert(uc->read == -1);
11 int pack_size = *buffer;
12 pack_size |= uc->header << 8 ;
13 ++buffer;
14 --size;
15 uc->pack.size = pack_size;
16 uc->pack.buffer = skynet_malloc(pack_size);
17 uc->read = 0;
18 }
19 int need = uc->pack.size - uc->read;//包還差多少字節(jié)
20 if (size < need) {
21 memcpy(uc->pack.buffer + uc->read, buffer, size);
22 uc->read += size;
23 int h = hash_fd(fd);
24 uc->next = q->hash[h];
25 q->hash[h] = uc;
26 return 1;
27 }
28 memcpy(uc->pack.buffer + uc->read, buffer, need);
29 buffer += need;
30 size -= need;
31 if (size == 0) {
32 lua_pushvalue(L, lua_upvalueindex(TYPE_DATA));
33 lua_pushinteger(L, fd);
34 lua_pushlightuserdata(L, uc->pack.buffer);
35 lua_pushinteger(L, uc->pack.size);
36 skynet_free(uc);
37 return 5;
38 }
39 // more data
40 push_data(L, fd, uc->pack.buffer, uc->pack.size, 0);
41 skynet_free(uc);
42 push_more(L, fd, buffer, size);
43 lua_pushvalue(L, lua_upvalueindex(TYPE_MORE));
44 return 2;
45 } else {
46 if (size == 1) {
47 struct uncomplete * uc = save_uncomplete(L, fd);
48 uc->read = -1;
49 uc->header = *buffer;
50 return 1;
51 }
52 int pack_size = read_size(buffer); //需要數(shù)據(jù)包的字節(jié)數(shù)
53 buffer+=2;
54 size-=2;
55
56 if (size < pack_size) { //說明還有未獲得的數(shù)據(jù)包
57 struct uncomplete * uc = save_uncomplete(L, fd); //保存這個數(shù)據(jù)包
58 uc->read = size;
59 uc->pack.size = pack_size;
60 uc->pack.buffer = skynet_malloc(pack_size);
61 memcpy(uc->pack.buffer, buffer, size);
62 return 1;
63 }
64 if (size == pack_size) { //說明是一個完整包,把包返回給Lua層即可
65 // just one package
66 lua_pushvalue(L, lua_upvalueindex(TYPE_DATA));
67 lua_pushinteger(L, fd);
68 void * result = skynet_malloc(pack_size);
69 memcpy(result, buffer, size);
70 lua_pushlightuserdata(L, result);
71 lua_pushinteger(L, size);
72 return 5;
73 }
74 // more data
75 // 說明不止同一個數(shù)據(jù)包,還有額外的
76 push_data(L, fd, buffer, pack_size, 1); //保存第一個包到q->queue中
77 buffer += pack_size;
78 size -= pack_size;
79 push_more(L, fd, buffer, size); //處理余下的包
80 lua_pushvalue(L, lua_upvalueindex(TYPE_MORE));
81 return 2;
82 }
83 }
舉例,客戶端發(fā)了一個1kb的數(shù)據(jù),socket線程會從內(nèi)核里依次讀取64b,128b,256b,512b,64b數(shù)據(jù)。gateserver會執(zhí)行5次filter_data_,分支依次是56行,20行,20行,20行,31行。
總結(jié)
以上是生活随笔為你收集整理的skynet源码分析之网络层——网关服务器的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 玉溪红色烟草扁盒细支多少钱?
- 下一篇: 坐火车能带洗衣服的爆炸盐吗?