Redis Source Code Analysis 14: Interaction Between the Redis Server and Its Clients
The Redis server is a typical one-to-many server program. Using a file event handler built on I/O multiplexing, it processes command requests in a single thread within a single process while communicating with many clients over the network.

Redis clients and the server communicate over TCP. TCP is a stream protocol: data travels as a byte stream with no inherent notion of a "message" or a "message boundary", so any boundaries must be defined by the application layer.
For this reason, all data exchanged between a Redis client and the server is encoded in Redis's own unified request protocol. With this protocol, every command has a well-defined boundary.
For example, suppose the client wants to send the following command request to the server:

    SET msg "hello world"

The data the client actually sends is:

    *3\r\n$3\r\nSET\r\n$3\r\nmsg\r\n$11\r\nhello world\r\n

When the server receives this data, it parses "*3" to learn that the command has 3 arguments. The first argument has length 3 and value "SET", which is the command to execute; the second has length 3 and value "msg"; the third has length 11 and value "hello world".
That yields one complete command. After parsing and processing it, the server moves on to parse the next command.
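
The encoding described above can be sketched in a few lines of C. This is only an illustration of the wire format, not code from Redis: encode_request is a hypothetical helper, and it uses strlen, so unlike the real protocol it cannot carry arguments that contain NUL bytes.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argc arguments as one request in the unified request protocol:
 *   *<argc>\r\n   followed, per argument, by   $<len>\r\n<bytes>\r\n
 * Returns the number of bytes written to out (not counting the trailing
 * NUL), or -1 if the buffer of size cap is too small.
 * encode_request is a hypothetical helper, not a Redis function. */
int encode_request(int argc, const char **argv, char *out, size_t cap) {
    size_t used = 0;
    int n;

    assert(argv != NULL && out != NULL);
    n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    used += (size_t)n;

    for (int i = 0; i < argc; i++) {
        size_t len = strlen(argv[i]);

        /* Length line: $<len>\r\n */
        n = snprintf(out + used, cap - used, "$%zu\r\n", len);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;

        /* Need room for the payload, its CRLF, and the final NUL. */
        if (len + 2 >= cap - used) return -1;
        memcpy(out + used, argv[i], len);
        used += len;
        memcpy(out + used, "\r\n", 2);
        used += 2;
    }
    out[used] = '\0';
    return (int)used;
}
```

Calling it with the three arguments SET, msg and hello world produces exactly the byte sequence shown above.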

I: The client structure redisClient
For every client connected to the server, the server creates a corresponding redisClient structure. It is defined in redis.h as follows (abridged):

    typedef struct redisClient {
        uint64_t id;            /* Client incremental unique ID. */
        int fd;
        redisDb *db;
        int dictid;
        robj *name;             /* As set by CLIENT SETNAME */
        sds querybuf;
        size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size */
        int argc;
        robj **argv;
        struct redisCommand *cmd, *lastcmd;
        int reqtype;
        int multibulklen;       /* number of multi bulk arguments left to read */
        long bulklen;           /* length of bulk argument in multi bulk request */
        list *reply;
        unsigned long reply_bytes; /* Tot bytes of objects in reply list */
        ...
        int flags;              /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
        int authenticated;      /* when requirepass is non-NULL */
        ...

        /* Response buffer */
        int bufpos;
        char buf[REDIS_REPLY_CHUNK_BYTES];
    } redisClient;

This structure holds the client's current state along with the data structures needed to carry out its functions, for example: the client's socket descriptor (fd), a pointer to the database the client is using (db), the client's name (name), the client's flags (flags), the client's input buffer (querybuf), the arguments of the command the client is about to execute (argv) and their count (argc), and the client's output buffers (buf and reply).
The meaning of these fields is covered in the sections below.

II: Initialization (creating the listening sockets and registering the connection event)
In the server's initialization function initServer, aeCreateEventLoop is called to create the only event loop structure (aeEventLoop) in the Redis server: server.el. server.el is global, and every event in the Redis server is registered on it.

By default, the Redis server listens for connections on all local network interfaces (0.0.0.0). The "bind" option in the configuration file sets the addresses to listen on; it is followed by one or more space-separated IP addresses, for example:

    bind 192.168.1.100 10.0.0.1

Redis stores these addresses in server.bindaddr, and their count in server.bindaddr_count.

In initServer, listenToPort is called; for each of these addresses it calls socket, bind and listen to create a listening socket descriptor.

    /* Open the TCP listening socket for the user commands. */
    if (server.port != 0 &&
        listenToPort(server.port, server.ipfd, &server.ipfd_count) == REDIS_ERR)
        exit(1);

The listening descriptors are stored in the descriptor array server.ipfd, and the number of descriptors created is server.ipfd_count. The server.ipfd array has a fixed size of REDIS_BINDADDR_MAX (16), so at most 16 listening addresses are supported.

Then, for each listening descriptor, aeCreateFileEvent is called to register its readable event, with acceptTcpHandler as the callback:

    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
        {
            redisPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
    }

When the Redis server receives a TCP connection from a client, it calls acceptTcpHandler to handle it. The code of acceptTcpHandler is as follows:

    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
        char cip[REDIS_IP_STR_LEN];
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);
        REDIS_NOTUSED(privdata);

        while(max--) {
            cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
            if (cfd == ANET_ERR) {
                if (errno != EWOULDBLOCK)
                    redisLog(REDIS_WARNING,
                        "Accepting client connection: %s", server.neterr);
                return;
            }
            redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
            acceptCommonHandler(cfd,0);
        }
    }

This function handles at most MAX_ACCEPTS_PER_CALL (1000) connections per call; any remaining connections wait until the next call of acceptTcpHandler. The limit keeps the function's running time bounded so that it does not delay the handling of other events.
For each connection, anetTcpAccept is called to accept it, and the client's address is recorded in cip and cport.
The connected socket descriptor is cfd, which is passed to acceptCommonHandler. That function calls createClient, which creates a redisClient structure and registers the readable event on the socket descriptor with readQueryFromClient as the callback. Finally the redisClient structure is stored in the global client list server.clients.

    if (aeCreateFileEvent(server.el,fd,AE_READABLE,
        readQueryFromClient, c) == AE_ERR)
    {
        close(fd);
        zfree(c);
        return NULL;
    }
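
The idea behind MAX_ACCEPTS_PER_CALL, doing a bounded amount of work per callback and leaving the rest for the next event, can be shown without any sockets. The sketch below is an assumption-laden model, not Redis code: drain_bounded, struct pending, take_pending and count_item are hypothetical names, and take_pending returning 0 plays the role of accept failing with EWOULDBLOCK.

```c
#include <assert.h>
#include <stddef.h>

/* try_take returns 1 and stores an item if one is pending, or 0 if nothing
 * is pending right now (the analogue of accept() failing with EWOULDBLOCK). */
typedef int (*try_take_fn)(void *source, int *item);
typedef void (*handle_fn)(int item, void *ctx);

/* Handle at most max pending items, then return so that the event loop can
 * serve other events. Returns the number of items handled. */
int drain_bounded(void *source, try_take_fn try_take,
                  handle_fn handle, void *ctx, int max) {
    int handled = 0;

    assert(try_take != NULL && handle != NULL);
    while (max-- > 0) {
        int item;
        if (!try_take(source, &item)) break;   /* nothing pending: stop early */
        handle(item, ctx);
        handled++;
    }
    return handled;
}

/* A toy source: just a count of connections waiting to be accepted. */
struct pending { int count; };

int take_pending(void *source, int *item) {
    struct pending *p = source;
    if (p->count == 0) return 0;
    *item = p->count--;
    return 1;
}

/* A toy handler that only counts how many items it has seen. */
void count_item(int item, void *ctx) {
    (void)item;
    (*(int *)ctx)++;
}
```

With 2500 pending items and a limit of 1000, three successive calls handle 1000, 1000 and 500 items, which mirrors how a backlog of connections is spread over several invocations of the accept callback.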

III: Receiving, parsing and processing client requests
1: Receiving data
When request data from a client arrives at the Redis server, the readable event on the socket descriptor fires and its callback readQueryFromClient is called.
readQueryFromClient calls read to read the client's request and stores it in the input buffer querybuf of the redisClient structure; the input buffer grows dynamically according to the amount of data received. The received request data is then parsed and the corresponding command handler is executed.
The code of readQueryFromClient is as follows:

    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        redisClient *c = (redisClient*) privdata;
        int nread, readlen;
        size_t qblen;
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);

        server.current_client = c;
        readlen = REDIS_IOBUF_LEN;
        /* If this is a multi bulk request, and we are processing a bulk reply
         * that is large enough, try to maximize the probability that the query
         * buffer contains exactly the SDS string representing the object, even
         * at the risk of requiring more read(2) calls. This way the function
         * processMultiBulkBuffer() can avoid copying buffers to create the
         * Redis Object representing the argument. */
        if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
            && c->bulklen >= REDIS_MBULK_BIG_ARG)
        {
            int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);

            if (remaining < readlen) readlen = remaining;
        }

        qblen = sdslen(c->querybuf);
        if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
        nread = read(fd, c->querybuf+qblen, readlen);
        if (nread == -1) {
            if (errno == EAGAIN) {
                nread = 0;
            } else {
                redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
                freeClient(c);
                return;
            }
        } else if (nread == 0) {
            redisLog(REDIS_VERBOSE, "Client closed connection");
            freeClient(c);
            return;
        }
        if (nread) {
            sdsIncrLen(c->querybuf,nread);
            c->lastinteraction = server.unixtime;
            if (c->flags & REDIS_MASTER) c->reploff += nread;
            server.stat_net_input_bytes += nread;
        } else {
            server.current_client = NULL;
            return;
        }
        if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
            sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

            bytes = sdscatrepr(bytes,c->querybuf,64);
            redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
            sdsfree(ci);
            sdsfree(bytes);
            freeClient(c);
            return;
        }
        processInputBuffer(c);
        server.current_client = NULL;
    }

The function first sets readlen, the maximum number of bytes read per read call, to REDIS_IOBUF_LEN (16k). It then obtains qblen, the current length of the input buffer c->querybuf, that is, the length of the request data already received from the client. qblen is used to update c->querybuf_peak, which records the peak length of c->querybuf.

Next, c->querybuf is enlarged so that it can hold readlen more bytes, and read is called to read at most readlen bytes, which are appended to the end of c->querybuf.
If read returns -1 and errno is EAGAIN, no data is available for now and nread is set to 0; otherwise the error is logged, freeClient releases the redisClient structure and closes the connection, and the function returns.
If read returns 0, the client has closed the connection: this is logged, the redisClient structure is released and the connection closed, and the function returns.
If read returns a positive value, data was read. The function checks whether the current length of c->querybuf exceeds the threshold server.client_max_querybuf_len (1GB). If it does, the function logs the client's information together with the first 64 bytes of c->querybuf, releases the redisClient structure, closes the connection, and returns.

Finally, processInputBuffer is called to parse the received data and, once a complete command request has been read, to execute the corresponding command handler.
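
The read path above can be condensed into a small standalone sketch. It is written under simplifying assumptions: struct inbuf and inbuf_read are hypothetical stand-ins for the querybuf fields and for readQueryFromClient, plain realloc replaces sdsMakeRoomFor, and the large-argument tuning of readlen is left out.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>

#define IOBUF_LEN (16 * 1024)   /* same value as REDIS_IOBUF_LEN */

/* Hypothetical stand-in for the querybuf-related fields of redisClient. */
struct inbuf {
    char  *data;   /* received bytes */
    size_t len;    /* bytes in use */
    size_t cap;    /* bytes allocated */
    size_t peak;   /* largest len observed before a read */
};

enum read_status { READ_OK, READ_AGAIN, READ_CLOSED, READ_ERROR, READ_TOO_BIG };

/* Read once from fd into b, growing b as needed. max_len caps the buffer
 * length in the way server.client_max_querybuf_len does. */
enum read_status inbuf_read(struct inbuf *b, int fd, size_t max_len) {
    ssize_t nread;

    assert(b != NULL);
    if (b->peak < b->len) b->peak = b->len;

    /* Make room for IOBUF_LEN more bytes. */
    if (b->cap - b->len < IOBUF_LEN) {
        size_t newcap = b->len + IOBUF_LEN;
        char *p = realloc(b->data, newcap);
        if (p == NULL) return READ_ERROR;
        b->data = p;
        b->cap = newcap;
    }

    nread = read(fd, b->data + b->len, IOBUF_LEN);
    if (nread == -1) return (errno == EAGAIN) ? READ_AGAIN : READ_ERROR;
    if (nread == 0) return READ_CLOSED;       /* peer closed the connection */

    b->len += (size_t)nread;
    return (b->len > max_len) ? READ_TOO_BIG : READ_OK;
}
```

A caller would free the client on READ_CLOSED, READ_ERROR and READ_TOO_BIG, do nothing on READ_AGAIN, and hand the buffer to the parser on READ_OK.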

2: Parsing and processing client commands
After receiving request data from a client, the Redis server calls processInputBuffer to parse the data in the input buffer redisClient->querybuf. Once a complete command request has been obtained, it calls processCommand to execute the corresponding command.
The code of processInputBuffer is as follows:

    void processInputBuffer(redisClient *c) {
        /* Keep processing while there is something in the input buffer */
        while(sdslen(c->querybuf)) {
            /* Return if clients are paused. */
            if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return;

            /* Immediately abort if the client is in the middle of something. */
            if (c->flags & REDIS_BLOCKED) return;

            /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
             * written to the client. Make sure to not let the reply grow after
             * this flag has been set (i.e. don't process more commands). */
            if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

            /* Determine request type when unknown. */
            if (!c->reqtype) {
                if (c->querybuf[0] == '*') {
                    c->reqtype = REDIS_REQ_MULTIBULK;
                } else {
                    c->reqtype = REDIS_REQ_INLINE;
                }
            }

            if (c->reqtype == REDIS_REQ_INLINE) {
                if (processInlineBuffer(c) != REDIS_OK) break;
            } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
                if (processMultibulkBuffer(c) != REDIS_OK) break;
            } else {
                redisPanic("Unknown request type");
            }

            /* Multibulk processing could see a <= 0 length. */
            if (c->argc == 0) {
                resetClient(c);
            } else {
                /* Only reset the client when the command was executed. */
                if (processCommand(c) == REDIS_OK)
                    resetClient(c);
            }
        }
    }

In this function, the loop keeps running as long as c->querybuf is not empty. Inside the loop:
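
First, the client's current flags c->flags determine whether parsing should continue, for example:
If the current client is not a SLAVE node and clients are currently paused (clientsArePaused() returns true), the function returns immediately;
If c->flags contains REDIS_BLOCKED, the function returns immediately;
If c->flags contains REDIS_CLOSE_AFTER_REPLY, the function returns immediately. This flag indicates that an error occurred and the server no longer needs to process the client's requests; the connection is closed once the error reply has been sent to the client.

Next, if c->reqtype is 0, a new request is about to be processed (either this is the first time data in c->querybuf is handled, or a complete command request has just been processed). If the first byte of c->querybuf is '*', the request spans multiple lines (it contains several "\r\n"), and c->reqtype is set to REDIS_REQ_MULTIBULK; otherwise the request is a single-line request and c->reqtype is set to REDIS_REQ_INLINE.
If c->reqtype is REDIS_REQ_INLINE, processInlineBuffer is called to parse the single-line request; if it is REDIS_REQ_MULTIBULK, processMultibulkBuffer is called to parse the multi-line request. If either function returns something other than REDIS_OK, a complete request is not yet available (or the request was malformed), so the loop is exited and the remaining data is read after the function returns.
If these functions return REDIS_OK, a complete request has been received and parsed: the command's arguments are stored in the array c->argv, and c->argc holds their count.
If c->argc is 0, there is nothing to execute, and resetClient is called to reset the client state, that is, to release the elements of c->argv, set c->argc, c->reqtype and c->multibulklen to 0, set c->bulklen to -1, and so on. Processing then continues with whatever remains in c->querybuf.
If c->argc is non-zero, processCommand is called to process the command by invoking the corresponding command handler. On success, resetClient is called to reset the client state, and processing continues with whatever remains in c->querybuf.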
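
The request-type decision is small enough to isolate. The sketch below uses hypothetical names (enum req_type, detect_req_type) rather than the REDIS_REQ_* constants, and only restates the rule described above: a leading '*' selects the multi-line parser, anything else the single-line parser.

```c
#include <assert.h>
#include <stddef.h>

/* REQ_NONE is 0 so that a freshly reset client has "no type yet". */
enum req_type { REQ_NONE = 0, REQ_INLINE, REQ_MULTIBULK };

/* Decide how the request at the start of buf should be parsed by looking
 * only at its first byte. */
enum req_type detect_req_type(const char *buf, size_t len) {
    assert(buf != NULL || len == 0);
    if (len == 0) return REQ_NONE;          /* nothing to inspect yet */
    return (buf[0] == '*') ? REQ_MULTIBULK : REQ_INLINE;
}
```

Because the type is reset to "none" after each complete command, a client may mix both request styles on one connection.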

processInlineBuffer and processMultibulkBuffer parse the client's single-line and multi-line requests respectively. When either returns REDIS_OK, a complete request has been received and parsed: the command's arguments are stored in the array c->argv, and c->argc holds their count.
When they return REDIS_ERR, one of two things has happened. Either the command request received so far is incomplete, which is not really an error: after the function returns, the server simply keeps receiving data from the client. Or the request does not conform to the unified request protocol, in which case setProtocolError is called to discard the corresponding content of c->querybuf and to add REDIS_CLOSE_AFTER_REPLY to c->flags, so that the connection is closed once the error reply has been sent to the client.
processMultibulkBuffer is somewhat more complex than processInlineBuffer, so let us look at its implementation directly:

    int processMultibulkBuffer(redisClient *c) {
        char *newline = NULL;
        int pos = 0, ok;
        long long ll;

        if (c->multibulklen == 0) {
            /* The client should have been reset */
            redisAssertWithInfo(c,NULL,c->argc == 0);

            /* Multi bulk length cannot be read without a \r\n */
            newline = strchr(c->querybuf,'\r');
            if (newline == NULL) {
                if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
                    addReplyError(c,"Protocol error: too big mbulk count string");
                    setProtocolError(c,0);
                }
                return REDIS_ERR;
            }

            /* Buffer should also contain \n */
            if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
                return REDIS_ERR;

            /* We know for sure there is a whole line since newline != NULL,
             * so go ahead and find out the multi bulk length. */
            redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
            ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
            if (!ok || ll > 1024*1024) {
                addReplyError(c,"Protocol error: invalid multibulk length");
                setProtocolError(c,pos);
                return REDIS_ERR;
            }

            pos = (newline-c->querybuf)+2;
            if (ll <= 0) {
                sdsrange(c->querybuf,pos,-1);
                return REDIS_OK;
            }

            c->multibulklen = ll;

            /* Setup argv array on client structure */
            if (c->argv) zfree(c->argv);
            c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
        }

        redisAssertWithInfo(c,NULL,c->multibulklen > 0);
        while(c->multibulklen) {
            /* Read bulk length if unknown */
            if (c->bulklen == -1) {
                newline = strchr(c->querybuf+pos,'\r');
                if (newline == NULL) {
                    if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
                        addReplyError(c,
                            "Protocol error: too big bulk count string");
                        setProtocolError(c,0);
                        return REDIS_ERR;
                    }
                    break;
                }

                /* Buffer should also contain \n */
                if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
                    break;

                if (c->querybuf[pos] != '$') {
                    addReplyErrorFormat(c,
                        "Protocol error: expected '$', got '%c'",
                        c->querybuf[pos]);
                    setProtocolError(c,pos);
                    return REDIS_ERR;
                }

                ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
                if (!ok || ll < 0 || ll > 512*1024*1024) {
                    addReplyError(c,"Protocol error: invalid bulk length");
                    setProtocolError(c,pos);
                    return REDIS_ERR;
                }

                pos += newline-(c->querybuf+pos)+2;
                if (ll >= REDIS_MBULK_BIG_ARG) {
                    size_t qblen;

                    /* If we are going to read a large object from network
                     * try to make it likely that it will start at c->querybuf
                     * boundary so that we can optimize object creation
                     * avoiding a large copy of data. */
                    sdsrange(c->querybuf,pos,-1);
                    pos = 0;
                    qblen = sdslen(c->querybuf);
                    /* Hint the sds library about the amount of bytes this string is
                     * going to contain. */
                    if (qblen < (size_t)ll+2)
                        c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen);
                }
                c->bulklen = ll;
            }

            /* Read bulk argument */
            if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
                /* Not enough data (+2 == trailing \r\n) */
                break;
            } else {
                /* Optimization: if the buffer contains JUST our bulk element
                 * instead of creating a new object by *copying* the sds we
                 * just use the current sds string. */
                if (pos == 0 &&
                    c->bulklen >= REDIS_MBULK_BIG_ARG &&
                    (signed) sdslen(c->querybuf) == c->bulklen+2)
                {
                    c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);
                    sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                    c->querybuf = sdsempty();
                    /* Assume that if we saw a fat argument we'll see another one
                     * likely... */
                    c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);
                    pos = 0;
                } else {
                    c->argv[c->argc++] =
                        createStringObject(c->querybuf+pos,c->bulklen);
                    pos += c->bulklen+2;
                }
                c->bulklen = -1;
                c->multibulklen--;
            }
        }

        /* Trim to pos */
        if (pos) sdsrange(c->querybuf,pos,-1);

        /* We're done when c->multibulk == 0 */
        if (c->multibulklen == 0) return REDIS_OK;

        /* Still not read to process the command */
        return REDIS_ERR;
    }

The multibulklen field of redisClient records how many arguments of the command request currently being parsed have not yet been processed. If c->multibulklen is 0, the parser is at the start of a command request, whose format is "*<n>\r\n".
In this case, the function first locates newline, the position of the first '\r' in c->querybuf. If no '\r' is found, the request received so far is incomplete and REDIS_ERR is returned. In addition, if c->querybuf is already longer than 64k, the error "Protocol error: too big mbulk count string" is sent to the client and setProtocolError adds REDIS_CLOSE_AFTER_REPLY to c->flags before REDIS_ERR is returned.
Then, if (newline-(c->querybuf)) is greater than ((signed)sdslen(c->querybuf)-2), the request is still incomplete (the '\n' is missing) and REDIS_ERR is returned.

The line is then parsed. Its correct format is "*<n>\r\n", where <n> is an integer giving the number of strings that follow. string2ll is called to parse the integer ll; if parsing fails or ll is greater than 1M (1024*1024), the error "Protocol error: invalid multibulk length" is sent to the client, setProtocolError adds REDIS_CLOSE_AFTER_REPLY to c->flags, and REDIS_ERR is returned.
pos is then set to the index in c->querybuf at which the next line begins.
If ll is less than or equal to 0, the line just parsed is removed from c->querybuf and REDIS_OK is returned. Otherwise ll is assigned to c->multibulklen, and space is allocated for the array c->argv with c->multibulklen elements.

With c->multibulklen known, each string of the command request is processed in turn:
The bulklen field of redisClient records the length of the string contained in the next line to be parsed. If c->bulklen is -1, the parser is at a string-length line, whose format is "$<n>\r\n".
Parsing in this case is similar to the case where c->multibulklen is 0 and is not repeated here. Afterwards, the length of the string in the following line is held in ll. ll may be at most 512M; otherwise the error "Protocol error: invalid bulk length" is sent to the client, setProtocolError adds REDIS_CLOSE_AFTER_REPLY to c->flags, and REDIS_ERR is returned.
pos is then set to the index in c->querybuf at which the next line begins.
If the string length ll is at least 32k (REDIS_MBULK_BIG_ARG), the string object will later be created directly from c->querybuf to avoid copying a large block of memory. To prepare for this, the content of c->querybuf before pos is removed, pos is set to 0, and c->querybuf is enlarged if necessary. Finally ll is assigned to c->bulklen.

Next, the string line in c->querybuf is parsed; its format is "xxxx\r\n".
If (sdslen(c->querybuf)-pos) is less than ((unsigned)(c->bulklen+2)), the string line has not been fully received; the loop is exited and REDIS_ERR is returned.
Otherwise, if all three of the following conditions hold:

    pos == 0;
    c->bulklen >= REDIS_MBULK_BIG_ARG;
    (signed) sdslen(c->querybuf) == c->bulklen+2;

then c->querybuf contains exactly one large string line of at least 32k, no more and no less. In this case, to avoid copying a large block of memory, the string object is created directly from c->querybuf and stored in c->argv. c->querybuf is then recreated and enlarged to c->bulklen+2 so that it can hold another large string should one follow (Assume that if we saw a fat argument we'll see another one likely...).
If the conditions are not met, a string object is created and the content at c->querybuf+pos is copied into it.

After a complete string line has been processed, c->bulklen is reset to -1 and c->multibulklen is decremented; the loop then moves on to the next string line.

After the loop exits, the content already parsed is removed first. If c->multibulklen is 0, a complete multi-line command request has been received and parsed, and REDIS_OK is returned to indicate that the command can now be processed; otherwise REDIS_ERR is returned and the server continues to receive data from the client.
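
To make the three outcomes (complete, incomplete, malformed) concrete, here is a deliberately simplified parser. It is a sketch under stated assumptions, not a port of processMultibulkBuffer: parse_multibulk, read_count, struct arg and MAX_ARGS are hypothetical, the parser is not resumable (on an incomplete request the caller retries from the start once more data arrives), arguments point into the input instead of being copied, counts that are zero-length or negative are rejected rather than treated as an empty command, and the large-argument optimization is omitted.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_ARGS 16   /* arbitrary limit chosen for this sketch */

enum parse_status { PARSE_OK, PARSE_INCOMPLETE, PARSE_ERROR };

struct arg { const char *ptr; size_t len; };   /* points into the input */

/* Parse "<prefix><n>\r\n" starting at buf[*pos]. On PARSE_OK, stores n and
 * advances *pos past the CRLF. */
static enum parse_status read_count(const char *buf, size_t len, size_t *pos,
                                    char prefix, long long *n) {
    size_t i = *pos;
    const char *cr, *p;
    long long v = 0;

    if (i >= len) return PARSE_INCOMPLETE;
    if (buf[i] != prefix) return PARSE_ERROR;
    cr = memchr(buf + i, '\r', len - i);
    /* Both '\r' and the following '\n' must already be in the buffer. */
    if (cr == NULL || (size_t)(cr - buf) + 1 >= len) return PARSE_INCOMPLETE;
    if (cr[1] != '\n') return PARSE_ERROR;

    p = buf + i + 1;
    if (p == cr) return PARSE_ERROR;               /* no digits at all */
    for (; p < cr; p++) {
        if (*p < '0' || *p > '9') return PARSE_ERROR;
        v = v * 10 + (*p - '0');
        if (v > 512LL * 1024 * 1024) return PARSE_ERROR;
    }
    *n = v;
    *pos = (size_t)(cr - buf) + 2;
    return PARSE_OK;
}

/* Parse one "*<n>\r\n($<len>\r\n<bytes>\r\n){n}" request from buf.
 * On PARSE_OK, fills argv/argc and sets *consumed to the request's size. */
enum parse_status parse_multibulk(const char *buf, size_t len,
                                  struct arg *argv, int *argc,
                                  size_t *consumed) {
    size_t pos = 0;
    long long count, blen;
    enum parse_status st;

    assert(buf != NULL && argv != NULL && argc != NULL && consumed != NULL);
    st = read_count(buf, len, &pos, '*', &count);
    if (st != PARSE_OK) return st;
    if (count > MAX_ARGS) return PARSE_ERROR;

    for (long long k = 0; k < count; k++) {
        st = read_count(buf, len, &pos, '$', &blen);
        if (st != PARSE_OK) return st;
        /* Payload plus trailing CRLF must be fully present. */
        if (len - pos < (size_t)blen + 2) return PARSE_INCOMPLETE;
        argv[k].ptr = buf + pos;
        argv[k].len = (size_t)blen;
        pos += (size_t)blen + 2;
    }
    *argc = (int)count;
    *consumed = pos;
    return PARSE_OK;
}
```

Because consumed reports where the request ends, a caller can parse pipelined requests by calling the function again on the remainder of the buffer, much as processInputBuffer loops while querybuf is non-empty.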

The implementation of processInlineBuffer is much simpler and is not covered here.

IV: Replying to the client
After executing the command handler, the server calls functions of the addReply family to write the reply into the client's output buffer. These include addReply, addReplySds, addReplyError, addReplyStatus and others.
Each of these functions first calls prepareClientToWrite to register the writable event on the socket descriptor, and then writes the reply into the client's output buffer.
redisClient has two kinds of output buffer: a fixed-size array (buf) and a dynamically sized list (reply). When a reply is appended, the function first tries to append it to the array buf; if there is not enough room there, it is appended to reply instead. For example, the code of addReplyString is:

    void addReplyString(redisClient *c, char *s, size_t len) {
        if (prepareClientToWrite(c) != REDIS_OK) return;
        if (_addReplyToBuffer(c,s,len) != REDIS_OK)
            _addReplyStringToList(c,s,len);
    }

_addReplyToBuffer is called to add the data to c->buf; if it returns REDIS_ERR, the addition failed, and _addReplyStringToList is called to add the data to c->reply. The other addReply functions work in a similar way and are not covered here.
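
The "array first, list as fallback" policy can be modelled as follows. All names here (struct outbuf, struct chunk, out_append and the two helpers) are hypothetical, and BUF_BYTES is tiny on purpose. The sketch also refuses the array once the list is in use so that replies stay in order; that ordering rule is an assumption of this example rather than something stated in the text above.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define BUF_BYTES 16   /* tiny on purpose; stands in for REDIS_REPLY_CHUNK_BYTES */

/* One node of the overflow list, holding a copy of the appended bytes. */
struct chunk { struct chunk *next; size_t len; char data[]; };

struct outbuf {
    char   buf[BUF_BYTES];     /* fixed-size array, tried first */
    size_t bufpos;             /* bytes used in buf */
    struct chunk *head, *tail; /* overflow list */
    size_t list_len;           /* number of nodes in the list */
};

/* Returns 0 on success, -1 if the data does not fit in the array or if the
 * list is already in use (assumed here, to keep output ordered). */
static int add_to_buffer(struct outbuf *o, const char *s, size_t len) {
    if (o->list_len > 0) return -1;
    if (len > sizeof(o->buf) - o->bufpos) return -1;
    memcpy(o->buf + o->bufpos, s, len);
    o->bufpos += len;
    return 0;
}

/* Append a copy of s as a new node at the tail of the list. */
static int add_to_list(struct outbuf *o, const char *s, size_t len) {
    struct chunk *c = malloc(sizeof(*c) + len);
    if (c == NULL) return -1;
    c->next = NULL;
    c->len = len;
    memcpy(c->data, s, len);
    if (o->tail) o->tail->next = c; else o->head = c;
    o->tail = c;
    o->list_len++;
    return 0;
}

/* Same shape as addReplyString: try the array, fall back to the list. */
int out_append(struct outbuf *o, const char *s, size_t len) {
    assert(o != NULL && s != NULL);
    if (add_to_buffer(o, s, len) == 0) return 0;
    return add_to_list(o, s, len);
}
```

Small replies, which are the common case, never touch the allocator; only replies that overflow the array pay for a list node.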

prepareClientToWrite must be called every time before new data is appended to a client's output buffer, because different kinds of client need different handling. Some clients (such as the fake client used while loading the AOF file) do not need the new data at all, and the function returns REDIS_ERR for them. Some (such as the Lua client) need the data appended but no writable event registered on a socket descriptor. Others (ordinary clients) need both the data appended and the writable event registered.
So when prepareClientToWrite returns REDIS_ERR, nothing should be appended to the output buffer; data is appended only when it returns REDIS_OK.

The code of prepareClientToWrite is as follows:

    int prepareClientToWrite(redisClient *c) {
        /* If it's the Lua client we always return ok without installing any
         * handler since there is no socket at all. */
        if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;

        /* Masters don't receive replies, unless REDIS_MASTER_FORCE_REPLY flag
         * is set. */
        if ((c->flags & REDIS_MASTER) &&
            !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;

        if (c->fd <= 0) return REDIS_ERR; /* Fake client for AOF loading. */

        /* Only install the handler if not already installed and, in case of
         * slaves, if the client can actually receive writes. */
        if (c->bufpos == 0 && listLength(c->reply) == 0 &&
            (c->replstate == REDIS_REPL_NONE ||
             (c->replstate == REDIS_REPL_ONLINE && !c->repl_put_online_on_ack)))
        {
            /* Try to install the write handler. */
            if (aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                    sendReplyToClient, c) == AE_ERR)
            {
                freeClientAsync(c);
                return REDIS_ERR;
            }
        }

        /* Authorize the caller to queue in the output buffer of this client. */
        return REDIS_OK;
    }

If the current client is the Lua client, REDIS_OK is returned without registering a writable event, because there is no socket descriptor at all.
If the client is a master, it receives no replies unless the REDIS_MASTER_FORCE_REPLY flag is set, so REDIS_ERR is returned.
If the client's socket descriptor is less than or equal to 0, it is the fake client used while loading the AOF file, and REDIS_ERR is returned.
For an ordinary client, or a slave that is ready to receive data, if the output buffers are currently empty (so the writable event has not been registered yet), aeCreateFileEvent is called to register the writable event on the socket descriptor c->fd, with sendReplyToClient as the callback. Finally REDIS_OK is returned.
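
The decision made by prepareClientToWrite can be restated as a pure function that is easy to test. This is a reduced model with hypothetical names and flag values (struct wclient, prepare_to_write, F_*): setting write_handler stands in for the aeCreateFileEvent call, and the replication-state condition for slaves is left out.

```c
#include <assert.h>
#include <stddef.h>

#define F_LUA_CLIENT         (1 << 0)   /* hypothetical flag values */
#define F_MASTER             (1 << 1)
#define F_MASTER_FORCE_REPLY (1 << 2)

struct wclient {
    int flags;
    int fd;               /* <= 0 for a fake client without a socket */
    int pending_output;   /* non-zero if buf or reply already holds data */
    int write_handler;    /* non-zero once the writable event is registered */
};

/* Returns 1 if the caller may append to the output buffer, 0 otherwise. */
int prepare_to_write(struct wclient *c) {
    assert(c != NULL);
    if (c->flags & F_LUA_CLIENT) return 1;          /* no socket, no handler */
    if ((c->flags & F_MASTER) && !(c->flags & F_MASTER_FORCE_REPLY))
        return 0;                                   /* masters get no replies */
    if (c->fd <= 0) return 0;                       /* fake client */
    /* Empty output buffers mean no handler is installed yet. */
    if (!c->pending_output) c->write_handler = 1;
    return 1;
}
```

Registering the handler only when the buffers are empty avoids a redundant registration on every appended reply fragment.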

When the TCP send buffer has free space, the writable event on the socket descriptor fires and the callback sendReplyToClient is called. It calls write to send the data in the output buffer. Its code is as follows:

    void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        redisClient *c = privdata;
        int nwritten = 0, totwritten = 0, objlen;
        size_t objmem;
        robj *o;
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);

        while(c->bufpos > 0 || listLength(c->reply)) {
            if (c->bufpos > 0) {
                nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
                if (nwritten <= 0) break;
                c->sentlen += nwritten;
                totwritten += nwritten;

                /* If the buffer was sent, set bufpos to zero to continue with
                 * the remainder of the reply. */
                if (c->sentlen == c->bufpos) {
                    c->bufpos = 0;
                    c->sentlen = 0;
                }
            } else {
                o = listNodeValue(listFirst(c->reply));
                objlen = sdslen(o->ptr);
                objmem = getStringObjectSdsUsedMemory(o);

                if (objlen == 0) {
                    listDelNode(c->reply,listFirst(c->reply));
                    c->reply_bytes -= objmem;
                    continue;
                }

                nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
                if (nwritten <= 0) break;
                c->sentlen += nwritten;
                totwritten += nwritten;

                /* If we fully sent the object on head go to the next one */
                if (c->sentlen == objlen) {
                    listDelNode(c->reply,listFirst(c->reply));
                    c->sentlen = 0;
                    c->reply_bytes -= objmem;
                }
            }
            /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
             * bytes, in a single threaded server it's a good idea to serve
             * other clients as well, even if a very large request comes from
             * super fast link that is always able to accept data (in real world
             * scenario think about 'KEYS *' against the loopback interface).
             *
             * However if we are over the maxmemory limit we ignore that and
             * just deliver as much data as it is possible to deliver. */
            server.stat_net_output_bytes += totwritten;
            if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
                (server.maxmemory == 0 ||
                 zmalloc_used_memory() < server.maxmemory)) break;
        }
        if (nwritten == -1) {
            if (errno == EAGAIN) {
                nwritten = 0;
            } else {
                redisLog(REDIS_VERBOSE,
                    "Error writing to client: %s", strerror(errno));
                freeClient(c);
                return;
            }
        }
        if (totwritten > 0) {
            /* For clients representing masters we don't count sending data
             * as an interaction, since we always send REPLCONF ACK commands
             * that take some time to just fill the socket output buffer.
             * We just rely on data / pings received for timeout detection. */
            if (!(c->flags & REDIS_MASTER)) c->lastinteraction = server.unixtime;
        }
        if (c->bufpos == 0 && listLength(c->reply) == 0) {
            c->sentlen = 0;
            aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

            /* Close connection after entire reply has been sent. */
            if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);
        }
    }

When data to be sent is appended to the output buffer, c->buf is tried first; if c->buf lacks room, the data is appended to c->reply. When c->buf is in use, c->bufpos is the total amount of data buffered in it and c->sentlen is the amount already sent. When c->reply is in use, c->reply_bytes is the total number of bytes of memory occupied by all sds strings stored in the list, and c->sentlen is the amount already sent from the list element currently being transmitted.
totwritten is the amount of data sent so far by the current call of the function.

If c->bufpos is greater than 0 or listLength(c->reply) is greater than 0, there is buffered data to send, so the function enters the loop and calls write. When write returns a value nwritten that is less than or equal to 0, either the TCP send buffer is full or an error occurred, so the loop is exited.
Inside the loop, if c->bufpos is greater than 0, the buffer in use is c->buf, so write is called to send the remaining data in c->buf (c->bufpos - c->sentlen bytes). If nwritten is less than or equal to 0, the loop is exited; otherwise nwritten is added to c->sentlen and totwritten, and the next iteration continues writing. Once all data in c->buf has been sent, c->bufpos and c->sentlen are reset to 0, which empties c->buf.

Otherwise the buffer in use is the list c->reply. The function takes the string object o stored in the head node, then its length objlen and the memory it occupies, objmem. It calls write to send the unsent part of o->ptr (objlen - c->sentlen bytes). If nwritten is less than or equal to 0, the loop is exited; otherwise nwritten is added to c->sentlen and totwritten, and the next iteration continues writing. When c->sentlen equals objlen, the data of the current node has been sent completely: the node is deleted, c->sentlen is reset to 0, and objmem is subtracted from c->reply_bytes.
Next, the number of bytes sent so far, totwritten, is added to server.stat_net_output_bytes.

Because this function is the callback of a writable event, it must not run for too long, or it would delay the handling of other events. The number of bytes it sends per call is therefore limited to REDIS_MAX_WRITE_PER_EVENT (64k): once totwritten exceeds that value, and provided that no maximum memory limit is configured or the configured limit has not yet been reached, the loop is exited and sending stops.

After the loop, if write failed and errno is EAGAIN, the TCP send buffer is full; this is not an error and nwritten is simply set to 0. Otherwise the error is logged and freeClient is called to release the redisClient and close the connection to the client.
Finally, if all buffered data has been sent, c->sentlen is set to 0 and the writable event on the socket descriptor c->fd is deleted. If REDIS_CLOSE_AFTER_REPLY is set in c->flags, freeClient is called to release the redisClient and close the connection to the client.
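
The core of this write path, remembering how much has been sent and stopping on EAGAIN or after a per-event budget, can be sketched for a single contiguous buffer. The names struct sendbuf and send_some are hypothetical; the reply list, the maxmemory exception and EINTR handling are omitted for brevity.

```c
#include <assert.h>
#include <errno.h>
#include <unistd.h>

#define MAX_WRITE_PER_EVENT (64 * 1024)   /* same value as REDIS_MAX_WRITE_PER_EVENT */

struct sendbuf {
    const char *data;
    size_t len;       /* total bytes queued (plays the role of bufpos) */
    size_t sentlen;   /* bytes already written to the descriptor */
};

/* Write as much of b as the descriptor accepts, stopping once more than
 * MAX_WRITE_PER_EVENT bytes have gone out in this call. Returns 1 when
 * everything has been sent, 0 when data remains for a later writable
 * event, and -1 on a real error. */
int send_some(int fd, struct sendbuf *b) {
    size_t totwritten = 0;

    assert(b != NULL && b->sentlen <= b->len);
    while (b->sentlen < b->len) {
        ssize_t n = write(fd, b->data + b->sentlen, b->len - b->sentlen);
        if (n == -1) {
            if (errno == EAGAIN) return 0;   /* kernel buffer full: retry later */
            return -1;
        }
        if (n == 0) return 0;
        b->sentlen += (size_t)n;             /* a short write just advances sentlen */
        totwritten += (size_t)n;
        if (totwritten > MAX_WRITE_PER_EVENT) break;   /* yield to other events */
    }
    return b->sentlen == b->len;
}
```

A caller would keep the writable event registered while send_some returns 0 and remove it once the function returns 1, which corresponds to the aeDeleteFileEvent call at the end of sendReplyToClient.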

For other related code, see:
https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/src/networking.c
Reposted from: https://www.cnblogs.com/gqtcgq/p/7247057.html