[Redis Study Notes] 2018-06-12 Replication and Propagation
Ride-Sharing (順風車) Operations R&D Team, Tan Miao (譚淼)
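I. Replication
Redis master-slave synchronization works in two ways: full replication and partial replication.
Full replication: the master generates an RDB file and transfers it to the slave, which loads it to synchronize.
Partial replication: the master reads the missing data from the replication backlog buffer and sends it to the slave.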
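To make the role of the replication backlog concrete, here is a minimal sketch of a fixed-size circular buffer that keeps only the most recent bytes of the replication stream. The `backlog` struct, `BACKLOG_SIZE`, `backlog_feed()` and `backlog_read_from()` are hypothetical names chosen for illustration; they are not the real Redis structures or functions, and the tiny buffer size is only there to make wrap-around easy to see.

```c
#include <stddef.h>
#include <string.h>

#define BACKLOG_SIZE 16  /* hypothetical, deliberately tiny */

/* Simplified stand-in for the replication backlog (not the Redis struct). */
typedef struct {
    char buf[BACKLOG_SIZE];
    size_t idx;            /* next write position inside buf */
    size_t histlen;        /* number of valid bytes currently held */
    long long master_off;  /* total bytes ever written to the stream */
} backlog;

/* Append len bytes, overwriting the oldest data once the buffer is full. */
void backlog_feed(backlog *b, const char *p, size_t len) {
    b->master_off += (long long)len;
    while (len > 0) {
        size_t room = BACKLOG_SIZE - b->idx;
        size_t n = len < room ? len : room;
        memcpy(b->buf + b->idx, p, n);
        b->idx = (b->idx + n) % BACKLOG_SIZE;
        p += n;
        len -= n;
        b->histlen += n;
    }
    if (b->histlen > BACKLOG_SIZE) b->histlen = BACKLOG_SIZE;
}

/* Copy the stream from offset `from` (the number of bytes the slave already
 * has) up to the current end into out. Returns the number of bytes copied,
 * or -1 if that data has been overwritten or out is too small. */
long long backlog_read_from(const backlog *b, long long from,
                            char *out, size_t outcap) {
    long long first = b->master_off - (long long)b->histlen; /* oldest offset held */
    if (from < first || from > b->master_off) return -1;
    size_t n = (size_t)(b->master_off - from);
    if (n > outcap) return -1;
    size_t start = (b->idx + BACKLOG_SIZE - n) % BACKLOG_SIZE;
    for (size_t i = 0; i < n; i++)
        out[i] = b->buf[(start + i) % BACKLOG_SIZE];
    return (long long)n;
}
```

In this sketch a return value of -1 corresponds to the case where partial replication is impossible and the slave would need a full copy of the dataset instead.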
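Suppose two Redis servers, A and B, have been started, and the slaveof command is then run on B so that B becomes a slave of A. The overall flow is as follows:
(1) When B receives the slaveof command, it runs slaveofCommand(), which calls replicationSetMaster(). That function sets the replication state to REPL_STATE_CONNECT.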
    /* Set replication to the specified master address and port. */
    void replicationSetMaster(char *ip, int port) {
        ......
        server.repl_state = REPL_STATE_CONNECT;
        server.repl_down_since = 0;
    }

(2) The periodically scheduled time event, serverCron(), calls replicationCron() once per second.
    /* This is our timer interrupt, called server.hz times per second.
     * Here is where we do a number of things that need to be done asynchronously.
     * For instance:
     *
     * - Active expired keys collection (it is also performed in a lazy way on
     *   lookup).
     * - Software watchdog.
     * - Update some statistic.
     * - Incremental rehashing of the DBs hash tables.
     * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
     * - Clients timeout of different kinds.
     * - Replication reconnection.
     * - Many more...
     *
     * Everything directly called here will be called server.hz times per second,
     * so in order to throttle execution of things we want to do less frequently
     * a macro is used: run_with_period(milliseconds) { .... }
     */
    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        ......
        /* Replication cron function -- used to reconnect to master,
         * detect transfer failures, start background RDB transfers and so forth. */
        run_with_period(1000) replicationCron();
        ......
    }

(3) In replicationCron(), if the state is REPL_STATE_CONNECT, connectWithMaster() is called.
    /* --------------------------- REPLICATION CRON ---------------------------- */

    /* Replication cron function, called 1 time per second. */
    void replicationCron(void) {
        ......
        /* Check if we should connect to a MASTER */
        if (server.repl_state == REPL_STATE_CONNECT) {
            serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
                server.masterhost, server.masterport);
            if (connectWithMaster() == C_OK) {
                serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
            }
        }
        ......
    }

(4) connectWithMaster() opens a non-blocking connection to the master and registers a file event whose handler is syncWithMaster().
    int connectWithMaster(void) {
        int fd;

        fd = anetTcpNonBlockBestEffortBindConnect(NULL,
            server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
        if (fd == -1) {
            serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
                strerror(errno));
            return C_ERR;
        }

        if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
                AE_ERR)
        {
            close(fd);
            serverLog(LL_WARNING,"Can't create readable event for SYNC");
            return C_ERR;
        }

        server.repl_transfer_lastio = server.unixtime;
        server.repl_transfer_s = fd;
        server.repl_state = REPL_STATE_CONNECTING;
        return C_OK;
    }

(5) In syncWithMaster(), the slave first performs a handshake with the master to exchange key information, and then sends a PSYNC request.
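(6) When the master receives the PSYNC message, it checks the replication ID and offset sent by the slave. If the replication ID matches its own and the requested offset is still held in the replication backlog (server.repl_backlog), the master performs a partial resynchronization: it replies with CONTINUE and sends the relevant data from the backlog to the slave. Otherwise it performs a full resynchronization: it replies with FULLRESYNC, generates an RDB file, and transfers it to the slave.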
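The decision in step (6) can be sketched as a small pure function. This is a simplified illustration, not the Redis implementation: `master_state`, `choose_resync()` and `resync_reply()` are hypothetical names, the field names only loosely mirror what the server keeps, and the sketch compares against a single replication ID.

```c
#include <string.h>

#define REPLID_LEN 40

typedef enum { RESYNC_PARTIAL, RESYNC_FULL } resync_kind;

/* Hypothetical, simplified view of the master's replication state. */
typedef struct {
    char replid[REPLID_LEN + 1]; /* master's own replication ID */
    int has_backlog;             /* non-zero if a backlog buffer exists */
    long long backlog_off;       /* stream offset of the first byte in the backlog */
    long long backlog_histlen;   /* number of valid bytes in the backlog */
} master_state;

/* Decide how to answer "PSYNC <replid> <offset>" from a slave. */
resync_kind choose_resync(const master_state *m, const char *replid,
                          long long psync_offset) {
    /* A different replication ID means a different history: full resync. */
    if (strcmp(m->replid, replid) != 0) return RESYNC_FULL;
    /* Without a backlog there is nothing to replay. */
    if (!m->has_backlog) return RESYNC_FULL;
    /* The requested offset must still lie inside the backlog window. */
    if (psync_offset < m->backlog_off ||
        psync_offset > m->backlog_off + m->backlog_histlen)
        return RESYNC_FULL;
    return RESYNC_PARTIAL;
}

/* Name of the reply the master sends for each outcome. */
const char *resync_reply(resync_kind k) {
    return k == RESYNC_PARTIAL ? "CONTINUE" : "FULLRESYNC";
}
```

For example, with a backlog covering offsets 100 through 150, a request for offset 120 with the matching ID yields CONTINUE, while offset 99, offset 151, or any other replication ID yields FULLRESYNC.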
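II. Command propagation
When a write command is executed on the master, the call() function in server.c is invoked, and call() in turn calls propagate() to send the change to the slaves.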
    /* Call() is the core of Redis execution of a command.
     *
     * The following flags can be passed:
     * CMD_CALL_NONE            No flags.
     * CMD_CALL_SLOWLOG         Check command speed and log in the slow log if needed.
     * CMD_CALL_STATS           Populate command stats.
     * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
     *                          or if the client flags are forcing propagation.
     * CMD_CALL_PROPAGATE_REPL  Send command to slaves if it modified the dataset
     *                          or if the client flags are forcing propagation.
     * CMD_CALL_PROPAGATE       Alias for PROPAGATE_AOF|PROPAGATE_REPL.
     * CMD_CALL_FULL            Alias for SLOWLOG|STATS|PROPAGATE.
     *
     * The exact propagation behavior depends on the client flags.
     * Specifically:
     *
     * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
     *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
     *    in the call flags, then the command is propagated even if the
     *    dataset was not affected by the command.
     * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
     *    are set, the propagation into AOF or to slaves is not performed even
     *    if the command modified the dataset.
     *
     * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
     * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
     * slaves propagation will never occur.
     *
     * Client flags are modified by the implementation of a given command
     * using the following API:
     *
     * forceCommandPropagation(client *c, int flags);
     * preventCommandPropagation(client *c);
     * preventCommandAOF(client *c);
     * preventCommandReplication(client *c);
     *
     */
    void call(client *c, int flags) {
        ......
        /* Propagate the command into the AOF and replication link */
        if (flags & CMD_CALL_PROPAGATE &&
            (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
        {
            ......
            if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
                propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
        }
        ......
    }

propagate() appends the command to the AOF when AOF is enabled and PROPAGATE_AOF is set, and calls replicationFeedSlaves() when PROPAGATE_REPL is set.
    /* Propagate the specified command (in the context of the specified database id)
     * to AOF and Slaves.
     *
     * flags are an xor between:
     * + PROPAGATE_NONE (no propagation of command at all)
     * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
     * + PROPAGATE_REPL (propagate into the replication link)
     *
     * This should not be used inside commands implementation. Use instead
     * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
     */
    void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
                   int flags)
    {
        if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
            feedAppendOnlyFile(cmd,dbid,argv,argc);
        if (flags & PROPAGATE_REPL)
            replicationFeedSlaves(server.slaves,dbid,argv,argc);
    }

replicationFeedSlaves() writes the command to the replication backlog and then to the output buffer of every attached slave.
    /* Propagate write commands to slaves, and populate the replication backlog
     * as well. This function is used if the instance is a master: we use
     * the commands received by our clients in order to create the replication
     * stream. Instead if the instance is a slave and has sub-slaves attached,
     * we use replicationFeedSlavesFromMaster() */
    void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
        ......
        /* Write the command to the replication backlog if any. */
        if (server.repl_backlog) {
            char aux[LONG_STR_SIZE+3];

            /* Add the multi bulk reply length. */
            aux[0] = '*';
            len = ll2string(aux+1,sizeof(aux)-1,argc);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);

            for (j = 0; j < argc; j++) {
                long objlen = stringObjectLen(argv[j]);

                /* We need to feed the buffer with the object as a bulk reply
                 * not just as a plain string, so create the $..CRLF payload len
                 * and add the final CRLF */
                aux[0] = '$';
                len = ll2string(aux+1,sizeof(aux)-1,objlen);
                aux[len+1] = '\r';
                aux[len+2] = '\n';
                feedReplicationBacklog(aux,len+3);
                feedReplicationBacklogWithObject(argv[j]);
                feedReplicationBacklog(aux+len+1,2);
            }
        }

        /* Write the command to every slave. */
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            /* Don't feed slaves that are still waiting for BGSAVE to start */
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

            /* Feed slaves that are waiting for the initial SYNC (so these commands
             * are queued in the output buffer until the initial SYNC completes),
             * or are already in sync with the master. */

            /* Add the multi bulk length. */
            addReplyMultiBulkLen(slave,argc);

            /* Finally any additional argument that was not stored inside the
             * static buffer if any (from j to argc). */
            for (j = 0; j < argc; j++)
                addReplyBulk(slave,argv[j]);
        }
    }
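The backlog loop above serializes each command in the Redis protocol as a multi-bulk request: `*<argc>\r\n`, then `$<len>\r\n<bytes>\r\n` for every argument. The following standalone sketch reproduces that byte layout for plain C strings. `resp_encode()` is a hypothetical helper written for this note; unlike the Redis code, which takes lengths from string objects, it relies on strlen() and is therefore not binary-safe.

```c
#include <stdio.h>
#include <string.h>

/* Encode argc NUL-terminated arguments as a multi-bulk request into out.
 * Returns the number of bytes written (not counting the trailing NUL),
 * or -1 if out is too small. */
int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    size_t used = 0;

    /* Header: "*<argc>\r\n" */
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    used += (size_t)n;

    for (int j = 0; j < argc; j++) {
        size_t len = strlen(argv[j]);

        /* Bulk length prefix: "$<len>\r\n" */
        n = snprintf(out + used, cap - used, "$%zu\r\n", len);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;

        /* Payload followed by CRLF; keep one byte free for the NUL. */
        if (len + 2 >= cap - used) return -1;
        memcpy(out + used, argv[j], len);
        used += len;
        memcpy(out + used, "\r\n", 3); /* also copies the terminating NUL */
        used += 2;
    }
    return (int)used;
}
```

Calling it with the three arguments SET, k and v produces the 27-byte sequence `*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n`, which is the same form a slave receives over the replication link.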