redis源码
原文鏈接
鏈接
版本
使用6.2.4
sds.h sds.c
內存對齊
`__attribute__((__packed__))` 可以讓編譯器不對結構體進行內存對齊(即不在成員之間插入填充字節),詳細參考 GCC 文檔中關於類型屬性 `packed` 的說明。下面的例子對比了同一結構體在 packed 與非 packed 時的大小:
#include <stdint.h> #include <stdio.h>struct __attribute__((__packed__)) sdshdr64 {uint64_t len; /* used */uint64_t alloc; /* excluding the header and null terminator */unsigned char flags; /* 3 lsb of type, 5 unused bits */char buf[]; };struct _sdshdr64 {uint64_t len; /* used */uint64_t alloc; /* excluding the header and null terminator */unsigned char flags; /* 3 lsb of type, 5 unused bits */char buf[]; };int main() {printf("packed: %d\n", sizeof(struct sdshdr64));printf("nopacked: %d\n", sizeof(struct _sdshdr64)); } /* gcc a.c -o a && ./a packed: 17 nopacked: 24 */
宏:`##` 運算符
`##` 是預處理器的記號拼接運算符:它左右兩側的記號(若是宏參數,則先替換為實參)會被拼接成一個新的記號,編譯器再把拼接結果當作一個標識符來解析。例如 sds.h 中的 `struct sdshdr##T`,當 `T` 為 `8` 時展開為 `struct sdshdr8`。詳細參考 GCC 預處理器文檔中的 Concatenation 一節。
sds.h 源碼
sds 可以簡單地看作一個 `char*`:
typedef char *sds;
接下來是 5 種 sds 頭部結構:sdshdr5、sdshdr8、sdshdr16、sdshdr32、sdshdr64,分別用於儲存長度小於 $2^5$、$2^8$、$2^{16}$、$2^{32}$、$2^{64}$ 的字符串(長度字段分別為 5、8、16、32、64 位)。
`__attribute__ ((__packed__))` 是 GCC/Clang 的編譯器擴展屬性,可以取消結構體成員之間的內存對齊,讓成員緊湊排列。這部分先看後四個結構體,它們的內存布局幾乎一模一樣,只是 len 和 alloc 的整數寬度不同:
len: 字符串的長度(已使用的字節數)
alloc: 為 buf 分配的空間大小(不包括頭部和結尾的空字符)
flags: 字符串的類型。共 5 種類型,用 3 位即可表示,所以只有最低 3 位有意義,高 5 位不使用。
buf: 字符串的實際內容
sdshdr5 比較特殊:它沒有單獨的 len 和 alloc 字段,字符串長度直接儲存在 flags 的高 5 位上(因此它沒有預留空間,分配的大小總是等於長度),借此進一步壓縮了內存。
/* Note: sdshdr5 is never used, we just access the flags byte directly.* However is here to document the layout of type 5 SDS strings. */ struct __attribute__ ((__packed__)) sdshdr5 {unsigned char flags; /* 3 lsb of type, and 5 msb of string length */char buf[]; }; struct __attribute__ ((__packed__)) sdshdr8 {uint8_t len; /* used */uint8_t alloc; /* excluding the header and null terminator */unsigned char flags; /* 3 lsb of type, 5 unused bits */char buf[]; }; struct __attribute__ ((__packed__)) sdshdr16 {uint16_t len; /* used */uint16_t alloc; /* excluding the header and null terminator */unsigned char flags; /* 3 lsb of type, 5 unused bits */char buf[]; }; struct __attribute__ ((__packed__)) sdshdr32 {uint32_t len; /* used */uint32_t alloc; /* excluding the header and null terminator */unsigned char flags; /* 3 lsb of type, 5 unused bits */char buf[]; }; struct __attribute__ ((__packed__)) sdshdr64 {uint64_t len; /* used */uint64_t alloc; /* excluding the header and null terminator */unsigned char flags; /* 3 lsb of type, 5 unused bits */char buf[]; };
sds 把字符串的內容和它的元信息(字符串類型、字符串長度、已分配的空間)連續地儲存在同一塊內存中,頭部緊挨著 buf,讓內存排列更加緊湊;而 sds 指針本身指向 buf,因此可以直接當作 C 字符串使用。
adlist.c adlist.h
很普通的雙向鏈表,沒有什麼特殊的地方;注意 listIter 的 direction 表示迭代器的遍歷方向。
typedef struct listNode {struct listNode *prev;struct listNode *next;void *value; } listNode;typedef struct listIter {listNode *next;int direction; } listIter;typedef struct list {listNode *head;listNode *tail;void *(*dup)(void *ptr);void (*free)(void *ptr);int (*match)(void *ptr, void *key);unsigned long len; } list;mt19937-64.c mt19937-64.h
梅森素數
梅森素數是形如 $2^p-1$ 的素數,OEIS(數列 A000668)列出了已知的梅森素數,維基百科上也有說明。我們需要注意到的是 $2^{19937}-1$ 是一個梅森素數,MT19937 的周期正是 $2^{19937}-1$。
線性反饋移位寄存器
線性反饋移位寄存器(Linear Feedback Shifting Register,簡稱 LFSR)
假設你有一個寄存器,寄存器中儲存著一些二進制位,其中有幾個位被標記了。接下來會進行無限輪操作,每輪操作如下:
- 寄存器輸出最低位x(x=0或1)。
- 寄存器取出所有被標記的位的值,與 x 一起進行異或,得到 y(y=0 或 1)。
- 寄存器把自己右移1位,然后把值y放入最高位。
具體來說,你有一個 $8$ 位寄存器,初始儲存著 $00001111$,從低位數起(最低位為第 $1$ 位)的第 $3$、$5$、$7$ 位被標記了,於是開始操作。
第一輪輸出$x=1$,然后從低位到高位選擇了$1$,$0$,$0$, 最后$y=1 \oplus1 \oplus 0 \oplus 0=0$,寄存器變成了$00000111$
第二輪輸出$x=1$,然后從低位到高位選擇了$1$,$0$,$0$, 最后$y=1 \oplus1 \oplus 0 \oplus 0=0$,寄存器變成了$00000011$
第三輪輸出$x=1$,然后從低位到高位選擇了$0$,$0$,$0$, 最后$y=1 \oplus 0 \oplus 0 \oplus 0=1$,寄存器變成了$10000001$
第四輪輸出$x=1$,然后從低位到高位選擇了$0$,$0$,$0$, 最后$y=1 \oplus 0 \oplus 0 \oplus 0=1$,寄存器變成了$11000000$
第五輪輸出$x=0$,然后從低位到高位選擇了$0$,$0$,$1$, 最后$y=0 \oplus 0 \oplus 0 \oplus 1=1$,寄存器變成了$11100000$
……
梅森旋轉算法
這是一個隨機數生成算法。這裡有一篇有趣的博客,有興趣可以讀一下,下面引用其中的一些主要內容。
梅森旋轉算法(Mersenne Twister Algorithm,簡稱 MT)
$32$ 位的梅森旋轉算法能夠產生周期為 $P$ 的 $w$ 比特隨機數序列 $\{\vec x_i\}$,其中 $w=32$。也就是說,每一個 $\vec x$ 是一個長度為 $32$ 的行向量,其中的每一個元素都是二元域 $\mathbb{F}_2 \overset{\text{def}}{=} \{0, 1\}$ 中的元素。現在,我們定義如下一些記號,來描述梅森旋轉算法是如何進行旋轉(線性移位)的。
- $n$:參與梅森旋轉的隨機數個數;
- $r$:$[0, w)$ 之間的整數;
- $m$:$(0, n]$ 之間的整數;
- $\mathbf{A}$:$w \times w$ 的常矩陣;
- $\vec x^{(u)}$:$\vec x$ 的最高 $w - r$ 比特組成的數(低位補零);
- $\vec x^{(l)}$:$\vec x$ 的最低 $r$ 比特組成的數(高位補零)。
梅森旋轉算法首先需要根據隨機數種子初始化 $n$ 個行向量:
$$\vec x_0, \vec x_1, \ldots, \vec x_{n - 1}.$$
而後根據下式,從 $k=0$ 開始依次計算 $\vec x_{n}, \vec x_{n+1}, \ldots$:
$$\vec x_{k + n} \overset{\text{def}}{=} \vec x_{k + m}\oplus \bigl(\vec x_{k}^{(u)}\mid \vec x_{k + 1}^{(l)}\bigr)\mathbf{A}. \tag{2}$$
其中,$\vec x\mid \vec x'$ 表示兩個二進制數按位或;$\vec x\oplus \vec x'$ 表示兩個二進制數按位半加(不進位,也就是按位異或);$\vec x\mathbf A$ 則表示按位半加(模 2)的矩陣乘法。在 MT 中,$\mathbf A$ 被定義為
$$\mathbf A = \begin{pmatrix} & 1 \\ & & 1 \\ & & & \ddots \\ & & & & 1 \\ a_{w - 1} & a_{w - 2} & a_{w - 3} & \cdots & a_0 \end{pmatrix}$$
也就是說,$\vec x\mathbf A$ 相當於把 $\vec x$ 右移一位;若 $\vec x$ 的最低位為 $1$,再異或上常數 $(a_{w-1}, a_{w-2}, \ldots, a_0)$。
我們現在看看這個計算和「旋轉」有什麼關係。首先不考慮矩陣 $\mathbf A$。
則有 $\vec x_{k + n} \overset{\text{def}}{=} \vec x_{k + m}\oplus \bigl(\vec x_{k}^{(u)}\mid \vec x_{k + 1}^{(l)}\bigr)$。這個式子筆者看了很久才明白,它就相當於線性反饋移位寄存器進行了 $w$ 輪變換。下圖是計算 $\vec x_n$ 時的異或情況,可以看到 $\vec x_n$ 的每一位都是各自獨立地異或得到的。
回過頭來看式 (2),不難發現,這其實相當於一個 $nw - r$ 級的線性反饋移位寄存器(取 $\vec x_k^{(u)}$ 的最高 $w - r$ 位與 $\vec x_{k + 1}^{(l)}$ 的最低 $r$ 位進行迭代異或,再經過一個不影響周期的線性變換 $\mathbf A$)。只不過,式 (2) 每運算一次,相當於 LFSR 進行了 $w$ 輪計算。若 $w$ 與 $nw - r$ 互素,那麼這一微小的改變不會影響 LFSR 的周期。考慮到 LFSR 的計算過程像是在「旋轉」,這就是「梅森『旋轉』」這個名字的由來。
mt19937源碼
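主要的計算都在這裡。其中 `UM`、`LM` 分別是取高 $w-r$ 位和低 $r$ 位的掩碼,`x = (mt[i]&UM)|(mt[i+1]&LM)` 對應 $\vec x_k^{(u)}\mid \vec x_{k+1}^{(l)}$;`(x>>1) ^ mag01[(int)(x&1ULL)]` 就是 $\vec x\mathbf A$:最低位為 $0$ 時只右移一位,為 $1$ 時再異或常數(即矩陣 $\mathbf A$ 的最後一行)。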
unsigned long long genrand64_int64(void) {//...for (i=0;i<NN-MM;i++) {x = (mt[i]&UM)|(mt[i+1]&LM);mt[i] = mt[i+MM] ^ (x>>1) ^ mag01[(int)(x&1ULL)];}for (;i<NN-1;i++) {x = (mt[i]&UM)|(mt[i+1]&LM);mt[i] = mt[i+(MM-NN)] ^ (x>>1) ^ mag01[(int)(x&1ULL)];}//... }然后是63位生成
/* generates a random number on [0, 2^63-1]-interval */ long long genrand64_int63(void) {return (long long)(genrand64_int64() >> 1); }
實數的生成
/* generates a random number on [0,1]-real-interval */ double genrand64_real1(void) {return (genrand64_int64() >> 11) * (1.0/9007199254740991.0); }/* generates a random number on [0,1)-real-interval */ double genrand64_real2(void) {return (genrand64_int64() >> 11) * (1.0/9007199254740992.0); }/* generates a random number on (0,1)-real-interval */ double genrand64_real3(void) {return ((genrand64_int64() >> 12) + 0.5) * (1.0/4503599627370496.0); }dict.c dict.h
字典源碼
字典結構體定義。需要注意這裡有兩個 dictht,即兩張哈希表,這涉及到 rehash(重新哈希)問題:redis 使用了漸進式 rehash,把一次性的 rehash 分攤到後續的各個操作(插入、查詢等)中,每次只遷移少量數據,使單次操作中 rehash 的開銷降低到 $O(1)$。rehashidx 為 -1 時表示當前沒有在進行 rehash。
redis 使用單線程執行命令,絕對不能出現過於耗時的操作,否則會影響 redis 的響應延遲。
typedef struct dict {dictType *type;void *privdata;dictht ht[2];long rehashidx; /* rehashing not in progress if rehashidx == -1 */int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */ } dict;server.h server.c-1-跳表
跳表定義在這里
/* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode {sds ele;double score;struct zskiplistNode *backward;struct zskiplistLevel {struct zskiplistNode *forward;unsigned long span;} level[]; } zskiplistNode;typedef struct zskiplist {struct zskiplistNode *header, *tail;unsigned long length;int level; } zskiplist;intset.c intset.h
整數集合,用於儲存一組有序、不重複的整數。
typedef struct intset {uint32_t encoding;uint32_t length;int8_t contents[]; } intset;intset *intsetNew(void); intset *intsetAdd(intset *is, int64_t value, uint8_t *success); intset *intsetRemove(intset *is, int64_t value, int *success); uint8_t intsetFind(intset *is, int64_t value); int64_t intsetRandom(intset *is); uint8_t intsetGet(intset *is, uint32_t pos, int64_t *value); uint32_t intsetLen(const intset *is); size_t intsetBlobLen(intset *is); int intsetValidateIntegrity(const unsigned char *is, size_t size, int deep);
encoding 是編碼方式,指的是 contents 中的每個元素用多少字節儲存。編碼方式分為三種:
/* Note that these encodings are ordered, so:* INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64. */ #define INTSET_ENC_INT16 (sizeof(int16_t)) #define INTSET_ENC_INT32 (sizeof(int32_t)) #define INTSET_ENC_INT64 (sizeof(int64_t))
length 是集合中元素的個數。
contents 儲存實際的元素。雖然它被聲明為 `int8_t` 數組,但元素並不是 8 位整數:每個元素佔用的字節數由 encoding 決定(`sizeof(int16_t)`、`sizeof(int32_t)` 或 `sizeof(int64_t)`,即 2、4 或 8 字節)。
整數集合升級
整數集合初始時的編碼是 `INTSET_ENC_INT16`。當插入一個無法用 16 位表示的整數(例如一個 32 位整數)時,現有編碼就放不下了,這時需要進行升級:按新編碼擴大空間,並把已有元素逐個轉換成新編碼後重新排放,時間複雜度為 $O(N)$。
整數集合不支持降級:即使之後刪除了大整數,編碼也不會變回更小的寬度。
ziplist.c ziplist.h
壓縮列表
server.h server.c-2-對象
redis 的各種對象都通過 redisObject 統一表示:
```c typedef struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:LRUBITS; /* LRU time (relative to global lruclock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount; void *ptr; } robj;
# server.h-3-db
這次主要關注 redisServer。這個結構體有 460 行,筆者省去了一些。可以看到 db 是一個 redisDb 數組(通過指針訪問),dbnum 記錄它的數量,默認配置下 dbnum 為 16。c struct redisServer { // … redisDb *db; // … int dbnum; /* Total number of configured DBs */ // … };
然後是客戶端這邊。注意到 client 中也有一個 db 指針,當然它指向的是當前 SELECT 的那一個 db,而不是整個數組。c typedef struct client { // … redisDb *db; /* Pointer to currently SELECTed DB. */ // … } client;
看完服務器和客戶端,再看 db 本身。c /* Redis database representation. There are multiple databases identified
- by integers from 0 (the default database) up to the max configured
- database. The database number is the 'id' field in the structure. */ typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blockingkeys; /* Keys with clients waiting for data (BLPOP)*/ dict *readykeys; /* Blocked keys that received a PUSH */ dict *watchedkeys; /* WATCHED keys for MULTI/EXEC CAS / int id; / Database ID / long long avgttl; / Average TTL, just for stats / unsigned long expirescursor; / Cursor of the active expire cycle. */ list *defraglater; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb;
c struct rio { /* Backend functions. * Since this functions do not tolerate short writes or reads the return * value is simplified to: zero on error, non zero on complete success. / sizet (read)(struct rio *, void *buf, sizet len); size_t (*write)(struct _rio *, const void buf, sizet len); offt (tell)(struct _rio ); int (flush)(struct _rio ); / The updatecksum method if not NULL is used to compute the checksum of * all the data that was read or written so far. The method should be * designed so that can be called with the current checksum, and the buf * and len fields pointing to the new block of data to add to the checksum * computation. / void (updatecksum)(struct rio *, const void *buf, sizet len);
/* The current checksum and flags (see RIO_FLAG_*) */ uint64_t cksum, flags;/* number of bytes read or written */ size_t processed_bytes;/* maximum single read or write chunk size */ size_t max_processing_chunk;/* Backend-specific vars. */ union {/* In-memory buffer target. */struct {sds ptr;off_t pos;} buffer;/* Stdio file pointer target. */struct {FILE *fp;off_t buffered; /* Bytes written since last fsync. */off_t autosync; /* fsync after 'autosync' bytes written. */} file;/* Connection object (used to read from socket) */struct {connection *conn; /* Connection */off_t pos; /* pos in buf that was returned */sds buf; /* buffered data */size_t read_limit; /* don't allow to buffer/read more than that */size_t read_so_far; /* amount of data read from the rio (not buffered) */} conn;/* FD target (used to write to pipe). */struct {int fd; /* File descriptor. */off_t pos;sds buf;} fd; } io;};
簡單來說,它的這些字段分別對應這些內容:

| 字段 | 內容 |
| :------------------: | :------------------: |
| read | 讀數據,是函數指針 |
| write | 寫數據,是函數指針 |
| tell | 返回當前讀寫位置,是函數指針 |
| flush | 把緩衝的數據刷到目標設備,是函數指針 |
| update_cksum | 更新校驗和,是函數指針(可以為 NULL) |
| cksum | 當前校驗和 |
| flags | 讀寫錯誤標誌(見 RIO_FLAG_*) |
| processed_bytes | 已經處理的字節數 |
| max_processing_chunk | 單次最大處理的字節數 |
| io | 具體的讀寫目標(各後端的私有數據) |

這裡的函數指針主要供後面這些函數使用,這種編程方式有點像面向對象中的抽象類。注意看,下面的 `rioWrite` 在對象 `r` 的 `write` 方法之上做了一層封裝,使調用者可以一次寫入任意長度 `len` 的數據:當 `max_processing_chunk` 非 0 時,它把數據切成不超過該大小的塊依次寫入,每塊都先更新校驗和再寫入,並累加 `processed_bytes`;一旦某次寫入失敗,就設置錯誤標誌並返回 0,之後的寫入也會直接失敗。`rioRead` 也是同理。c static inline sizet rioWrite(rio *r, const void *buf, sizet len) { if (r->flags & RIOFLAGWRITEERROR) return 0; while (len) { sizet bytestowrite = (r->maxprocessingchunk && r->maxprocessingchunk < len) ? r->maxprocessingchunk : len; if (r->updatecksum) r->updatecksum(r,buf,bytestowrite); if (r->write(r,buf,bytestowrite) == 0) { r->flags |= RIOFLAGWRITEERROR; return 0; } buf = (char*)buf + bytestowrite; len -= bytestowrite; r->processedbytes += bytestowrite; } return 1; }
static inline sizet rioRead(rio *r, void *buf, sizet len) { if (r->flags & RIOFLAGREADERROR) return 0; while (len) { sizet bytestoread = (r->maxprocessingchunk && r->maxprocessingchunk < len) ? r->maxprocessingchunk : len; if (r->read(r,buf,bytestoread) == 0) { r->flags |= RIOFLAGREADERROR; return 0; } if (r->updatecksum) r->updatecksum(r,buf,bytestoread); buf = (char*)buf + bytestoread; len -= bytestoread; r->processedbytes += bytestoread; } return 1; }
static inline off_t rioTell(rio *r) { return r->tell(r); }
static inline int rioFlush(rio *r) { return r->flush(r); }
這裡有一個有趣的函數:緩衝區 I/O 的 flush 什麼也不用做,直接返回成功,因為它的寫操作本身就是直接追加到 sds 緩衝區中的。c /* Flushes any buffer to target device if applicable. Returns 1 on success
- and 0 on failures. */ static int rioBufferFlush(rio *r) { UNUSED(r); return 1; /* Nothing to do, our write just appends to the buffer. */ }
c /* ------------------------- Buffer I/O implementation ----------------------- */
/* Returns 1 or 0 for success/failure. */ static sizet rioBufferWrite(rio *r, const void buf, sizet len) { r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char)buf,len); r->io.buffer.pos += len; return 1; }
/* Returns 1 or 0 for success/failure. */ static sizet rioBufferRead(rio *r, void buf, sizet len) { if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len) return 0; / not enough buffer to return len bytes. */ memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len); r->io.buffer.pos += len; return 1; }
/* Returns read/write position in buffer. */ static off_t rioBufferTell(rio *r) { return r->io.buffer.pos; }
/* Flushes any buffer to target device if applicable. Returns 1 on success
- and 0 on failures. */ static int rioBufferFlush(rio *r) { UNUSED(r); return 1; /* Nothing to do, our write just appends to the buffer. */ }
static const rio rioBufferIO = { rioBufferRead, rioBufferWrite, rioBufferTell, rioBufferFlush, NULL, /* update_checksum / 0, / current checksum / 0, / flags / 0, / bytes read or written / 0, / read/write chunk size / { { NULL, 0 } } / union for io-specific vars */ };
void rioInitWithBuffer(rio *r, sds s) { *r = rioBufferIO; r->io.buffer.ptr = s; r->io.buffer.pos = 0; }
文件 io 和緩衝區 io 相差不大,需要注意的是文件 io 的寫函數,這裡涉及到一個[異步刷盤](https://blog.csdn.net/mengyafei43/article/details/38319783)的問題。redis 對多個操作系統做了兼容:在 Linux 下 `redis_fsync` 被定義為 `fdatasync`,其他系統上則是 `fsync`。文件讀寫也有自己的緩衝區,一旦開啟了自動同步(`io.file.autosync` 非 0),每當累計寫入的字節數 `io.file.buffered` 達到 `io.file.autosync`,就先 `fflush` 再 `redis_fsync(fileno(fp))`,然後把 `buffered` 清零。c /* Returns 1 or 0 for success/failure. */ static sizet rioFileWrite(rio *r, const void *buf, sizet len) { size_t retval;
retval = fwrite(buf,len,1,r->io.file.fp); r->io.file.buffered += len;if (r->io.file.autosync &&r->io.file.buffered >= r->io.file.autosync) {fflush(r->io.file.fp);if (redis_fsync(fileno(r->io.file.fp)) == -1) return 0;r->io.file.buffered = 0; } return retval;}
接下來的兩個 io 分別是 connection io 和 file descriptor io:前者只實現了從 socket 中讀取數據的接口,後者只實現了向 fd 中寫數據的接口(`This target is used to write the RDB file to pipe, when the master just streams the data to the replicas without creating an RDB on-disk image (diskless replication option)`)。

# rdb.c rdb.h

## rdbSaveRio

直接看函數 `rdbSaveRio` 的實現。第一部分是一些準備工作,其中 RDB 文件頭被格式化到字符串 magic 中:`REDIS` 加上 4 位數字的 RDB 版本號。c int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { // … dictIterator *di = NULL; dictEntry *de; char magic[10]; uint64t cksum; sizet processed = 0; int j; long keycount = 0; long long infoupdatedtime = 0; char *pname = (rdbflags & RDBFLAGSAOF_PREAMBLE) ? "AOF rewrite" : "RDB";
if (server.rdb_checksum)rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); // ...}
第二部分:`rdbWriteRaw` 直接把 magic 的前 9 個字節(`REDIS` 加版本號)寫入 rdb 輸出流;`rdbSaveInfoAuxFields` 寫入了一些輔助字段(kv 對),包括 `redis-ver`、`redis-bits`、`ctime` 和 `used-mem` 等;`rdbSaveModulesAux` 與模塊系統(module.c/module.h)相關,它遍歷已加載的模塊,讓註冊了輔助數據保存回調的模塊類型寫入自己的 aux 數據(這裡傳入的時機是在 RDB 鍵值數據之前)。c int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { // … if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDISVERSION) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redisbits) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmallocusedmemory()) == -1) return -1; // … return 1; }
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { // … if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; if (rdbSaveModulesAux(rdb, REDISMODULEAUXBEFORE_RDB) == -1) goto werr; // … }
第三部分開始處理數據庫,其主體如下:遍歷所有數據庫並跳過空庫,對每個非空數據庫依次寫入 SELECTDB 操作碼和數據庫編號,再寫入 RESIZEDB 操作碼以及該庫的鍵值對個數和設置了過期時間的鍵個數。c int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { // … for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; di = dictGetSafeIterator(d);
/* Write the SELECT DB opcode */if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;if (rdbSaveLen(rdb,j) == -1) goto werr;/* Write the RESIZE DB opcode. */uint64_t db_size, expires_size;db_size = dictSize(db->dict);expires_size = dictSize(db->expires);if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;if (rdbSaveLen(rdb,db_size) == -1) goto werr;if (rdbSaveLen(rdb,expires_size) == -1) goto werr;/* Iterate this DB writing every entry */while((de = dictNext(di)) != NULL) {// ...} } // ...}
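第三部分的 `while` 循環中,對整個數據庫的 kv 字典進行迭代,通過 `rdbSaveKeyValuePair` 把每個鍵值對連同它的過期時間依次寫入 rio 流。此外,如果這份 RDB 是 AOF 重寫的一部分,每寫入一定字節數就會從父進程讀取累積的差異數據;每處理 1024 個鍵還會檢查一次時間,大約每秒向父進程發送一次子進程信息。c /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { sds keystr = dictGetKey(de); robj key, *o = dictGetVal(de); long long expire;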
initStaticStringObject(key,keystr); expire = getExpire(db,&key); if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;/* When this RDB is produced as part of an AOF rewrite, move* accumulated diff from parent to child while rewriting in* order to have a smaller final write. */ if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {processed = rdb->processed_bytes;aofReadDiffFromParent(); }/* Update child info every 1 second (approximately).* in order to avoid calling mstime() on each iteration, we will* check the diff every 1024 keys */ if ((key_count++ & 1023) == 0) {long long now = mstime();if (now - info_updated_time >= 1000) {sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, pname);info_updated_time = now;} }}
最後一部分:如果需要保存複製信息(`rsi` 非空),還會把 Lua 腳本緩存作為 aux 字段一併寫入;然後寫入模塊在 RDB 主體之後的 aux 數據和 EOF 結束符,最後寫入 8 字節的 CRC64 校驗和(未開啟校驗時為 0)。c /* If we are storing the replication information on disk, persist * the script cache as well: on successful PSYNC after a restart, we need * to be able to process any EVALSHA inside the replication backlog the * master will send us. */ if (rsi && dictSize(server.luascripts)) { di = dictGetIterator(server.luascripts); while((de = dictNext(di)) != NULL) { robj *body = dictGetVal(de); if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1) goto werr; } dictReleaseIterator(di); di = NULL; /* So that we don't release it again on error. */ }
if (rdbSaveModulesAux(rdb, REDISMODULEAUXAFTER_RDB) == -1) goto werr;
/* EOF opcode */ if (rdbSaveType(rdb,RDBOPCODEEOF) == -1) goto werr;
/* CRC64 checksum. It will be zero if checksum computation is disabled, the * loading code skips the check in this case. */ cksum = rdb->cksum; memrev64ifbe(&cksum); if (rioWrite(rdb,&cksum,8) == 0) goto werr; return C_OK;
## rdbSave

首先 rdbSave 創建了一個名為 `temp-PID.rdb`(PID 為當前進程號)的臨時文件,用於寫入 RDB 的結果。c int rdbSave(char *filename, rdbSaveInfo *rsi) { char tmpfile[256]; char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ FILE *fp = NULL; rio rdb; int error = 0;
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) {char *cwdp = getcwd(cwd,MAXPATHLEN);serverLog(LL_WARNING,"Failed opening the RDB file %s (in server root dir %s) ""for saving: %s",filename,cwdp ? cwdp : "unknown",strerror(errno));return C_ERR; } // ...}
然後用該文件初始化 rio 流,並根據配置項 `rdb_save_incremental_fsync` 決定是否開啟自動刷盤(每寫入 `REDIS_AUTOSYNC_BYTES` 字節同步一次)。c int rdbSave(char *filename, rdbSaveInfo *rsi) { // … rioInitWithFile(&rdb,fp); startSaving(RDBFLAGS_NONE);
if (server.rdb_save_incremental_fsync)rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); // ...}
接著執行 `rdbSaveRio`,然後依次 `fflush`、`fsync` 並關閉文件,確保數據不會停留在操作系統的輸出緩衝區中。c int rdbSave(char *filename, rdbSaveInfo *rsi) { // … if (rdbSaveRio(&rdb,&error,RDBFLAGSNONE,rsi) == CERR) { errno = error; goto werr; }
/* Make sure data will not remain on the OS's output buffers */ if (fflush(fp)) goto werr; if (fsync(fileno(fp))) goto werr; if (fclose(fp)) { fp = NULL; goto werr; } fp = NULL; // ...}
最後把這個臨時文件重命名(rename)為 `filename`,完成本次 RDB 保存。

## rdbSaveBackground

fork 出一個子進程,由子進程執行 `rdbSave`;父進程記錄子進程的信息後立即返回。c int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { pid_t childpid;
if (hasActiveChildProcess()) return C_ERR;server.dirty_before_bgsave = server.dirty; server.lastbgsave_try = time(NULL);if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {int retval;/* Child */redisSetProcTitle("redis-rdb-bgsave");redisSetCpuAffinity(server.bgsave_cpulist);retval = rdbSave(filename,rsi);if (retval == C_OK) {sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");}exitFromChild((retval == C_OK) ? 0 : 1); } else {/* Parent */if (childpid == -1) {server.lastbgsave_status = C_ERR;serverLog(LL_WARNING,"Can't save in background: fork: %s",strerror(errno));return C_ERR;}serverLog(LL_NOTICE,"Background saving started by pid %ld",(long) childpid);server.rdb_save_time_start = time(NULL);server.rdb_child_type = RDB_CHILD_TYPE_DISK;return C_OK; } return C_OK; /* unreached */} ```
Makefile
acl.c
ae.c
ae.h
ae_epoll.c
ae_evport.c
ae_kqueue.c
ae_select.c
anet.c
anet.h
aof.c
asciilogo.h
atomicvar.h
bio.c
bio.h
bitops.c
blocked.c
childinfo.c
cli_common.c
cli_common.h
cluster.c
cluster.h
config.c
config.h
connection.c
connection.h
connhelpers.h
crc16.c
crc16_slottable.h
crc64.c
crc64.h
crcspeed.c
crcspeed.h
db.c
debug.c
debugmacro.h
defrag.c
endianconv.c
endianconv.h
evict.c
expire.c
fmacros.h
geo.c
geo.h
geohash.c
geohash.h
geohash_helper.c
geohash_helper.h
help.h
hyperloglog.c
latency.c
latency.h
lazyfree.c
listpack.c
listpack.h
listpack_malloc.h
localtime.c
lolwut.c
lolwut.h
lolwut5.c
lolwut6.c
lzf.h
lzfP.h
lzf_c.c
lzf_d.c
memtest.c
mkreleasehdr.sh
module.c
modules
monotonic.c
monotonic.h
multi.c
networking.c
notify.c
object.c
pqsort.c
pqsort.h
pubsub.c
quicklist.c
quicklist.h
rand.c
rand.h
rax.c
rax.h
rax_malloc.h
redis-benchmark.c
redis-check-aof.c
redis-check-rdb.c
redis-cli.c
redis-trib.rb
redisassert.c
redisassert.h
redismodule.h
release.c
replication.c
scripting.c
sdsalloc.h
sentinel.c
server.c
server.h
setcpuaffinity.c
setproctitle.c
sha1.c
sha1.h
sha256.c
sha256.h
siphash.c
slowlog.c
slowlog.h
solarisfixes.h
sort.c
sparkline.c
sparkline.h
stream.h
syncio.c
t_hash.c
t_list.c
t_set.c
t_stream.c
t_string.c
t_zset.c
testhelp.h
timeout.c
tls.c
tracking.c
util.c
util.h
valgrind.sup
version.h
zipmap.c
zipmap.h
zmalloc.c
zmalloc.h
總結
- 上一篇: 《Adobe Photoshop CS5
- 下一篇: HashMap?面试?我是谁?我在哪?