Redis运行流程源码解析--转载
http://blog.nosqlfan.com/html/4007.html
http://www.searchdatabase.com.cn/showcontent_62166.htm
導讀:本文分析源碼基于Redis 2.4.7 stable版本,對Redis運行流程,命令處理的內部實現進行了深入講解。
關鍵詞:Redis?源碼?運行流程?框架?
概述
Redis通過定義一個 struct redisServer 類型的全局變量server 來保存服務器的相關信息(比如:配置信息,統計信息,服務器狀態等等)。啟動時通過讀取配置文件里邊的信息對server進行初始化(如果沒有指定配置文 件,將使用默認值對sever進行初始化),初始化的內容有:起監聽端口,綁定有新連接時的回調函數,綁定服務器的定時函數,虛擬內存初始化,log初始 化等等。
啟動
初始化服務器配置
先來看看redis 的main函數的入口
Redis.c:1694
| int?main(int?argc,?char?**argv)?{ ????time_t?start; ????initServerConfig(); ????if?(argc?==?2)?{ ????????if?(strcmp(argv[1],?"-v")?==?0?|| ????????????strcmp(argv[1],?"--version")?==?0)?version(); ????????if?(strcmp(argv[1],?"--help")?==?0)?usage(); ????????resetServerSaveParams(); ????????loadServerConfig(argv[1]); ????}?else?if?((argc?>?2))?{ ????????usage(); ????}?else?{ ????????... ????} ????if?(server.daemonize)?daemonize(); ????initServer(); ????... |
- initServerConfig初始化全局變量 server 的屬性為默認值。
- 如果命令行指定了配置文件, resetServerSaveParams重置對落地備份的配置(即重置為默認值)并讀取配置文件的內容對全局變量 server 再進行初始化 ,沒有在配置文件中配置的將使用默認值。
- 如果服務器配置成后臺執行,則對服務器進行 daemonize。
- initServer初始化服務器,主要是設置信號處理函數,初始化事件輪詢,起監聽端口,綁定有新連接時的回調函數,綁定服務器的定時函數,初始化虛擬內存和log等等。
- 創建服務器監聽端口。
Redis.c:923
| ????if?(server.port?!=?0)?{ ????????server.ipfd=?anetTcpServer(server.neterr,server.port,server.bindaddr); ????????if?(server.ipfd?==?ANET_ERR)?{ ????????????redisLog(REDIS_WARNING,?"Opening?port?%d:?%s", ????????????????server.port,?server.neterr); ????????????exit(1); ????????} ????} |
- anetTcpServer創建一個socket并進行監聽,然后把返回的socket fd賦值給server.ipfd。
事件輪詢結構體定義
先看看事件輪詢的結構體定義
Ae.h:88
| /*?State?of?an?event?based?program?*/ typedef?struct?aeEventLoop?{ ????int?maxfd; ????long?long?timeEventNextId; ????aeFileEvent?events[AE_SETSIZE];?/*?Registered?events?*/ ????aeFiredEvent?fired[AE_SETSIZE];?/*?Fired?events?*/ ????aeTimeEvent?*timeEventHead; ????int?stop; ????void?*apidata;?/*?This?is?used?for?polling?API?specific?data?*/ ????aeBeforeSleepProc?*beforesleep; }?aeEventLoop; |
- maxfd是最大的文件描述符,主要用來判斷是否有文件事件需要處理(ae.c:293)和當使用select 來處理網絡IO時作為select的參數(ae_select.c:50)。
- timeEventNextId 是下一個定時事件的ID。
- events[AE_SETSIZE]用于保存通過aeCreateFileEvent函數創建的文件事件,在sendReplyToClient函數和freeClient函數中通過調用aeDeleteFileEvent函數刪除已經處理完的事件。
- fired[AE_SETSIZE] 用于保存已經觸發的文件事件,在對應的網絡I/O函數中進行賦值(epoll,select,kqueue),不會對fired進行刪除操作,只會一直覆 蓋原來的值。然后在aeProcessEvents函數中對已經觸發的事件進行處理。
- timeEventHead 是定時事件鏈表的頭,定時事件的存儲用鏈表實現。
- Stop 用于停止事件輪詢處理。
- apidata 用于保存輪詢api需要的數據,即aeApiState結構體,對于epoll來說,aeApiState結構體的定義如下:
| typedef?struct?aeApiState?{ ????int?epfd; ????struct?epoll_event?events[AE_SETSIZE]; }?aeApiState; |
- beforesleep 是每次進入處理事件時執行的函數。
創建事件輪詢
Redis.c:920
| ??server.el?=?aeCreateEventLoop(); Ae.c:55 aeEventLoop?*aeCreateEventLoop(void)?{ ????aeEventLoop?*eventLoop; ????int?i; ????eventLoop?=?zmalloc(sizeof(*eventLoop)); ????if?(!eventLoop)?return?NULL; ????eventLoop->timeEventHead?=?NULL; ????eventLoop->timeEventNextId?=?0; ????eventLoop->stop?=?0; ????eventLoop->maxfd?=?-1; ????eventLoop->beforesleep?=?NULL; ????if?(aeApiCreate(eventLoop)?==?-1)?{ ????????zfree(eventLoop); ????????return?NULL; ????} /*?Events?with?mask?==?AE_NONE?are?not?set.?So?let's?initialize ?*?the?vector?with?it.?*/ ????for?(i?=?0;?i?<?AE_SETSIZE;?i++) ????????eventLoop->events[i].mask?=?AE_NONE; ????return?eventLoop; } |
綁定定時函數和有新連接時的回調函數
redis.c:973
| aeCreateTimeEvent(server.el,?1,?serverCron,?NULL,?NULL); if?(server.ipfd?>?0?&& ????aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL)?==?AE_ERR)?oom("creating?file?event"); |
- aeCreateTimeEvent 創建定時事件并綁定回調函數serverCron,這個定時事件第一次是超過1毫秒就有權限執行,如果其他事件的處理時間比較長,可能會出現超過一定時間 都沒執行情況。這里的1毫秒只是超過后有可執行的權限,并不是一定會執行。第一次執行后,如果還要執行,是由定時函數的返回值確定的,在 processTimeEvents(ae.c:219)中,當調用定時回調函數后,獲取定時回調函數的返回值,如果返回值不等于-1,則設置定時回調函 數的下一次觸發時間為當前時間加上定時回調函數的返回值,即調用間隔時間。serverCron的返回值是100ms,表明從二次開始,每超過100ms 就有權限執行。(定時回調函數serverCron用于更新lru時鐘,更新服務器的狀態,打印一些服務器信息,符合條件的情況下對hash表進行重哈 希,啟動后端寫AOF或者檢查后端寫AOF或者備份是否完成,檢查過期的KEY等等)
- aeCreateFileEvent創建監聽端口的socket fd的文件讀事件(即注冊網絡io事件)并綁定回調函數acceptTcpHandler。
進入事件輪詢
初始化后將進入事件輪詢
Redis.c:1733
| ????aeSetBeforeSleepProc(server.el,beforeSleep); ????aeMain(server.el); ????aeDeleteEventLoop(server.el); |
- 設置每次進入事件處理前會執行的函數beforeSleep。
- 進入事件輪詢aeMain。
- 退出事件輪詢后刪除事件輪詢,釋放事件輪詢占用內存aeDeleteEventLoop(不過沒在代碼中發現有執行到這一步的可能,服務器接到shutdown命令時通過一些處理后直接就通過exit退出了,可能是我看錯了,待驗證)。
事件輪詢函數aeMain
看看aeMain的內容
Ae.c:382
| void?aeMain(aeEventLoop?*eventLoop)?{ ????eventLoop->stop?=?0; ????while?(!eventLoop->stop)?{ ????????if?(eventLoop->beforesleep?!=?NULL) ????????????eventLoop->beforesleep(eventLoop); ????????aeProcessEvents(eventLoop,?AE_ALL_EVENTS); ????} } |
- 每次進入事件處理前,都會調用設置的beforesleep,beforeSleep函數主要是處理被阻塞的命令和根據配置寫AOF。
- aeProcessEvents處理定時事件和網絡io事件。
啟動完畢,等待客戶端請求
到進入事件輪詢函數后,redis的啟動工作就做完了,接下來就是等待客戶端的請求了。
接收請求
新連接到來時的回調函數
在綁定定時函數和有新連接時的回調函數中說到了綁定有新連接來時的回調函數acceptTcpHandler,現在來看看這個函數的具體內容
Networking.c:427
| void?acceptTcpHandler(aeEventLoop?*el,?int?fd,?void?*privdata,?int?mask)?{ ????int?cport,?cfd; ????char?cip[128]; ????REDIS_NOTUSED(el); ????REDIS_NOTUSED(mask); ????REDIS_NOTUSED(privdata); ????cfd?=?anetTcpAccept(server.neterr,?fd,?cip,?&cport); ????if?(cfd?==?AE_ERR)?{ ????????redisLog(REDIS_WARNING,"Accepting?client?connection:?%s",?server.neterr); ????????return; ????} ????redisLog(REDIS_VERBOSE,"Accepted?%s:%d",?cip,?cport); ????acceptCommonHandler(cfd); } |
- anetTcpAccept 函數 accept新連接,返回的cfd是新連接的socket fd。
- acceptCommonHandler 函數是對新建立的連接進行處理,這個函數在使用 unix socket 時也會被用到。
接收客戶端的新連接
接下來看看anetTcpAccept函數的具體內容
| Anet.c:330 int?anetTcpAccept(char?*err,?int?s,?char?*ip,?int?*port)?{ ????int?fd; ????struct?sockaddr_in?sa; ????socklen_t?salen?=?sizeof(sa); ????if?((fd?=?anetGenericAccept(err,s,(struct?sockaddr*)&sa,&salen))?==?ANET_ERR) ????????return?ANET_ERR; ????if?(ip)?strcpy(ip,inet_ntoa(sa.sin_addr)); ????if?(port)?*port?=?ntohs(sa.sin_port); ????return?fd; } |
再進去anetGenericAccept 看看
Anet.c:313
| static?int?anetGenericAccept(char?*err,?int?s,?struct?sockaddr?*sa,?socklen_t?*len)?{ ????int?fd; ????while(1)?{ ????????fd?=?accept(s,sa,len); ????????if?(fd?==?-1)?{ ????????????if?(errno?==?EINTR) ????????????????continue; ????????????else?{ ????????????????anetSetError(err,?"accept:?%s",?strerror(errno)); ????????????????return?ANET_ERR; ????????????} ????????} ????????break; ????} ????return?fd; } |
- anetTcpAccept 函數中調用anetGenericAccept 函數進行接收新連接,anetGenericAccept函數在 unix socket 的新連接處理中也會用到。
- anetTcpAccept 函數接收新連接后,獲取客戶端得ip,port 并返回。
創建redisClient進行接收處理
anetTcpAccept 運行完后,返回新連接的socket fd, 然后返回到調用函數acceptTcpHandler中,繼續執行acceptCommonHandler 函數
Networking.c:403
| static?void?acceptCommonHandler(int?fd)?{ ????redisClient?*c; ????if?((c?=?createClient(fd))?==?NULL)?{ ????????redisLog(REDIS_WARNING,"Error?allocating?resoures?for?the?client"); ????????close(fd);?/*?May?be?already?closed,?just?ingore?errors?*/ ????????return; ????} ????/*?If?maxclient?directive?is?set?and?this?is?one?client?more...?close?the ?????*?connection.?Note?that?we?create?the?client?instead?to?check?before ?????*?for?this?condition,?since?now?the?socket?is?already?set?in?nonblocking ?????*?mode?and?we?can?send?an?error?for?free?using?the?Kernel?I/O?*/ ????if?(server.maxclients?&&?listLength(server.clients)?>?server.maxclients)?{ ????????char?*err?=?"-ERR?max?number?of?clients?reached\r\n"; ????????/*?That's?a?best?effort?error?message,?don't?check?write?errors?*/ ????????if?(write(c->fd,err,strlen(err))?==?-1)?{ ????????????/*?Nothing?to?do,?Just?to?avoid?the?warning...?*/ ????????} ????????freeClient(c); ????????return; ????} ????server.stat_numconnections++; } |
- 創建一個 redisClient 來處理新連接,每個連接都會創建一個 redisClient 來處理。
- 如果配置了最大并發客戶端,則對現有的連接數進行檢查和處理。
- 最后統計連接數。
綁定有數據可讀時的回調函數
Networking.c:15
| redisClient?*createClient(int?fd)?{ ????redisClient?*c?=?zmalloc(sizeof(redisClient)); ????c->bufpos?=?0; ????anetNonBlock(NULL,fd); ????anetTcpNoDelay(NULL,fd); ????if?(aeCreateFileEvent(server.el,fd,AE_READABLE, ????????readQueryFromClient,?c)?==?AE_ERR) ????{ ????????close(fd); ????????zfree(c); ????????return?NULL; ????} ????selectDb(c,0); ????c->fd?=?fd; ????c->querybuf?=?sdsempty(); c->reqtype?=?0; ... } |
- 創建新連接的socket fd對應的文件讀事件,綁定回調函數readQueryFromClient。
- 如果創建成功,則對 redisClient 進行一系列的初始化,因為 redisClient 是通用的,即不管是什么命令的請求,都是通過創建一個 redisClient 來處理的,所以會有比較多的字段需要初始化。
createClient 函數執行完后返回到調用處acceptCommonHandler函數,然后從acceptCommonHandler函數再返回到acceptTcpHandler函數。
接收請求完畢,準備接收客戶端得數據
到此為止,新連接到來時的回調函數acceptTcpHandler執行完畢,在這個回調函數中創建了一個redisClient來處理這個客戶端接下 來的請求,并綁定了接收的新連接的讀文件事件。當有數據可讀時,網絡i/o輪詢(比如epoll)會有事件觸發,此時綁定的回調函數 readQueryFromClient將會調用來處理客戶端發送過來的數據。
讀取客戶端請求的數據
在綁定有數據可讀時的回調函數中的createClient函數中綁定了一個有數據可讀時的回調函數readQueryFromClient函數,現在看看這個函數的具體內容
Networking.c:874
| void?readQueryFromClient(aeEventLoop?*el,?int?fd,?void?*privdata,?int?mask)?{ ????redisClient?*c?=?(redisClient*)?privdata; ????char?buf[REDIS_IOBUF_LEN]; ????int?nread; ????REDIS_NOTUSED(el); ????REDIS_NOTUSED(mask); ????server.current_client?=?c; ????nread?=?read(fd,?buf,?REDIS_IOBUF_LEN); ????if?(nread?==?-1)?{ ????????if?(errno?==?EAGAIN)?{ ????????????nread?=?0; ????????}?else?{ ????????????redisLog(REDIS_VERBOSE,?"Reading?from?client:?%s",strerror(errno)); ????????????freeClient(c); ????????????return; ????????} ????}?else?if?(nread?==?0)?{ ????????redisLog(REDIS_VERBOSE,?"Client?closed?connection"); ????????freeClient(c); ????????return; ????} ????if?(nread)?{ ????????c->querybuf?=?sdscatlen(c->querybuf,buf,nread); ????????c->lastinteraction?=?time(NULL); ????}?else?{ ????????server.current_client?=?NULL; ????????return; ????} ????if?(sdslen(c->querybuf)?>?server.client_max_querybuf_len)?{ ????????sds?ci?=?getClientInfoString(c),?bytes?=?sdsempty(); ????????bytes?=?sdscatrepr(bytes,c->querybuf,64); ????????redisLog(REDIS_WARNING,"Closing?client?that?reached?max?query?buffer?length:?%s?(qbuf?initial?bytes:?%s)",?ci,?bytes); ????????sdsfree(ci); ????????sdsfree(bytes); ????????freeClient(c); ????????return; ????} ????processInputBuffer(c); ????server.current_client?=?NULL; } |
- 調用系統函數read來讀取客戶端傳送過來的數據,調用read后對讀取過程中被系統中斷的情況(nread == -1 && errno == EAGAIN),客戶端關閉的情況(nread == 0)進行了判斷處理。
- 如果讀取的數據超過限制(1GB)則報錯。
- 讀取完后進入processInputBuffer進行協議解析。
請求協議
從readQueryFromClient函數讀取客戶端傳過來的數據,進入processInputBuffer函數進行協議解析,可以把processInputBuffer函數看作是輸入數據的協議解析器
Networking.c:835
| void?processInputBuffer(redisClient?*c)?{ ????/*?Keep?processing?while?there?is?something?in?the?input?buffer?*/ ????while(sdslen(c->querybuf))?{ ????????/*?Immediately?abort?if?the?client?is?in?the?middle?of?something.?*/ ????????if?(c->flags?&?REDIS_BLOCKED?||?c->flags?&?REDIS_IO_WAIT)?return; ????????/*?REDIS_CLOSE_AFTER_REPLY?closes?the?connection?once?the?reply?is ?????????*?written?to?the?client.?Make?sure?to?not?let?the?reply?grow?after ?????????*?this?flag?has?been?set?(i.e.?don't?process?more?commands).?*/ ????????if?(c->flags?&?REDIS_CLOSE_AFTER_REPLY)?return; ????????/*?Determine?request?type?when?unknown.?*/ ????????if?(!c->reqtype)?{ ????????????if?(c->querybuf[0]?==?'*')?{ ????????????????c->reqtype?=?REDIS_REQ_MULTIBULK; ????????????}?else?{ ????????????????c->reqtype?=?REDIS_REQ_INLINE; ????????????} ????????} ????????if?(c->reqtype?==?REDIS_REQ_INLINE)?{ ????????????if?(processInlineBuffer(c)?!=?REDIS_OK)?break; ????????}?else?if?(c->reqtype?==?REDIS_REQ_MULTIBULK)?{ ????????????if?(processMultibulkBuffer(c)?!=?REDIS_OK)?break; ????????}?else?{ ????????????redisPanic("Unknown?request?type"); ????????} ????????/*?Multibulk?processing?could?see?a?<=?0?length.?*/ ????????if?(c->argc?==?0)?{ ????????????resetClient(c); ????????}?else?{ ????????????/*?Only?reset?the?client?when?the?command?was?executed.?*/ ????????????if?(processCommand(c)?==?REDIS_OK) ????????????????resetClient(c); ????????} ????} } |
- Redis支持兩種協議,一種是inline,一種是multibulk。inline協議是老協議,現在一般只在命令行下的redis客戶端使用,其他情況一般是使用multibulk協議。
- 如 果客戶端傳送的數據的第一個字符時‘*’,那么傳送數據將被當做multibulk協議處理,否則將被當做inline協議處理。Inline協議的具體 解析函數是processInlineBuffer,multibulk協議的具體解析函數是processMultibulkBuffer。
- 當協議解析完畢,即客戶端傳送的數據已經解析出命令字段和參數字段,接下來進行命令處理,命令處理函數是processCommand。
Inline請求協議
Networking.c:679
| int?processInlineBuffer(redisClient?*c)?{ ????... } |
- 根據空格分割客戶端傳送過來的數據,把傳送過來的命令和參數保存在argv數組中,把參數個數保存在argc中,argc的值包括了命令參數本身。即set key value命令,argc的值為3。詳細解析見協議詳解
Multibulk請求協議
Multibulk協議比inline協議復雜,它是二進制安全的,即傳送數據可以包含不安全字符。Inline協議不是二進制安全的,比如,如果 set key value命令中的key或value包含空白字符,那么inline協議解析時將會失敗,因為解析出來的參數個數與命令需要的的參數個數會不一致。
協議格式
| *<number?of?arguments>?CR?LF $<number?of?bytes?of?argument?1>?CR?LF <argument?data>?CR?LF ... $<number?of?bytes?of?argument?N>?CR?LF <argument?data>?CR?LF |
協議舉例
| *3 $3 SET $5 mykey $7 myvalue |
具體解析代碼位于
Networking.c:731
| int?processMultibulkBuffer(redisClient?*c)?{ ... } |
詳細解析見協議詳解
處理命令
當協議解析完畢,則表示客戶端的命令輸入已經全部讀取并已經解析成功,接下來就是執行客戶端命令前的準備和執行客戶端傳送過來的命令
Redis.c:1062
| /*?If?this?function?gets?called?we?already?read?a?whole ?*?command,?argments?are?in?the?client?argv/argc?fields. ?*?processCommand()?execute?the?command?or?prepare?the ?*?server?for?a?bulk?read?from?the?client. ?* ?*?If?1?is?returned?the?client?is?still?alive?and?valid?and ?*?and?other?operations?can?be?performed?by?the?caller.?Otherwise ?*?if?0?is?returned?the?client?was?destroied?(i.e.?after?QUIT).?*/ int?processCommand(redisClient?*c)?{ ... ?/*?Now?lookup?the?command?and?check?ASAP?about?trivial?error?conditions ??*?such?as?wrong?arity,?bad?command?name?and?so?forth.?*/ c->cmd?=?c->lastcmd?=?lookupCommand(c->argv[0]->ptr); ... call(c); ... } |
- lookupCommand先根據客戶端傳送過來的數據查找該命令并找到命令的對應處理函數。
- Call函數調用該命令函數來處理命令,命令與對應處理函數的綁定位于。
Redi.c:72
| struct?redisCommand?*commandTable; struct?redisCommand?readonlyCommandTable[]?=?{ {"get",getCommand,2,0,NULL,1,1,1}, ... } |
回復請求
回復請求位于對應的命令中,以get命令為例
T_string.c:67
| void?getCommand(redisClient?*c)?{ ????getGenericCommand(c); } |
T_string.c:52
| int?getGenericCommand(redisClient?*c)?{ ????robj?*o; ????if?((o?=?lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk))?==?NULL) ????????return?REDIS_OK; ????if?(o->type?!=?REDIS_STRING)?{ ????????addReply(c,shared.wrongtypeerr); ????????return?REDIS_ERR; ????}?else?{ ????????addReplyBulk(c,o); ????????return?REDIS_OK; ????} } |
- getGenericCommand在getset 命令中也會用到。
- lookupKeyReadOrReply是以讀數據為目的查詢key函數,并且如果該key不存在,則在該函數中做不存在的回包處理。
- 如果該key存在,則返回該key對應的數據,addReply函數以及以addReply函數開頭的都是回包函數。
綁定寫數據的回調函數
接下來看看addReply函數里的內容
Networking.c:190
| void?addReply(redisClient?*c,?robj?*obj)?{ ????if?(_installWriteEvent(c)?!=?REDIS_OK)?return; ????... } |
Networking.c:64
| int?_installWriteEvent(redisClient?*c)?{ ????if?(c->fd?<=?0)?return?REDIS_ERR; ????if?(c->bufpos?==?0?&&?listLength(c->reply)?==?0?&& ????????(c->replstate?==?REDIS_REPL_NONE?|| ?????????c->replstate?==?REDIS_REPL_ONLINE)?&& ????????aeCreateFileEvent(server.el,?c->fd,?AE_WRITABLE, ????????sendReplyToClient,?c)?==?AE_ERR)?return?REDIS_ERR; ????return?REDIS_OK; } |
- addReply函數一進來就先調用綁定寫數據的回調函數installWriteEvent。
- installWriteEvent函數中創建了一個文件寫事件和綁定寫事件的回調函數為sendReplyToClient。
準備寫的數據內容
??? addReply函數一進來后就綁定寫數據的回調函數,接下來就是準備寫的數據內容
Networking.c:190
| void?addReply(redisClient?*c,?robj?*obj)?{ ????if?(_installWriteEvent(c)?!=?REDIS_OK)?return; ????redisAssert(!server.vm_enabled?||?obj->storage?==?REDIS_VM_MEMORY); ????/*?This?is?an?important?place?where?we?can?avoid?copy-on-write ?????*?when?there?is?a?saving?child?running,?avoiding?touching?the ?????*?refcount?field?of?the?object?if?it's?not?needed. ?????* ?????*?If?the?encoding?is?RAW?and?there?is?room?in?the?static?buffer ?????*?we'll?be?able?to?send?the?object?to?the?client?without ?????*?messing?with?its?page.?*/ ????if?(obj->encoding?==?REDIS_ENCODING_RAW)?{ ????????if?(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))?!=?REDIS_OK) ????????????_addReplyObjectToList(c,obj); ????}?else?{ ????????/*?FIXME:?convert?the?long?into?string?and?use?_addReplyToBuffer() ?????????*?instead?of?calling?getDecodedObject.?As?this?place?in?the ?????????*?code?is?too?performance?critical.?*/ ????????obj?=?getDecodedObject(obj); ????????if?(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))?!=?REDIS_OK) ????????????_addReplyObjectToList(c,obj); ????????decrRefCount(obj); ????} } |
- 先嘗試把要返回的內容添加到發送數據緩沖區中(redisClient->buf),如果該緩沖區的大小已經放不下這次想放進去的數據,或者已經有數據在排隊(redisClient->reply 鏈表不為空),則把數據添加到發送鏈表的尾部。
給客戶端答復數據
在綁定寫數據的回調函數中看到綁定了回調函數sendReplyToClient,現在來看看這個函數的主要內容
Networking.c:566
| void?sendReplyToClient(aeEventLoop?*el,?int?fd,?...)?{ ????... while(c->bufpos?>?0?||?listLength(c->reply))?{ ????... ????if(c->bufpos?>?0){ ????????... ????????????nwritten=write(fd,...,c->bufpos-c->sentlen); ????????????... ????????}?else?{ ????????????o?=?listNodeValue(listFirst(c->reply)); ????????????... ????????????nwritten=write(fd,...,objlen-c->sentlen); ????????????... ????????} ????} } |
- 通過調用系統函數write給客戶端發送數據,如果緩沖區有數據就把緩沖區的數據發送給客戶端,緩沖區的數據發送完了,如果有排隊數據,則繼續發送。
退出
Redis 服務器的退出是通過shutdown命令來退出的,退出前會做一系列的清理工作
Db.c:347
| void?shutdownCommand(redisClient?*c)?{ ????if?(prepareForShutdown()?==?REDIS_OK) ????????exit(0); ????addReplyError(c,"Errors?trying?to?SHUTDOWN.?Check?logs."); } |
總結
框架從啟動,接收請求,讀取客戶端數據,請求協議解析,處理命令,回復請求,退出對redis運行的整個流程做了一個梳理。對整個redis的運作和框架有了一個初步的了解。
轉載于:https://www.cnblogs.com/davidwang456/p/3446349.html
總結
以上是生活随笔為你收集整理的Redis运行流程源码解析--转载的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kill -3 获取threaddump
- 下一篇: windows端口查看及进程查找