Redis源码剖析(十二)--客户端和服务器
?客戶端屬性
客戶端的狀態(tài)保存在結(jié)構(gòu)體 redisClient 中,下面給出redisClient的部分屬性:
typedef struct redisClient{// 套接字描述符int fd; // 客戶端狀態(tài)標(biāo)志int flags;// 輸入緩沖區(qū) sds querybuf;// 命令參數(shù)robj** argv;int argc;// 命令的實(shí)現(xiàn)函數(shù)struct redisCommand *cmd;// 固定輸出緩沖區(qū)char buf[REDIS_REPLY_CHUNK_BYTES];int bufpos;// 可變大小輸出緩沖區(qū)list* reply;// ...... };-
?fd 屬性:客戶端使用的套接字描述符,偽客戶端的fd屬性為 -1,普通客戶端的fd屬性為大于0的整數(shù)。
-
flags 屬性:客戶端狀態(tài)。在主從服務(wù)器復(fù)制時(shí),主服務(wù)器和從服務(wù)器互為客戶端,REDIS_MASTER 標(biāo)志表示客戶端代表的是一個(gè)主服務(wù)器,REDIS_SLAVE 標(biāo)志表示客戶端代表的是一個(gè)從服務(wù)器。
-
querybuf 屬性:保存客戶端發(fā)送的命令請(qǐng)求。
-
argv、argc 屬性:對(duì)客戶端的命令請(qǐng)求分析,得到的命令參數(shù)及命令參數(shù)的個(gè)數(shù)。
-
cmd 屬性:服務(wù)器從客戶端發(fā)送的命令請(qǐng)求中分析得到argv、argc參數(shù)后,會(huì)根據(jù)argv[0]的值,去查找該命令對(duì)應(yīng)的實(shí)現(xiàn)函數(shù),并使cmd指針指向該實(shí)現(xiàn)函數(shù)。
-
buf、bufpos 屬性:bufpos屬性記錄了buf 數(shù)組已使用的字節(jié)數(shù)量。
-
reply 屬性:當(dāng)buf 數(shù)組空間不夠用時(shí),服務(wù)器會(huì)使用 reply 可變大小緩沖區(qū)。
?
?命令請(qǐng)求
命令的執(zhí)行過(guò)程
服務(wù)器在接收到命令后,會(huì)將命令以對(duì)象的形式保存在服務(wù)器client的參數(shù)列表 robj** argv 中,因此服務(wù)器執(zhí)行命令請(qǐng)求時(shí),服務(wù)器已經(jīng)讀入了一套命令參數(shù)保存在參數(shù)列表中。執(zhí)行命令的過(guò)程對(duì)應(yīng)的函數(shù)是processCommand(),部分源碼如下:
int processCommand(redisClient *c) {// 查找命令,并進(jìn)行命令合法性檢查,以及命令參數(shù)個(gè)數(shù)檢查c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);if (!c->cmd) {// 沒(méi)找到指定的命令 flagTransaction(c);addReplyErrorFormat(c,"unknown command '%s'",(char*)c->argv[0]->ptr);return REDIS_OK;} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||(c->argc < -c->cmd->arity)) {// 參數(shù)個(gè)數(shù)錯(cuò)誤 flagTransaction(c);addReplyErrorFormat(c,"wrong number of arguments for '%s' command",c->cmd->name);return REDIS_OK;}// ....../* Exec the command */if (c->flags & REDIS_MULTI &&c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){// 在事務(wù)上下文中// 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外// 其他所有命令都會(huì)被入隊(duì)到事務(wù)隊(duì)列中 queueMultiCommand(c);addReply(c,shared.queued);} else {// 執(zhí)行命令 call(c,REDIS_CALL_FULL);c->woff = server.master_repl_offset;// 處理那些解除了阻塞的鍵if (listLength(server.ready_keys))handleClientsBlockedOnLists();}return REDIS_OK; }?
我們總結(jié)出執(zhí)行命令的大致過(guò)程:
-
查找命令。對(duì)應(yīng)的代碼是:c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr)
-
執(zhí)行命令前的準(zhǔn)備
-
執(zhí)行命令。對(duì)應(yīng)代碼是:call(c,REDIS_CALL_FULL)
?
?
查找命令
lookupCommand 函數(shù)是對(duì) dictFetchValue 函數(shù)的封裝。dictFetchValue 函數(shù)會(huì)從 server.commands 字典中查找 name 命令。這個(gè)保存命令表的字典,鍵是命令的名稱,值是命令表的地址。服務(wù)器初始化時(shí)會(huì)創(chuàng)建一張命令表。命令表部分代碼如下:
?
?
?
?執(zhí)行命令
執(zhí)行命令調(diào)用了call(c, CMD_CALL_FULL)函數(shù),該函數(shù)是執(zhí)行命令的核心。該函數(shù)其實(shí)是對(duì) c->cmd->proc(c) 的封裝, proc 指向命令的實(shí)現(xiàn)函數(shù)。
void call(redisClient *c, int flags) {// start 記錄命令開(kāi)始執(zhí)行的時(shí)間long long dirty, start, duration;// 記錄命令開(kāi)始執(zhí)行前的 FLAGint client_old_flags = c->flags; // 如果可以的話,將命令發(fā)送到 MONITORif (listLength(server.monitors) &&!server.loading &&!(c->cmd->flags & REDIS_CMD_SKIP_MONITOR)){replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);}/* Call the command. */c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);redisOpArrayInit(&server.also_propagate);// 保留舊 dirty 計(jì)數(shù)器值dirty = server.dirty;// 計(jì)算命令開(kāi)始執(zhí)行的時(shí)間start = ustime();// 執(zhí)行實(shí)現(xiàn)函數(shù)c->cmd->proc(c);// 計(jì)算命令執(zhí)行耗費(fèi)的時(shí)間duration = ustime()-start;// 計(jì)算命令執(zhí)行之后的 dirty 值dirty = server.dirty-dirty; // 不將從 Lua 中發(fā)出的命令放入 SLOWLOG ,也不進(jìn)行統(tǒng)計(jì)if (server.loading && c->flags & REDIS_LUA_CLIENT)flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS); // 如果調(diào)用者是 Lua ,那么根據(jù)命令 FLAG 和客戶端 FLAG// 打開(kāi)傳播(propagate)標(biāo)志if (c->flags & REDIS_LUA_CLIENT && server.lua_caller) {if (c->flags & REDIS_FORCE_REPL)server.lua_caller->flags |= REDIS_FORCE_REPL;if (c->flags & REDIS_FORCE_AOF)server.lua_caller->flags |= REDIS_FORCE_AOF;} // 如果有需要,將命令放到 SLOWLOG 里面if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand)slowlogPushEntryIfNeeded(c->argv,c->argc,duration);// 更新命令的統(tǒng)計(jì)信息if (flags & REDIS_CALL_STATS) {c->cmd->microseconds += duration;c->cmd->calls++;} // 將命令復(fù)制到 AOF 和 slave 節(jié)點(diǎn)if (flags & REDIS_CALL_PROPAGATE) {int flags = REDIS_PROPAGATE_NONE;// 強(qiáng)制 REPL 傳播if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;// 強(qiáng)制 AOF 傳播if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;// 如果數(shù)據(jù)庫(kù)有被修改,那么啟用 REPL 和 AOF 傳播if (dirty)flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);if (flags != REDIS_PROPAGATE_NONE)propagate(c->cmd,c->db->id,c->argv,c->argc,flags);} // 將客戶端的 FLAG 恢復(fù)到命令執(zhí)行之前// 因?yàn)?call 可能會(huì)遞歸執(zhí)行c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL); // 傳播額外的命令if (server.also_propagate.numops) {int j;redisOp *rop;for (j = 0; j < server.also_propagate.numops; j++) {rop = &server.also_propagate.ops[j];propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);}redisOpArrayFree(&server.also_propagate);}server.stat_numcommands++; }?
?執(zhí)行命令 c->cmd->proc(c) 就相當(dāng)于執(zhí)行了命令實(shí)現(xiàn)的函數(shù),然后會(huì)在執(zhí)行完成后,由這些函數(shù)產(chǎn)生相應(yīng)的命令回復(fù),根據(jù)回復(fù)的大小,會(huì)將回復(fù)保存在輸出緩沖區(qū) buf 或可變輸出緩沖區(qū)鏈表 reply 中。
?
?
?maxmemory策略
Redis 服務(wù)器對(duì)內(nèi)存使用會(huì)有一個(gè)server.maxmemory的限制,如果超過(guò)這個(gè)限制,就要通過(guò)刪除一些鍵空間來(lái)釋放一些內(nèi)存,具體函數(shù)對(duì)應(yīng)freeMemoryIfNeeded()。釋放內(nèi)存時(shí),可以指定不同的策略。策略保存在maxmemory_policy中,可以指定以下的幾個(gè)值:
#define MAXMEMORY_VOLATILE_LRU 0 #define MAXMEMORY_VOLATILE_TTL 1 #define MAXMEMORY_VOLATILE_RANDOM 2 #define MAXMEMORY_ALLKEYS_LRU 3 #define MAXMEMORY_ALLKEYS_RANDOM 4 #define MAXMEMORY_NO_EVICTION 5可以看出主要分為三種:
- LRU:優(yōu)先刪除最近最少使用的鍵。
- TTL:優(yōu)先刪除生存時(shí)間最短的鍵。
- RANDOM:隨機(jī)刪除。
而ALLKEYS和VOLATILE的不同之處就是要確定是從數(shù)據(jù)庫(kù)的鍵值對(duì)字典還是過(guò)期鍵字典中刪除。
int freeMemoryIfNeeded(void) {size_t mem_used, mem_tofree, mem_freed;int slaves = listLength(server.slaves);// 計(jì)算出 Redis 目前占用的內(nèi)存總數(shù),但有兩個(gè)方面的內(nèi)存不會(huì)計(jì)算在內(nèi):// 1)從服務(wù)器的輸出緩沖區(qū)的內(nèi)存// 2)AOF 緩沖區(qū)的內(nèi)存mem_used = zmalloc_used_memory();if (slaves) {listIter li;listNode *ln;listRewind(server.slaves,&li);while((ln = listNext(&li))) {redisClient *slave = listNodeValue(ln);unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);if (obuf_bytes > mem_used)mem_used = 0;elsemem_used -= obuf_bytes;}}if (server.aof_state != REDIS_AOF_OFF) {mem_used -= sdslen(server.aof_buf);mem_used -= aofRewriteBufferSize();} // 如果目前使用的內(nèi)存大小比設(shè)置的 maxmemory 要小,那么無(wú)須執(zhí)行進(jìn)一步操作if (mem_used <= server.maxmemory) return REDIS_OK;// 如果占用內(nèi)存比 maxmemory 要大,但是 maxmemory 策略為不淘汰,那么直接返回if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)return REDIS_ERR; /* We need to free memory, but policy forbids. */// 計(jì)算需要釋放多少字節(jié)的內(nèi)存mem_tofree = mem_used - server.maxmemory;// 初始化已釋放內(nèi)存的字節(jié)數(shù)為 0mem_freed = 0;// 根據(jù) maxmemory 策略,// 遍歷字典,釋放內(nèi)存并記錄被釋放內(nèi)存的字節(jié)數(shù)while (mem_freed < mem_tofree) {int j, k, keys_freed = 0;// 遍歷所有字典for (j = 0; j < server.dbnum; j++) {long bestval = 0; /* just to prevent warning */sds bestkey = NULL;dictEntry *de;redisDb *db = server.db+j;dict *dict;if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM){// 如果策略是 allkeys-lru 或者 allkeys-random // 那么淘汰的目標(biāo)為所有數(shù)據(jù)庫(kù)鍵dict = server.db[j].dict;} else {// 如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl // 那么淘汰的目標(biāo)為帶過(guò)期時(shí)間的數(shù)據(jù)庫(kù)鍵dict = server.db[j].expires;}// 跳過(guò)空字典if (dictSize(dict) == 0) continue;/* volatile-random and allkeys-random policy */// 如果使用的是隨機(jī)策略,那么從目標(biāo)字典中隨機(jī)選出鍵if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM ||server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM){de = dictGetRandomKey(dict);bestkey = dictGetKey(de);} // 如果使用的是 LRU 策略,// 那么從一集 sample 鍵中選出 IDLE 時(shí)間最長(zhǎng)的那個(gè)鍵else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU){struct evictionPoolEntry *pool = db->eviction_pool;while(bestkey == NULL) {evictionPoolPopulate(dict, db->dict, db->eviction_pool);/* Go backward from best to worst element to evict. */for (k = REDIS_EVICTION_POOL_SIZE-1; k >= 0; k--) {if (pool[k].key == NULL) continue;de = dictFind(dict,pool[k].key);/* Remove the entry from the pool. */sdsfree(pool[k].key);/* Shift all elements on its right to left. */memmove(pool+k,pool+k+1,sizeof(pool[0])*(REDIS_EVICTION_POOL_SIZE-k-1));/* Clear the element on the right which is empty* since we shifted one position to the left. */pool[REDIS_EVICTION_POOL_SIZE-1].key = NULL;pool[REDIS_EVICTION_POOL_SIZE-1].idle = 0;/* If the key exists, is our pick. Otherwise it is* a ghost and we need to try the next element. */if (de) {bestkey = dictGetKey(de);break;} else {/* Ghost... */continue;}}}}// 策略為 volatile-ttl ,從一集 sample 鍵中選出過(guò)期時(shí)間距離當(dāng)前時(shí)間最接近的鍵else if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_TTL) {for (k = 0; k < server.maxmemory_samples; k++) {sds thiskey;long thisval;de = dictGetRandomKey(dict);thiskey = dictGetKey(de);thisval = (long) dictGetVal(de);/* Expire sooner (minor expire unix timestamp) is better* candidate for deletion */if (bestkey == NULL || thisval < bestval) {bestkey = thiskey;bestval = thisval;}}} // 刪除被選中的鍵if (bestkey) {long long delta;robj *keyobj = createStringObject(bestkey,sdslen(bestkey));propagateExpire(db,keyobj);// 計(jì)算刪除鍵所釋放的內(nèi)存數(shù)量delta = (long long) zmalloc_used_memory();dbDelete(db,keyobj);delta -= (long long) zmalloc_used_memory();mem_freed += delta;// 對(duì)淘汰鍵的計(jì)數(shù)器增一server.stat_evictedkeys++;notifyKeyspaceEvent(REDIS_NOTIFY_EVICTED, "evicted",keyobj, db->id);decrRefCount(keyobj);keys_freed++; if (slaves) flushSlavesOutputBuffers();}}if (!keys_freed) return REDIS_ERR; /* nothing to free... */}return REDIS_OK; }?
轉(zhuǎn)載于:https://www.cnblogs.com/lizhimin123/p/10215368.html
總結(jié)
以上是生活随笔為你收集整理的Redis源码剖析(十二)--客户端和服务器的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Java HashMap的put操作(J
- 下一篇: UOJ42/BZOJ3817 清华集训2