[Repost] An In-Depth Analysis of Redis Master-Slave Replication
Reposted from http://www.cnblogs.com/daoluanxiaozi/p/3724299.html
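Master-slave overview
Redis supports a master-slave mode. A redis server can be configured as the master or the slave of another redis server, and a slave obtains its data from its master. A slave can also be the master of yet another redis server, so the master-slave topology forms a directed acyclic graph (DAG). The result is a cluster of redis servers. Masters and slaves are all ordinary redis servers, and all of them can serve requests.
Once configured, the master serves both reads and writes, while slaves serve reads only. Redis offers this setup to support weak consistency, namely eventual consistency. Whether to choose strong or weak consistency depends on the concrete business requirements. A service like Weibo can use a weak-consistency model, whereas one like Taobao may opt for a strong-consistency model.
Redis master-slave replication is implemented mainly in replication.c.
This article contains a fair amount of code. I have trimmed it as far as possible while keeping what is needed to explain the essentials. To keep the code authentic and let readers see its original comments, the articles in this Redis series keep the English comments in the code. My own annotations are added alongside them.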
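The backlog
In "An In-Depth Analysis of the Redis AOF Persistence Strategy" I introduced the concept of the update buffer. Suppose a client sends the command set name Jhon. This data update is recorded as *3\r\n$3\r\nSET\r\n$4\r\nname\r\n$4\r\nJhon\r\n and stored in the update buffer.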
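The following minimal C sketch encodes an argument list in the same format. encode_resp_command() is a hypothetical helper written for this article and is not a Redis API. It also assumes NUL-terminated arguments, whereas Redis itself is binary-safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argv[0..argc-1] as a RESP multi-bulk request:
 *   *<argc>\r\n  followed by  $<len>\r\n<bytes>\r\n  per argument.
 * Returns the number of bytes written (excluding the NUL terminator),
 * or -1 if the output buffer is too small. */
int encode_resp_command(char *out, size_t cap, int argc, const char **argv) {
    size_t used;
    int n;

    assert(out != NULL && argc >= 0);
    /* Header with the argument count. */
    n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    used = (size_t)n;

    for (int j = 0; j < argc; j++) {
        size_t len = strlen(argv[j]);
        /* Length-prefixed argument followed by CRLF. */
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n", len, argv[j]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Encoding the three arguments SET, name and Jhon produces exactly the string shown above.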
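Master-slave connections use a similar update buffer, but for a different purpose. The AOF buffer is written to a local file, while this one is sent to slaves. Here we call it the backlog.
The backlog is stored in server.repl_backlog. Redis treats it as a fixed-size circular buffer of repl_backlog_size bytes. Once the buffer is full, new data overwrites the oldest data, which bounds memory use and avoids reallocating memory.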
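The sketch below shows how such a circular buffer can be fed and how the bookkeeping fields of struct redisServer relate to one another. It is modeled on the way feedReplicationBacklog() maintains those fields. backlog_t and backlog_feed() are hypothetical names, and the exact Redis code may differ between versions.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-alone mirror of the backlog fields of struct
 * redisServer. Offsets are 1-based global replication offsets. */
typedef struct {
    char *buf;             /* like repl_backlog */
    long long size;        /* like repl_backlog_size */
    long long idx;         /* like repl_backlog_idx: next write position */
    long long histlen;     /* like repl_backlog_histlen: valid bytes held */
    long long off;         /* like repl_backlog_off: offset of oldest byte */
    long long master_off;  /* like master_repl_offset */
} backlog_t;

/* Append len bytes, overwriting the oldest data once the buffer is full. */
void backlog_feed(backlog_t *b, const char *p, long long len) {
    assert(b->size > 0);
    b->master_off += len;
    while (len > 0) {
        /* Copy as much as fits before the physical end of the buffer. */
        long long thislen = b->size - b->idx;
        if (thislen > len) thislen = len;
        memcpy(b->buf + b->idx, p, (size_t)thislen);
        b->idx += thislen;
        if (b->idx == b->size) b->idx = 0; /* wrap around */
        len -= thislen;
        p += thislen;
        b->histlen += thislen;
    }
    if (b->histlen > b->size) b->histlen = b->size;
    /* Global offset of the first byte still held in the buffer. */
    b->off = b->master_off - b->histlen + 1;
}
```

Suppose "abcdef" and then "ghij" are written into an 8-byte buffer. The buffer then holds "ijcdefgh", the next write goes to index 2, and the oldest surviving byte ('c') has global offset 3.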
```c
struct redisServer {
    /* Replication (master) */
    // Last DB selected in the replication stream (the last SELECT sent to slaves)
    int slaveseldb;                 /* Last SELECTed DB in replication output */
    // Global replication offset
    long long master_repl_offset;   /* Global replication offset */
    // Interval between heartbeat PINGs from master to slaves
    int repl_ping_slave_period;     /* Master pings the slave every N seconds */
    // Pointer to the backlog buffer
    char *repl_backlog;             /* Replication backlog for partial syncs */
    // Backlog size
    long long repl_backlog_size;    /* Backlog circular buffer size */
    // Amount of valid data currently held in the backlog
    long long repl_backlog_histlen; /* Backlog actual data length */
    // Position where the next write into the backlog starts
    long long repl_backlog_idx;     /* Backlog circular buffer current offset */
    // Global replication offset of the first byte held in the backlog
    long long repl_backlog_off;     /* Replication offset of first byte in the
                                       backlog buffer. */
    // How long the backlog is kept after the last slave disconnects
    time_t repl_backlog_time_limit; /* Time without slaves after the backlog
                                       gets released. */
};
```
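When are change records written into the backlog? Whenever a redis command that modifies data (a write) is executed, its change record is propagated. In the Redis source this happens along the chain call()->propagate()->replicationFeedSlaves().
Note: call() is where a command is actually executed. If call() detects that data was modified (dirty), it propagates the command via propagate(). replicationFeedSlaves() then writes the change record into the backlog and to every connected slave.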
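The propagation decision in call() comes down to a few bit operations. The following is a minimal sketch. The PROP_* and FORCE_* constants are hypothetical stand-ins for Redis's REDIS_PROPAGATE_* and REDIS_FORCE_* flags, and the sketch does not assume their real values.

```c
#include <assert.h>

/* Hypothetical flag values; Redis uses its own REDIS_PROPAGATE_* and
 * REDIS_FORCE_* constants. */
enum { PROP_NONE = 0, PROP_AOF = 1, PROP_REPL = 2 };
enum { FORCE_AOF = 1 << 0, FORCE_REPL = 1 << 1 };

/* Decide where a just-executed command is propagated, following call():
 * honor the forced flags first, then propagate everywhere if the
 * command changed the dataset (a non-zero dirty delta). */
int propagation_targets(long long dirty, int client_flags) {
    int flags = PROP_NONE;
    if (client_flags & FORCE_REPL) flags |= PROP_REPL;
    if (client_flags & FORCE_AOF) flags |= PROP_AOF;
    if (dirty) flags |= (PROP_REPL | PROP_AOF);
    return flags;
}
```

A read-only command with no forced flags yields PROP_NONE, so propagate() is never called for it.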
You may wonder: why add the data to the backlog and also send it to every slave? Why not just send it to the slaves?
The reason is that a slave may lose its connection to the master, for example because of a network failure or a timeout. Before disconnecting, the slave caches the master's state, but while it is disconnected it receives no updates. Redis also appends updates to the backlog so that a disconnected slave can obtain them after it reconnects. replicationFeedSlaves() shows both paths. Online slaves receive update records immediately, while a slave that was temporarily disconnected has to recover the missed records from the backlog. If it stays disconnected long enough for those records to be overwritten, the master rejects its partial resync request, and the slave can only do a full resync.
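Whether a reconnecting slave can catch up from the backlog depends on two checks, which masterTryPartialResynchronization() performs in the code later in this article. The following is a minimal sketch. decide_resync() is a hypothetical helper. Redis compares run ids with strcasecmp(), while this sketch uses strcmp() to stay within standard C.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical result codes for this sketch. */
typedef enum { RESYNC_PARTIAL, RESYNC_FULL } resync_t;

/* Decide whether a reconnecting slave can partially resync:
 *  - the run id it cached must be this master's run id
 *    ("?" never matches, so it always forces a full resync);
 *  - a backlog must exist and cover the requested offset, i.e.
 *    backlog_off <= req_offset <= backlog_off + histlen. */
resync_t decide_resync(const char *my_runid, const char *req_runid,
                       long long req_offset, int have_backlog,
                       long long backlog_off, long long histlen) {
    assert(my_runid != NULL && req_runid != NULL);
    if (strcmp(req_runid, my_runid) != 0) return RESYNC_FULL;
    if (!have_backlog) return RESYNC_FULL;
    if (req_offset < backlog_off) return RESYNC_FULL;            /* too old */
    if (req_offset > backlog_off + histlen) return RESYNC_FULL;  /* past end */
    return RESYNC_PARTIAL;
}
```

Take a backlog that covers offsets 3 to 10. A request for offset 11 is still accepted, because it means the slave has seen everything and there is nothing left to send.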
Here is the annotated source:
```c
// call() is the core function that actually executes a command
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
    ......
    /* Call the command. */
    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
    redisOpArrayInit(&server.also_propagate);
    // Snapshot the dirty counter to detect whether the command modifies data
    dirty = server.dirty;
    // Run the function that implements the command
    c->cmd->proc(c);
    dirty = server.dirty-dirty;
    duration = ustime()-start;
    ......
    // Propagate the client's data modification to the AOF and to slaves
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;

        // Replication forced by the command
        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
        // AOF persistence forced by the command
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
        // Data was modified
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        // Propagate the modification record
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    ......
}

// Publish the update to the AOF and to slaves
/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags) {
    // AOF must be enabled and the AOF flag set: write the update to the local file
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    // Replication flag set: publish the update to slaves
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

// Write the data into the backlog and send it to the slaves
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[REDIS_LONGSTR_SIZE];

    // No backlog and no slaves: return immediately
    /* If there aren't slaves, and there is no backlog buffer to populate,
     * we can return ASAP. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* We can't have slaves attached and no backlog. */
    redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    /* Send SELECT command to every slave if needed. */
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        // DB ids below REDIS_SHARED_SELECT_CMDS use a pre-built shared object
        /* For a few DBs we have pre-computed SELECT command. */
        if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            // No shared object available: build the SELECT command object
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }

        // The backlog keeps a copy so that slaves that were temporarily
        // disconnected can recover these updates after reconnecting.
        // Append the SELECT command object to the backlog
        /* Add the SELECT command into the backlog. */
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        // Send it to every slave
        /* Send it to slaves. */
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;
            addReply(slave,selectcmd);
        }

        // Release the object if it is not a shared one
        if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }
    // Remember the DB last selected in the replication stream
    server.slaveseldb = dictid;

    // Write the command into the backlog
    /* Write the command to the replication backlog if any. */
    if (server.repl_backlog) {
        char aux[REDIS_LONGSTR_SIZE+3];

        // Argument count: "*<argc>\r\n"
        /* Add the multi bulk reply length. */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);

        // Write each argument in turn
        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            /* We need to feed the buffer with the object as a bulk reply
             * not just as a plain string, so create the $..CRLF payload len
             * ad add the final CRLF */
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            /* Each argument is written as "$<len>\r\n<bytes>\r\n", so
               SET name Jhon becomes:
               *3\r\n$3\r\nSET\r\n$4\r\nname\r\n$4\r\nJhon\r\n */
            // Argument length header
            feedReplicationBacklog(aux,len+3);
            // Argument bytes
            feedReplicationBacklogWithObject(argv[j]);
            // Trailing CRLF
            feedReplicationBacklog(aux+len+1,2);
        }
    }

    // Send the command to every slave right away
    /* Write the command to every slave. */
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        // Skip slaves that are still waiting for a BGSAVE to start
        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Feed slaves that are waiting for the initial SYNC (so these commands
         * are queued in the output buffer until the initial SYNC completes),
         * or are already in sync with the master. */

        // Send the argument count to the slave
        /* Add the multi bulk length. */
        addReplyMultiBulkLen(slave,argc);

        // Send the arguments to the slave
        /* Finally any additional argument that was not stored inside the
         * static buffer if any (from j to argc). */
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}
```
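Overview of master-slave data synchronization
Redis master-slave synchronization has two modes (or two phases): full resync and partial resync.
When a master and a slave first connect, they perform a full resync, and once it completes they switch to partial resync. A slave can also request a full resync at any time if needed. Redis's strategy is to always try a partial resync first. If that fails, the master tells the slave to do a full resync and starts a BGSAVE, then transfers the RDB file once the BGSAVE finishes. If it succeeds, the master lets the slave resync partially and sends it the data in the backlog.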
The following figure summarizes the master-slave synchronization mechanism:
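To make a redis server a slave, send it the command SLAVEOF hostname port. On receiving it, the slave only records the master's address. The connection itself is made later by replicationCron(), which connects to the master and registers syncWithMaster() as the read/write event handler.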
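The key design point is that SLAVEOF itself does no network I/O. It only records the target, and the periodic cron performs the connection. The following simplified model illustrates that split. replica_t, set_master(), cron_tick() and the state names are hypothetical, and they are much simpler than Redis's real replication state machine.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical, heavily simplified replication states. */
typedef enum { REPL_NONE, REPL_CONNECT, REPL_CONNECTING } repl_state_t;

typedef struct {
    char masterhost[256];
    int masterport;
    repl_state_t state;
    int connect_attempts;  /* counts simulated connection attempts */
} replica_t;

/* Like replicationSetMaster(): record the target and flag that a
 * connection is needed. No network I/O happens here. */
void set_master(replica_t *r, const char *host, int port) {
    assert(r != NULL && host != NULL);
    strncpy(r->masterhost, host, sizeof(r->masterhost) - 1);
    r->masterhost[sizeof(r->masterhost) - 1] = '\0';
    r->masterport = port;
    r->state = REPL_CONNECT;
}

/* Like the REDIS_REPL_CONNECT branch of replicationCron(): the actual
 * connection attempt happens on the next periodic tick. */
void cron_tick(replica_t *r) {
    if (r->state == REPL_CONNECT) {
        r->connect_attempts++;  /* a real server would open a socket here */
        r->state = REPL_CONNECTING;
    }
}
```

Because set_master() only changes state, SLAVEOF can reply +OK immediately. Connection failures are then handled by later cron ticks instead of blocking the command.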
```c
// Change the master
void slaveofCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        // SLAVEOF NO ONE: disconnect from the master
        if (server.masterhost) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        long port;

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;

        // We may already be attached to the requested master
        /* Check if we are already attached to the specified slave */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        // Drop the old master and switch to the new one. replicationSetMaster()
        // does not actually connect: it only updates the master settings in
        // struct server. The real connection is made in replicationCron().
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}

// Set the new master
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
    sdsfree(server.masterhost);
    server.masterhost = sdsdup(ip);
    server.masterport = port;
    // Drop the connection to the previous master
    if (server.master) freeClient(server.master);
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    // Discard the cached master
    replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
    // Free the backlog
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
    // cancelReplicationHandshake() aborts any in-progress transfer or handshake with the master
    cancelReplicationHandshake();
    server.repl_state = REDIS_REPL_CONNECT;
    server.master_repl_offset = 0;
}

// Cron job that manages the master-slave link; it runs once per second
// and is called from serverCron()
/* --------------------------- REPLICATION CRON  ----------------------------- */

/* Replication cron funciton, called 1 time per second. */
void replicationCron(void) {
    ......
    // If needed (REDIS_REPL_CONNECT), connect to the master: the real connection happens here
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    ......
}
```
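Full resync
Next, the slave automatically sends a PSYNC request to the master. Redis always tries a partial resync first and falls back to a full resync only if that fails. A master-slave pair that has only just connected, however, needs a full resync.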
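After connecting to the master, the slave sends the PSYNC command with master_runid and offset, and the master checks whether both are valid. master_runid serves as the master's identity token: it identifies the master the slave was last connected to. offset is the slave's position in the global replication stream, i.e. an offset into the backlog data.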
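On the slave side, choosing the PSYNC arguments is simple. This sketch mirrors what slaveTryPartialResynchronization() does in the code further below. build_psync_args() and its parameters are hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Build the two arguments of "PSYNC <runid> <offset>". With a cached
 * master, ask to continue from the byte after the last one processed
 * (reploff + 1). Without one, send "? -1", which requests a full resync
 * and makes the master reveal its run id and current offset. */
void build_psync_args(int has_cached, const char *cached_runid,
                      long long cached_reploff,
                      char *runid_out, size_t runid_cap,
                      char *offset_out, size_t offset_cap) {
    assert(runid_cap > 1 && offset_cap > 2);
    if (has_cached) {
        snprintf(runid_out, runid_cap, "%s", cached_runid);
        snprintf(offset_out, offset_cap, "%lld", cached_reploff + 1);
    } else {
        snprintf(runid_out, runid_cap, "?");
        snprintf(offset_out, offset_cap, "-1");
    }
}
```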
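If validation fails, a full resync is performed. The master replies +FULLRESYNC master_runid offset, and the slave records master_runid and offset and prepares to receive the RDB file. The master then starts a BGSAVE to produce an RDB file and transfers it to the slave once the BGSAVE finishes, which completes the full resync.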
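The slave must extract the run id and the offset from the +FULLRESYNC reply. Here is a sketch of that parsing, following slaveTryPartialResynchronization(). parse_fullresync() is hypothetical, and RUN_ID_SIZE assumes Redis's 40-character run ids (REDIS_RUN_ID_SIZE).

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40  /* assumed to match REDIS_RUN_ID_SIZE */

/* Parse "+FULLRESYNC <runid> <offset>". On success fill runid (which
 * must hold RUN_ID_SIZE+1 bytes) and *offset and return 0; return -1
 * if the reply is not a well-formed +FULLRESYNC line. */
int parse_fullresync(const char *reply, char *runid, long long *offset) {
    const char *id, *off;

    assert(reply != NULL && runid != NULL && offset != NULL);
    if (strncmp(reply, "+FULLRESYNC", 11) != 0) return -1;
    id = strchr(reply, ' ');
    if (!id) return -1;
    id++;
    off = strchr(id, ' ');
    if (!off) return -1;
    off++;
    /* The run id must be exactly RUN_ID_SIZE characters long. */
    if (off - id - 1 != RUN_ID_SIZE) return -1;
    memcpy(runid, id, RUN_ID_SIZE);
    runid[RUN_ID_SIZE] = '\0';
    *offset = strtoll(off, NULL, 10);
    return 0;
}
```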
```c
// Registered as the event callback when connectWithMaster() connects to the master
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    ......
    // Ask the master for a partial resync; it replies by accepting or rejecting.
    // On rejection it returns +FULLRESYNC master_runid offset, and the slave
    // prepares for a full resync.
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    // Full resync
    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    // The master does not support PSYNC: fall back to the old SYNC command
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    // Try up to 5 times, sleeping 1 s between attempts, to create the temp
    // file exclusively (O_EXCL fails if the file already exists)
    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    // Register a read event with callback readSyncBulkPayload() to receive the RDB file
    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    // Initialize the RDB transfer state
    // State
    server.repl_state = REDIS_REPL_TRANSFER;
    // RDB file size (not known yet)
    server.repl_transfer_size = -1;
    // Bytes received so far
    server.repl_transfer_read = 0;
    // Offset of the last fsync, used to flush to disk periodically
    server.repl_transfer_last_fsync_off = 0;
    // File descriptor of the local RDB temp file
    server.repl_transfer_fd = dfd;
    // Time of the last transfer I/O
    server.repl_transfer_lastio = server.unixtime;
    // Temp file name
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
```
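A full resync transfers the RDB file plus the write commands accumulated since the BGSAVE started. replicationFeedSlaves() queues those commands in the slave's output buffer until the initial sync completes. For the RDB file itself, see "An In-Depth Analysis of the Redis RDB Persistence Strategy". If no background BGSAVE process is running, a BGSAVE is started. Otherwise each slave requesting a full resync waits for a BGSAVE. It joins the running one if another slave is already recording the writes made since the fork, and waits for the next one if not. When the BGSAVE finishes, the master immediately sends the RDB file to all slaves waiting for it.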
```c
// Master-side handler for SYNC and PSYNC: tries a partial resync, otherwise a full one
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {
    ......
    // If the partial resync attempt failed, the master has already sent
    // +FULLRESYNC master_runid offset to the slave; now start a BGSAVE.
    // Full resync:
    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1) {
        /* A BGSAVE child process is already running:
           1. if one of the slaves attached to this master is in
              REDIS_REPL_WAIT_BGSAVE_END, set slave c to
              REDIS_REPL_WAIT_BGSAVE_END as well;
           2. otherwise set it to REDIS_REPL_WAIT_BGSAVE_START. */
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;

        // Look for a slave that already requested a full resync
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            // Found a slave in REDIS_REPL_WAIT_BGSAVE_END: put c in the same
            // state so it receives the RDB file when the BGSAVE finishes,
            // and copy that slave's accumulated updates into c.
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            // Copy the other slave's pending output buffer to slave c
            copyClientOutputBuffer(c,slave);
            // Set c to "waiting for the BGSAVE to finish"
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            // No such slave: set c to REDIS_REPL_WAIT_BGSAVE_START,
            // i.e. "waiting for the next BGSAVE to start"
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        // No BGSAVE running: start a new one
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        // Set c to REDIS_REPL_WAIT_BGSAVE_END so it receives the RDB file when
        // the BGSAVE finishes; writes made meanwhile accumulate in its output buffer.
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        // Flush the replication script cache: the new slave does not know
        // previously loaded scripts, so EVALSHA must be propagated as EVAL again.
        /* Flush the script cache for the new slave. */
        replicationScriptCacheFlush();
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
    listAddNodeTail(server.slaves,c);
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}

// Called when a BGSAVE finishes
/* A background saving child (BGSAVE) terminated its work. Handle this. */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    // Other work
    ......
    // Slaves may be waiting for the BGSAVE to finish
    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}

// Called after RDB persistence finishes (from backgroundSaveDoneHandler()):
// the RDB file is ready, so send it to the waiting slaves
/* This function is called at the end of every background saving.
 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
 * otherwise REDIS_ERR is passed to the function.
 *
 * The goal of this function is to handle slaves waiting for a successful
 * background saving in order to perform non-blocking synchronization. */
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        // Waiting for a BGSAVE to start: a new one is started below,
        // so move it to "waiting for the BGSAVE to finish"
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        // Waiting for the BGSAVE to finish: get ready to send the RDB file
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            struct redis_stat buf;

            // bgsaveerr is REDIS_ERR if RDB persistence failed
            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            // Open the RDB file
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            // Remove any previously registered write event
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            // Register a new write event: sendBulkToSlave() transfers the RDB file
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }
    // startbgsave is set when some slaves were waiting for a BGSAVE to
    // start: launch a new BGSAVE for them
    if (startbgsave) {
        /* Since we are starting a new background save for one or more slaves,
         * we flush the Replication Script Cache to use EVAL to propagate every
         * new EVALSHA for the first time, since all the new slaves don't know
         * about previous scripts. */
        replicationScriptCacheFlush();
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            /* The BGSAVE may fail to fork; every slave waiting for it is then
               disconnected. This is a self-protection measure in redis:
               a failed fork most likely means memory is tight. */
            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}
```
Partial resync
As mentioned above, redis always tries a partial resync first. A partial resync sends the data cached in the backlog, i.e. the update records, to the slave.
After connecting to the master, the slave sends the PSYNC command with master_runid and offset, and the master checks whether both are valid.
If validation succeeds, a partial resync is performed. The master replies +CONTINUE, and on receiving it the slave registers a handler for the incoming backlog data. The master then sends the backlog data.
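Sending the backlog from the requested offset to the end requires some index arithmetic, because the data may wrap around the end of the circular buffer. Here is a minimal sketch modeled on addReplyReplicationBacklog(). backlog_read_from() is hypothetical, and it copies into a caller-supplied buffer instead of a client reply.

```c
#include <assert.h>
#include <string.h>

/* Copy backlog bytes from global offset `offset` to the end into out:
 * locate the oldest byte, skip forward to the requested offset, then copy
 * in at most two pieces because the data may wrap around the buffer end.
 * Returns the number of bytes copied, or -1 if offset is not covered. */
long long backlog_read_from(const char *buf, long long size, long long idx,
                            long long histlen, long long backlog_off,
                            long long offset, char *out) {
    long long skip, j, len, copied = 0;

    assert(size > 0 && histlen <= size);
    if (offset < backlog_off || offset > backlog_off + histlen) return -1;
    skip = offset - backlog_off;
    /* Physical position of the oldest byte (the one at backlog_off). */
    j = (idx + (size - histlen)) % size;
    /* Seek forward to the requested offset. */
    j = (j + skip) % size;
    len = histlen - skip;
    while (len > 0) {
        long long thislen = (size - j) < len ? (size - j) : len;
        memcpy(out + copied, buf + j, (size_t)thislen);
        copied += thislen;
        len -= thislen;
        j = 0; /* any remainder starts at the beginning of the buffer */
    }
    return copied;
}
```

Take the 8-byte buffer "ijcdefgh" with write index 2 and oldest offset 3. Reading from offset 7 copies "gh" from the end of the buffer and then "ij" from its start.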
| // 連接主機(jī) connectWithMaster() 的時(shí)候,會(huì)被注冊(cè)為回調(diào)函數(shù) void?syncWithMaster(aeEventLoop *el, int?fd, void?*privdata, int?mask) { ????char?tmpfile[256], *err; ????int?dfd, maxtries = 5; ????int?sockerr = 0, psync_result; ????socklen_t errlen = sizeof(sockerr); ????...... ????// 嘗試部分同步,主機(jī)允許進(jìn)行部分同步會(huì)返回 +CONTINUE,從機(jī)接收后注冊(cè)相應(yīng)的事件 ????/* Try a partial resynchonization. If we don't have a cached master ?????* slaveTryPartialResynchronization() will at least try to use PSYNC ?????* to start a full resynchronization so that we get the master run id ?????* and the global offset, to try a partial resync at the next ?????* reconnection attempt. */ ????// 函數(shù)返回三種狀態(tài): ????// PSYNC_CONTINUE:表示會(huì)進(jìn)行部分同步,在 slaveTryPartialResynchronization() ?????????????????????// 中已經(jīng)設(shè)置回調(diào)函數(shù) readQueryFromClient() ????// PSYNC_FULLRESYNC:全同步,會(huì)下載 RDB 文件 ????// PSYNC_NOT_SUPPORTED:未知 ????psync_result = slaveTryPartialResynchronization(fd); ????if?(psync_result == PSYNC_CONTINUE) { ????????redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization."); ????????return; ????} ????// 執(zhí)行全同步 ????...... } // 函數(shù)返回三種狀態(tài): // PSYNC_CONTINUE:表示會(huì)進(jìn)行部分同步,已經(jīng)設(shè)置回調(diào)函數(shù) // PSYNC_FULLRESYNC:全同步,會(huì)下載 RDB 文件 // PSYNC_NOT_SUPPORTED:未知 #define PSYNC_CONTINUE 0 #define PSYNC_FULLRESYNC 1 #define PSYNC_NOT_SUPPORTED 2 int?slaveTryPartialResynchronization(int?fd) { ????char?*psync_runid; ????char?psync_offset[32]; ????sds reply; ????/* Initially set repl_master_initial_offset to -1 to mark the current ?????* master run_id and offset as not valid. Later if we'll be able to do ?????* a FULL resync using the PSYNC command we'll set the offset at the ?????* right value, so that this information will be propagated to the ?????* client structure representing the master into server.master. 
*/ ????server.repl_master_initial_offset = -1; ????if?(server.cached_master) { ????// 緩存了上一次與主機(jī)連接的信息,可以嘗試進(jìn)行部分同步,減少數(shù)據(jù)傳輸 ????????psync_runid = server.cached_master->replrunid; ????????snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); ????????redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); ????} else?{ ????// 未緩存上一次與主機(jī)連接的信息,進(jìn)行全同步 ????// psync ? -1 可以獲取主機(jī)的 master_runid ????????redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)"); ????????psync_runid = "?"; ????????memcpy(psync_offset,"-1",3); ????} ????// 向主機(jī)發(fā)送命令,并接收回復(fù) ????/* Issue the PSYNC command */ ????reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL); ????// 全同步 ????if?(!strncmp(reply,"+FULLRESYNC",11)) { ????????char?*runid = NULL, *offset = NULL; ????????/* FULL RESYNC, parse the reply in order to extract the run id ?????????* and the replication offset. */ ????????runid = strchr(reply,' '); ????????if?(runid) { ????????????runid++; ????????????offset = strchr(runid,' '); ????????????if?(offset) offset++; ????????} ????????if?(!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) { ????????????redisLog(REDIS_WARNING, ????????????????"Master replied with wrong +FULLRESYNC syntax."); ????????????/* This is an unexpected condition, actually the +FULLRESYNC ?????????????* reply means that the master supports PSYNC, but the reply ?????????????* format seems wrong. To stay safe we blank the master ?????????????* runid to make sure next PSYNCs will fail. 
*/ ????????????memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1); ????????} else?{ ????????????// 拷貝 runid ????????????memcpy(server.repl_master_runid, runid, offset-runid-1); ????????????server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0'; ????????????server.repl_master_initial_offset = strtoll(offset,NULL,10); ????????????redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld", ????????????????server.repl_master_runid, ????????????????server.repl_master_initial_offset); ????????} ????????/* We are going to full resync, discard the cached master structure. */ ????????replicationDiscardCachedMaster(); ????????sdsfree(reply); ????????return?PSYNC_FULLRESYNC; ????} ????// 部分同步 ????if?(!strncmp(reply,"+CONTINUE",9)) { ????????/* Partial resync was accepted, set the replication state accordingly */ ????????redisLog(REDIS_NOTICE, ????????????"Successful partial resynchronization with master."); ????????sdsfree(reply); ????????// 緩存主機(jī)替代現(xiàn)有主機(jī),且為 PSYNC(部分同步) 做好準(zhǔn)備c ????????replicationResurrectCachedMaster(fd); ????????return?PSYNC_CONTINUE; ????} ????/* If we reach this point we receied either an error since the master does ?????* not understand PSYNC, or an unexpected reply from the master. ?????* Reply with PSYNC_NOT_SUPPORTED in both cases. */ ????// 接收到主機(jī)發(fā)出的錯(cuò)誤信息 ????if?(strncmp(reply,"-ERR",4)) { ????????/* If it's not an error, log the unexpected event. */ ????????redisLog(REDIS_WARNING, ????????????"Unexpected reply to PSYNC from master: %s", reply); ????} else?{ ????????redisLog(REDIS_NOTICE, ????????????"Master does not support PSYNC or is in " ????????????"error state (reply: %s)", reply); ????} ????sdsfree(reply); ????replicationDiscardCachedMaster(); ????return?PSYNC_NOT_SUPPORTED; } // 主機(jī) SYNC 和 PSYNC 命令處理函數(shù),會(huì)嘗試進(jìn)行部分同步和全同步 /* SYNC ad PSYNC command implemenation. */ void?syncCommand(redisClient *c) { ????...... 
    // The master tries a partial resync; if it is allowed, the master replies +CONTINUE and then sends the backlog
    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <runid> <offset>
     *
     * So the slave knows the new runid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        // Partial resync
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
        // Partial resync failed, so a full resync follows; argv[1] holds the runid sent by the client (the slave)
            char *master_runid = c->argv[1]->ptr;
            /* Increment stats for failed PSYNCs, but only if the
             * runid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not able to partially
             * resync. */
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= REDIS_PRE_PSYNC_SLAVE;
    }
    // Perform the full resync:
    ......
}

// The master checks whether a partial resync is possible
/* This function handles the PSYNC command from the point of view of a
 * master receiving a request for partial resynchronization.
 *
 * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
 * with the usual full resync.
 */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* Is the runid of this master the same advertised by the wannabe slave
     * via PSYNC? If runid changed this master is a different instance and
     * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
    // When a slave has to drop its connection to the master because of an error,
    // it caches the master's state so that it can try a partial resync next time.
    // 1) master_runid is the runid of the cached master, supplied by the slave;
    // 2) server.runid is the runid of this instance (the master).
    // A mismatch means this instance is not the master the slave cached, so a
    // partial resync is impossible and only a full resync can be done.
        // "?" means the slave asks for a full resync, e.g. when it has no cached
        // master, such as on its first sync.
        /* Run id "?" is used by slaves that want to force a full resync. */
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        goto need_full_resync;
    }

    // Parse the integer argument: the offset requested by the slave
    /* We still have the data our slave is asking for? */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       REDIS_OK) goto need_full_resync;

    // Cases in which a partial resync is impossible:
    //   - there is no backlog;
    //   - psync_offset is too small, i.e. the slave missed too many updates,
    //     so a full resync is done to be safe;
    //   - psync_offset is beyond the end of the backlog.
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        // The conditions for a partial resync are not met: fall back to a full resync
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
        }
        goto need_full_resync;
    }

    // Perform the partial resync:
    // 1) mark the client as a slave;
    // 2) tell the slave to get ready to receive data (it prepares when it receives +CONTINUE);
    // 3) start sending the data.
    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    // Mark the connected client as a slave
    c->flags |= REDIS_SLAVE;
    // Mark the slave as online: from now on only updates are sent
    // #define REDIS_REPL_ONLINE 9 /* RDB file transmitted, sending just updates. */
    c->replstate = REDIS_REPL_ONLINE;
    // Update the ACK time
    c->repl_ack_time = server.unixtime;
    // Append the client to the list of slaves
    listAddNodeTail(server.slaves,c);
    // Tell the slave the partial resync can proceed; on receipt it prepares (registers its callback)
    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * empty so this write will never fail actually. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    // Write the backlog data to the slave; the backlog holds the "update buffer" contents
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */

    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    ......
    // Send +FULLRESYNC <runid> <repl_offset> to the slave
} |
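The offset checks above, combined with the circular backlog described earlier, can be tried out in isolation. The following is a minimal sketch, not Redis code: `backlog_t` and the `backlog_*` functions are hypothetical names, loosely modeled on `feedReplicationBacklog()` and `addReplyReplicationBacklog()` in replication.c, with a deliberately tiny 16-byte ring so that wrap-around is easy to see. Offsets start at 1, so `off` is the offset of the oldest byte still held, and a requested offset equal to `off + histlen` means the slave is missing nothing.

```c
#include <string.h>

/* Hypothetical stand-alone model of the server.repl_backlog_* fields
 * (not Redis code). The ring is tiny so wrap-around is easy to follow. */
typedef struct {
    char buf[16];          /* repl_backlog: circular buffer */
    long long size;        /* repl_backlog_size */
    long long idx;         /* repl_backlog_idx: next write position */
    long long histlen;     /* repl_backlog_histlen: valid bytes held */
    long long off;         /* repl_backlog_off: offset of the oldest byte */
    long long master_off;  /* master_repl_offset: total bytes ever fed */
} backlog_t;

void backlog_init(backlog_t *b) {
    memset(b, 0, sizeof(*b));
    b->size = (long long)sizeof(b->buf);
    b->off = b->master_off + 1;  /* empty backlog: next byte will be offset 1 */
}

/* Append propagated bytes, overwriting the oldest data when full. */
void backlog_feed(backlog_t *b, const char *p, long long len) {
    b->master_off += len;
    while (len > 0) {
        long long thislen = b->size - b->idx;  /* room before wrapping */
        if (thislen > len) thislen = len;
        memcpy(b->buf + b->idx, p, (size_t)thislen);
        b->idx = (b->idx + thislen) % b->size;  /* wrap back to 0 */
        b->histlen += thislen;
        p += thislen;
        len -= thislen;
    }
    if (b->histlen > b->size) b->histlen = b->size;  /* older bytes are gone */
    b->off = b->master_off - b->histlen + 1;
}

/* The same range test as masterTryPartialResynchronization(). */
int backlog_can_psync(const backlog_t *b, long long offset) {
    return offset >= b->off && offset <= b->off + b->histlen;
}

/* Copy everything from `offset` to the end of the backlog into `out`
 * (which must hold at least histlen bytes); returns the byte count.
 * Call only after backlog_can_psync() has succeeded. */
long long backlog_read_from(const backlog_t *b, long long offset, char *out) {
    long long skip = offset - b->off;
    long long len = b->histlen - skip;
    /* Ring position of the oldest byte, then skip what the slave already has. */
    long long j = (b->idx + (b->size - b->histlen)) % b->size;
    long long copied = 0;
    j = (j + skip) % b->size;
    while (copied < len) {
        long long thislen = b->size - j;  /* contiguous bytes up to the ring end */
        if (thislen > len - copied) thislen = len - copied;
        memcpy(out + copied, b->buf + j, (size_t)thislen);
        copied += thislen;
        j = 0;  /* any remainder starts at the beginning of the ring */
    }
    return len;
}
```

For example, after feeding `"abcdefghij"` and then `"klmnopqrst"` (20 bytes) into the 16-byte ring, the first four bytes have been overwritten, so `backlog_can_psync()` rejects offset 3 and such a slave would get `+FULLRESYNC`. Offset 15 is still available, and `backlog_read_from()` returns the six bytes `opqrst`, reassembled across the wrap-around point.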
Caching the master
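A slave may lose its connection to the master for various reasons, such as network delays (PING timeouts, ACK timeouts, and so on). When this happens, the slave tries to save the state of its connection to the master, such as the master's runid and the global replication offset, so that it can attempt a partial resync next time, and then it tries to reconnect to the master. Note that if the disconnection lasts long enough, the partial resync is bound to fail: by then the master's circular backlog has overwritten the updates the slave missed, or the backlog has been released after repl_backlog_time_limit seconds without any slaves.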
| void freeClient(redisClient *c) {
    listNode *ln;

    /* If this is marked as current client unset it */
    if (server.current_client == c) server.current_client = NULL;

    // If this instance is a slave and this client is its master, the master's
    // state may need to be cached so that a PSYNC can be attempted later
    /* If it is our master that's being disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & REDIS_MASTER) {
        redisLog(REDIS_WARNING,"Connection with master lost.");
        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                          REDIS_CLOSE_ASAP|
                          REDIS_BLOCKED|
                          REDIS_UNBLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }
    ......
}

// To make a partial resync possible, the slave caches the master's state before
// dropping the connection; the state is kept in server.cached_master.
// Called from freeClient() to save the master connection state for a later PSYNC.
void replicationCacheMaster(redisClient *c) {
    listNode *ln;

    redisAssert(server.master != NULL && server.cached_master == NULL);
    redisLog(REDIS_NOTICE,"Caching the disconnected master state.");

    // Remove the master from the list of clients
    /* Remove from the list of clients, we don't want this client to be
     * listed by CLIENT LIST or processed in any way by batch operations. */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);

    // Save the master's state
    /* Save the master. Server.master will be set to null later by
     * replicationHandleMasterDisconnection(). */
    server.cached_master = server.master;

    // Unregister the events and close the connection
    /* Remove the event handlers and close the socket. We'll later reuse
     * the socket of the new connection with the master during PSYNC. */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    close(c->fd);

    /* Set fd to -1 so that we can safely call freeClient(c) later. */
    c->fd = -1;

    // Update the replication state; this also sets server.master = NULL
    /* Caching the master happens instead of the actual freeClient() call,
     * so make sure to adjust the replication state. This function will
     * also set server.master to NULL. */
    replicationHandleMasterDisconnection();
} |
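The cached state is what the slave relies on when it reconnects. As far as I can tell from Redis 2.8's `slaveTryPartialResynchronization()`, a slave with a cached master sends PSYNC with the cached runid and the cached offset plus one, while a slave without one sends `PSYNC ? -1`; that is the "?" which `masterTryPartialResynchronization()` treats as a forced full resync. The sketch below models only that choice; `cached_master_t` and `build_psync()` are hypothetical names, not Redis API.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical model of what the slave keeps in server.cached_master. */
typedef struct {
    int valid;           /* 1 if a master was cached on disconnect */
    char runid[41];      /* the master's 40-character run id + NUL */
    long long reploff;   /* offset of the last byte already processed */
} cached_master_t;

/* Format the PSYNC request sent after reconnecting; returns its length. */
int build_psync(const cached_master_t *cm, char *out, size_t outlen) {
    if (cm != NULL && cm->valid) {
        /* Ask for the first byte the slave has not seen yet. */
        return snprintf(out, outlen, "PSYNC %s %lld", cm->runid, cm->reploff + 1);
    }
    /* Nothing cached: "?" and -1 make the master answer +FULLRESYNC. */
    return snprintf(out, outlen, "PSYNC ? -1");
}
```

A slave that cached offset 100 therefore asks for offset 101, and the master accepts only if 101 lies between repl_backlog_off and repl_backlog_off + repl_backlog_histlen.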
Summary
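In short, a full sync amounts to transferring an RDB file from the master to the slave; after that, whenever the master modifies a small amount of data, it propagates the modification records to every slave. This article has described the internal protocol and mechanisms of redis master-slave replication. The next few articles on redis will focus mainly on its internal data structures.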
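The "modification records" sent to slaves are the commands themselves in the redis protocol (RESP) multibulk format, the same format as the `set name Jhon` update-buffer example. These bytes are also appended to the backlog, and master_repl_offset advances by their length. The following is a minimal encoder sketch; `resp_encode()` is a hypothetical helper, not a Redis function.

```c
#include <stdio.h>
#include <string.h>

/* Encode argv as a RESP multibulk request: "*<argc>\r\n" followed by
 * "$<len>\r\n<arg>\r\n" for each argument. Returns the number of bytes
 * written (excluding the NUL), or -1 if `out` is too small. */
int resp_encode(int argc, const char **argv, char *out, size_t outlen) {
    size_t used;
    int n = snprintf(out, outlen, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= outlen) return -1;
    used = (size_t)n;
    for (int i = 0; i < argc; i++) {
        n = snprintf(out + used, outlen - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= outlen - used) return -1;  /* truncated */
        used += (size_t)n;
    }
    return (int)used;
}
```

For `{"SET", "name", "Jhon"}` this produces the 33-byte string `*3\r\n$3\r\nSET\r\n$4\r\nname\r\n$4\r\nJhon\r\n`; each `$` prefix is the byte length of the argument that follows.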
捣乱 (daoluan), 2014-4-22
For more, visit: http://daoluan.net
Reposted from: https://www.cnblogs.com/scott19820130/p/4916038.html