redis创建像mysql表结构_如何给redis添加新数据结构
前言
作為一款緩存型nosql數據庫,redis在誕生之初就以高性能、豐富的數據結構等特性獲得業界的青睞。redis默認提供了五種數據類型的支持:string、list、set、zset、hash。針對一般性的日常應用,這些數據結構基本可以滿足我們了,但是針對一些特定業務場景,需要一些新的數據結構來簡化業務的開發和使用,比如在物流行業中,可能需要存儲多邊形地理信息并對點、線和多邊形的關系進行一些位置相關運算(比如使用R-tree結構)。因此,為redis開發新的數據結構顯得尤為重要,本文就將以一個簡單的實例來介紹開發一個新的redis數據結構所需要做的所有事情,雖然redis4開始已經提供了module機制,使用module機制開發數據結構更為方便,但是為了更深入的理解redis內部的源碼細節,本文不使用module方式。
首先,先以一張圖從宏觀上展示一下redis現有數據結構的概況,由于空間有限,下圖沒有列出redis所有數據結構,以及對每一種數據結構只展示了一種編碼方式,但是這對理解起來沒有任何影響(本文所有圖片看不清的可以單獨放大圖片觀看)。
流程
為了行文的方便,我先直接給出要開發一個redis新數據結構所需要做的基本流程:
數據結構定義
添加新的對象類型(redisObject)、編碼方式以及對象創建和銷毀方法
編寫rdb的save和load方法
編寫aof rewrite方法
編寫數據結構內存使用統計方法
實現命令
編寫unit test
下面我就分別按照上面的幾個步驟來實現。
1、數據結構定義
為了敘述簡單,本文以一個并沒有實際業務意義的數據結構實現為目的。它實際上就是一個單向鏈表,我將該數據結構命名為HelloType。將我們的數據結構定義在hellotype.h中。
在hellotype.h文件中,我們首先定義鏈表節點:
struct HelloTypeNode {
int64_t value;// 節點承載值
struct HelloTypeNode *next;// 節點指針
};
然后定義redis數據結構:
struct HelloTypeObject {
struct HelloTypeNode *head;// 鏈表頭結點
size_t len; // 已經添加的鏈表節點的個數
}HelloTypeObject;
2、添加對象類型、對象創建方法和銷毀方法
定義好了數據結構,那么該數據結構在什么時候初始化或者是創建呢?在redis中,所有數據結構都是以對象(redisObject)的形式存在的,對象的定義如下(定義在server.h):
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits decreas time). */
int refcount;
void *ptr;
} robj;
所有數據結構對上都呈現為redisObject,對下使用不同的encoding進行編碼、存儲,不同的數據類型使用type字段進行區分,type只有4 bit,因此使用傳統方式最多只能定義16種redis數據結構(使用module方式則沒有這個限制),redis現在支持的數據結構type定義如下(定義在server.h):
/* The actual Redis Object */
#define OBJ_STRING 0 /* String object. */
#define OBJ_LIST 1 /* List object. */
#define OBJ_SET 2 /* Set object. */
#define OBJ_ZSET 3 /* Sorted set object. */
#define OBJ_HASH 4 /* Hash object. */
/* The "module" object type is a special one that signals that the object
* is one directly managed by a Redis module. In this case the value points
* to a moduleValue struct, which contains the object value (which is only
* handled by the module itself) and the RedisModuleType struct which lists
* function pointers in order to serialize, deserialize, AOF-rewrite and
* free the object.
*
* Inside the RDB file, module types are encoded as OBJ_MODULE followed
* by a 64 bit module type ID, which has a 54 bits module-specific signature
* in order to dispatch the loading to the right module, plus a 10 bits
* encoding version. */
#define OBJ_MODULE 5 /* Module object. */
#define OBJ_STREAM 6 /* Stream object. */
#define OBJ_HELLO_TYPE 7 // 我們自己的新類型
如上所示,我們添加了OBJ_HELLO_TYPE類型字段,但是該對象還沒有辦法創建,在redis中,對象的創建需要定義create*之類的創建函數,比如hash的創建函數實現為(定義在object.c):
robj *createHashObject(void) {
unsigned char *zl = ziplistNew();// 創建ziplist
robj *o = createObject(OBJ_HASH, zl);
o->encoding = OBJ_ENCODING_ZIPLIST;// 使用ziplist編碼方式
return o;
}
set的創建函數如下(針對不同的編碼方式會有多個創建函數):
robj *createSetObject(void) {
dict *d = dictCreate(&setDictType,NULL);
robj *o = createObject(OBJ_SET,d);
o->encoding = OBJ_ENCODING_HT;// 使用hashtable編碼方式存儲
return o;
}
robj *createIntsetObject(void) {
intset *is = intsetNew();
robj *o = createObject(OBJ_SET,is);
o->encoding = OBJ_ENCODING_INTSET;// 使用intset編碼方式存儲
return o;
}
上面所有的創建函數最終都會用到createObject創建對象,其定義如下(定義在object.c):
robj *createObject(int type, void *ptr) {
robj *o = zmalloc(sizeof(*o));
o->type = type;
o->encoding = OBJ_ENCODING_RAW;// 默認的編碼方式是RAW
o->ptr = ptr; // 針對不同的編碼方式,這里指向的數據結構是不同的
o->refcount = 1;
/* Set the LRU to the current lruclock (minutes resolution), or
* alternatively the LFU counter. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
} else {
o->lru = LRU_CLOCK();
}
return o;
}
上面的創建函數一直在使用encoding這個概念,出于性能和內存占用的權衡考慮,redis為每種數據結構至少定義了兩種encoding方式,它們和數據結構的對應關系如下:
有了上面的概念,那么我們可以定義自己的對象創建函數了,如下(定義在object.c):
robj *createHelloTypeObject(void){
HelloTypeObject *h = hellotypeNew();// 創建我們自定義的數據結構
robj *o = createObject(OBJ_HELLO_TYPE,h);// 次數默認使用OBJ_ENCODING_RAW編碼類型,這里也可以自定義新的編碼類型,對實現而言沒有本質的影響
return o;
}
其中hellotypeNew函數是自定義數據結構的創建函數,那么它在哪里定義呢?由redis源碼可以看出,redis的所有數據結構(創建函數和命令的實現函數)都定義在一個單獨的文件中,并且文件名都以t_開頭(t為type的縮寫),比如t_set.c、t_hash.c等,那么我們也遵循這個約束,將其定義為t_hellotype.c,并在其中添加如下內容:
#include "server.h"
#include "hellotype.h"
HelloTypeObject *hellotypeNew(void){
HelloTypeObject *h = zmalloc(sizeof(*h));
h->head = NULL;// 頭指針為NULL
h->len = 0;
return h;
}
同時,為了便于被其他文件引用,在hellotype.h中為該函數添加聲明,因此此時的hellotype.h文件內容如下:
#ifndef HELLO_TYPE_H
#define HELLO_TYPE_H
#include "server.h"
struct HelloTypeNode {
int64_t value;
struct HelloTypeNode *next;
};
typedef struct HelloTypeObject {
struct HelloTypeNode *head;
size_t len;
}HelloTypeObject;
HelloTypeObject *hellotypeNew(void);
#endif
對象被創建之后,什么時候被釋放呢?redis使用引用計數的方式來管理對象的生命周期,每次刪除一個對象的時候都將其引用計數減1,如果引用計數為0才會真正的執行刪除操作,該邏輯在?object.c中的decrRefCount函數中實現:
void decrRefCount(robj *o) {
if (o->refcount == 1) {
switch(o->type) {
case OBJ_STRING: freeStringObject(o); break;
case OBJ_LIST: freeListObject(o); break;
case OBJ_SET: freeSetObject(o); break;
case OBJ_ZSET: freeZsetObject(o); break;
case OBJ_HASH: freeHashObject(o); break;
case OBJ_MODULE: freeModuleObject(o); break;
case OBJ_STREAM: freeStreamObject(o); break;
case OBJ_HELLO_TYPE:freeHelloTypeObject(o); break;// 添加我們自己的數據結構釋放函數
default: serverPanic("Unknown object type"); break;
}
zfree(o);
} else {
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
}
}
freeHelloTypeObject函數也實現在object.c中,其本質就是循環釋放一個鏈表的所有節點,如下:
void freeHelloTypeObject(robj *o){
struct HelloTypeNode *cur, *next;
cur = (( struct HelloTypeObject * )o->ptr)->head;// 先找到頭結點
while(cur) {
next = cur->next;
zfree(cur);
cur = next;
}
zfree(o);
}
3、編寫rdb的save和load方法
我們都知道,rdb是redis持久化的一種機制,為了能讓我們自己的數據結構也能被正確的備份和恢復,就需要我們實現其save和load方法。
首先,還是先大致介紹些RDB文件的組織結構,大致可以用下圖表示:
可以看到,RDB對每一個kv pair都使用一個類型來標識后面存儲的value的類型(key的類型永遠為string),因此為了讓RDB可以正確的識別出我們自定義的類型,也需要增加一個RDB類型,在rdb.h中更改如下:
/* Map object types to RDB object types. Macros starting with OBJ_ are for
* memory storage and may change. Instead RDB types must be fixed because
* we store them on disk. */
#define RDB_TYPE_STRING 0
#define RDB_TYPE_LIST 1
#define RDB_TYPE_SET 2
#define RDB_TYPE_ZSET 3
#define RDB_TYPE_HASH 4
#define RDB_TYPE_ZSET_2 5 /* ZSET version 2 with doubles stored in binary. */
#define RDB_TYPE_MODULE 6
#define RDB_TYPE_MODULE_2 7 /* Module value with annotations for parsing without
the generating module being loaded. */
#define RDB_TYPE_HELLO_TYPE 8 // 我們自己的RDB類型
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
/* Object types for encoded objects. */
#define RDB_TYPE_HASH_ZIPMAP 9
#define RDB_TYPE_LIST_ZIPLIST 10
#define RDB_TYPE_SET_INTSET 11
#define RDB_TYPE_ZSET_ZIPLIST 12
#define RDB_TYPE_HASH_ZIPLIST 13
#define RDB_TYPE_LIST_QUICKLIST 14
#define RDB_TYPE_STREAM_LISTPACKS 15
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
/* Test if a type is an object type. */
#define rdbIsObjectType(t) ((t >= 0 && t <= 8) || (t >= 9 && t <= 15)) // 不要忘記更改此處
類型添加完成之后,先來實現save的功能。有上面的RDB格式可以看出,在保存真正的kv之前,必須先保存類型,這個動作是由rdbSaveObjectType完成的,在rdb.c中,實現如下:
/* Save the object type of object "o". */
int rdbSaveObjectType(rio *rdb, robj *o) {
switch (o->type) {
case OBJ_STRING:
return rdbSaveType(rdb,RDB_TYPE_STRING);
case OBJ_LIST:
if (o->encoding == OBJ_ENCODING_QUICKLIST)
return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST);
else
serverPanic("Unknown list encoding");
case OBJ_SET:
if (o->encoding == OBJ_ENCODING_INTSET)
return rdbSaveType(rdb,RDB_TYPE_SET_INTSET);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_SET);
else
serverPanic("Unknown set encoding");
case OBJ_ZSET:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_2);
else
serverPanic("Unknown sorted set encoding");
case OBJ_HASH:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_HASH);
else
serverPanic("Unknown hash encoding");
case OBJ_STREAM:
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
case OBJ_HELLO_TYPE:
return rdbSaveType(rdb,RDB_TYPE_HELLO_TYPE);// 添加保存自定義類型
default:
serverPanic("Unknown object type");
}
return -1; /* avoid warning */
}
接下來實現保存value部分,需要修改rdb.c中的rdbSaveObject函數,添加我們定義的數據結構,如下:
/* Save a Redis object. Returns -1 on error, number of bytes written on success. */
ssize_t rdbSaveObject(rio *rdb, robj *o) {
ssize_t n = 0, nwritten = 0;
if (o->type == OBJ_STRING) {
/* Save a string value */
if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
nwritten += n;
} else if (o->type == OBJ_LIST) {
} else if (o->type == OBJ_SET) {
} else if (o->type == OBJ_ZSET) {
} else if (o->type == OBJ_HASH) {
} else if (o->type == OBJ_STREAM) {
} else if (o->type == OBJ_MODULE) {
} else if (o->type == OBJ_HELLO_TYPE){
if(o->encoding == OBJ_ENCODING_RAW){
struct HelloTypeObject *hto = o->ptr;
struct HelloTypeNode *node = hto->head;
if ((n = rdbSaveLen(rdb,hto->len)) == -1) return -1;// 持久化鏈表長度
nwritten += n;// 每次都要更新nwritten,表示向RDB文件中寫入的字節數
while(node) {
if ((n = rdbSaveLen(rdb,node->value)) == -1) return -1;// 持久化節點值
nwritten += n;
node = node->next;
}
} else {
serverPanic("Unknown hellotype encoding");
}
} else {
serverPanic("Unknown object type");
}
return nwritten;
}
save完成之后開始實現load,其實就是save的相反過程,按照什么格式存進去的就按照什么格式讀出來,在rdb.c的rdbLoadObject函數中:
/* Load a Redis object of the specified type from the specified file.
* On success a newly allocated object is returned, otherwise NULL. */
robj *rdbLoadObject(int rdbtype, rio *rdb) {
robj *o = NULL, *ele, *dec;
uint64_t len;
unsigned int i;
if (rdbtype == RDB_TYPE_STRING) {
/* Read string value */
if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
o = tryObjectEncoding(o);
} else if (rdbtype == RDB_TYPE_LIST) {
} else if (rdbtype == RDB_TYPE_SET) {
} else if (rdbtype == RDB_TYPE_ZSET_2 || rdbtype == RDB_TYPE_ZSET) {
} else if (rdbtype == RDB_TYPE_HASH) {
} else if (rdbtype == RDB_TYPE_LIST_QUICKLIST) {
} else if (rdbtype == RDB_TYPE_HASH_ZIPMAP ||
rdbtype == RDB_TYPE_LIST_ZIPLIST ||
rdbtype == RDB_TYPE_SET_INTSET ||
rdbtype == RDB_TYPE_ZSET_ZIPLIST ||
rdbtype == RDB_TYPE_HASH_ZIPLIST)
{
} else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) {
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
} else if (rdbtype == RDB_TYPE_HELLO_TYPE){
uint64_t len;
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;// 加載鏈表長度
uint64_t elements = len;
robj *hto = createHelloTypeObject();
while(elements--) {
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;// 加載鏈表值
int64_t ele = len;
HelloTypeInsert(hto->ptr,ele);// 插入鏈表
}
o = hto;
} else {
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
}
return o;
}
4、編寫aof rewrite方法
aof是redis的另一個持久化方法,由于aof需要rewrite機制來降低aof文件的大小,因此我們添加相應的代碼來讓redis可以正確的識別并rewrite我們自己的數據結構,入口在aof.c的rewriteAppendOnlyFileRio函數中:
int rewriteAppendOnlyFileRio(rio *aof) {
/* Save the key and associated value */
if (o->type == OBJ_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkObject(aof,o) == 0) goto werr;
} else if (o->type == OBJ_LIST) {
if (rewriteListObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_SET) {
if (rewriteSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_ZSET) {
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_HASH) {
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_STREAM) {
if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_HELLO_TYPE) {// 此處添加我們自己的數據結構
if (rewritreHelloTypeObject(aof,&key,o) == 0) goto werr;
} else {
serverPanic("Unknown object type");
}
}
同樣在aof中實現rewritreHelloTypeObject函數,其本質就是根據rewirte時刻aof中的數據構造等價的redis 命令:
int rewritreHelloTypeObject(rio *r, robj *key, robj *o){
struct HelloTypeObject *hto = o->ptr;
struct HelloTypeNode *node = hto->head;
while(node) {
/* Bulk count. */
if (rioWriteBulkCount(r,'*',3) == 0) return 0;
if (rioWriteBulkString(r,"HELLOTYPE.INSERT",sizeof "HELLOTYPE.INSERT") == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkLongLong(r,node->value) == 0) return 0;
node = node->next;
}
return 1;
}
上面的構造需要對redis協議有一定的理解,具體的可以參見這里:https://redis.io/topics/protocol
5、編寫數據結構內存使用統計方法
redis經常需要獲取數據結構當前堆內存的使用情況,該功能在object.c中objectComputeSize函數完成:
size_t objectComputeSize(robj *o, size_t sample_size) {
sds ele, ele2;
dict *d;
dictIterator *di;
struct dictEntry *de;
size_t asize = 0, elesize = 0, samples = 0;
if (o->type == OBJ_STRING) {
if(o->encoding == OBJ_ENCODING_INT) {
asize = sizeof(*o);
} else if(o->encoding == OBJ_ENCODING_RAW) {
asize = sdsAllocSize(o->ptr)+sizeof(*o);
} else if(o->encoding == OBJ_ENCODING_EMBSTR) {
asize = sdslen(o->ptr)+2+sizeof(*o);
} else {
serverPanic("Unknown string encoding");
}
} else if (o->type == OBJ_LIST) {
} else if (o->type == OBJ_SET) {
} else if (o->type == OBJ_ZSET) {
} else if (o->type == OBJ_HASH) {
} else if (o->type == OBJ_STREAM) {
} else if (o->type == OBJ_MODULE) {
} else if (o->type == OBJ_HELLO_TYPE){// 此處添加我們的實現
const struct HelloTypeObject *hto = o->ptr;
struct HelloTypeNode *node = hto->head;
asize = sizeof(*hto) + sizeof(*node)*hto->len; // 將頭節點和鏈表節點內存使用計算和并復制給asize
} else {
serverPanic("Unknown object type");
}
return asize;
}
6、實現命令
現在一切都準備就緒了,到了實現命令的時候了,命令的實現包括兩個方面,分別對應redis的通用命令和類型特有的命令。redis中鍵空間通用的命令有很多,比如DEL、DUMP、EXISTS、TYPE、SCAN等,此處我們以支持TYPE命令為例,該命令的實現在db.c的typeCommand函數中,它很簡單,只需要返回一個類型字符串就可以了。
void typeCommand(client *c) {
robj *o;
char *type;
o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);
if (o == NULL) {
type = "none";
} else {
switch(o->type) {
case OBJ_STRING: type = "string"; break;
case OBJ_LIST: type = "list"; break;
case OBJ_SET: type = "set"; break;
case OBJ_ZSET: type = "zset"; break;
case OBJ_HASH: type = "hash"; break;
case OBJ_STREAM: type = "stream"; break;
case OBJ_MODULE: {
moduleValue *mv = o->ptr;
type = mv->type->name;
}; break;
case OBJ_HELLO_TYPE: type = "hello_type";break;// 這里添加我們自己的實現
default: type = "unknown"; break;
}
}
addReplyStatus(c,type);
}
接著我們開始實現類型特有的命令,也就是這些命令只能作用在我們定義的hellotype類型上,根據前文所述,這些命令都會實現在t_hellotype.c中,本實例我們一共會實現三個命令,分別為
HELLOTYPE.RANGE : 獲取指定個數的鏈表元素
HELLOTYPE.LEN :? 獲取鏈表當前長度
HELLOTYPE.INSERT :? ?向鏈表中摻入一個數據
三個命令分別對應三個處理函數,實現如下:
/* HELLOTYPE.INSERT key value */
void HelloTypeInsert_RedisCommand(client *c) {
robj *o =o = lookupKeyWrite(c->db,c->argv[1]);
if (o != NULL && checkType(c,o,OBJ_HELLO_TYPE)) return;
long long value;
if (!string2ll(c->argv[2]->ptr,sdslen(c->argv[2]->ptr), &value)) {
addReplyError(c,"invalid value: must be a signed 64 bit integer");
return;
}
/* Create an empty value object if the key is currently empty. */
struct HelloTypeObject *hto = NULL;
if (o == NULL) {
// 如果鍵不存在,就新建并添加到db中
o = createHelloTypeObject();
dbAdd(c->db,c->argv[1],o);
}
hto = o->ptr;
HelloTypeInsert(hto,value);// 執行鏈表插入
addReplyLongLong(c,hto->len);// 響應客戶端當前鏈表長度
return;
}
/* HELLOTYPE.RANGE key first count */
void HelloTypeRange_RedisCommand(client * c) {
void *replylen = NULL;
robj *o = lookupKeyWrite(c->db,c->argv[1]);
if (o != NULL && checkType(c,o,OBJ_HELLO_TYPE)) return;// 鍵類型檢測
long long first, count;
if (!string2ll(c->argv[2]->ptr,sdslen(c->argv[2]->ptr),&first) ||
!string2ll(c->argv[3]->ptr,sdslen(c->argv[3]->ptr),&count) ||
first < 0 || count < 0)
{
addReplyError(c,
"invalid first or count parameters");
return;
}
struct HelloTypeObject *hto = o ? o->ptr:NULL;
struct HelloTypeNode *node = hto ? hto->head : NULL;
replylen = addDeferredMultiBulkLength(c);// 注意這里需要特殊處理一下,因此實現無法知道鏈表節點的個數
long long arraylen = 0;
while(node && count--) {
addReplyLongLong(c,node->value);
arraylen++;
node = node->next;
}
setDeferredMultiBulkLength(c, replylen, arraylen);// 填充真正的長度
return ;
}
/* HELLOTYPE.LEN key */
void HelloTypeLen_RedisCommand(client * c) {
robj *o = lookupKeyWrite(c->db,c->argv[1]);
if (o != NULL && checkType(c,o,OBJ_HELLO_TYPE)) return;
struct HelloTypeObject *hto = o ? o->ptr:NULL;
addReplyLongLong(c,hto ? hto->len : 0);
return ;
}
命令實現完之后需要在server.h中進行聲明:
/* 聲明我們實現的命令 */
void htlenCommand(client * c);
void htrangeCommand(client * c);
void htinsertCommand(client *c);
聲明之后,進行最后一步,將命令寫入redisCommandTable中,至此redis才能識別我們新加入的命令并找到命令對應的處理函數,redisCommandTable定義在server.c中,顧名思義就是redisCommand類型數組,redisCommandTable定義如下:
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
char *sflags; /* Flags as string representation, one char per flag. */
int flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
long long microseconds, calls;
};
name: 命令名
proc: 指針函數,指向該命令對應的處理函數
arity: 參數個數(包括命令本身),當為-N時表示大于等于N個參數
sflags: 命令標志位字符串表示,碼表請參考下面
* w: write command (may modify the key space).
* r: read command ?(will never modify the key space).
* m: may increase memory usage once called. Don't allow if out of memory.
* a: admin command, like SAVE or SHUTDOWN.
* p: Pub/Sub related command.
* f: force replication of this command, regardless of server.dirty.
* s: command not allowed in scripts.
* R: random command. Command is not deterministic, that is, the same command
* ? ?with the same arguments, with the same key space, may have different
* ? ?results. For instance SPOP and RANDOMKEY are two random commands.
* S: Sort command output array if called from script, so that the output
* ? ?is deterministic.
* l: Allow command while loading the database.
* t: Allow command while a slave has stale data but is not allowed to
* ? ?server this data. Normally no command is accepted in this condition
* ? ?but just a few.
* M: Do not automatically propagate the command on MONITOR.
* k: Perform an implicit ASKING for this command, so the command will be
* ? ?accepted in cluster mode if the slot is marked as 'importing'.
* F: Fast command: O(1) or O(log(N)) command that should never delay
* ? ?its execution as long as the kernel scheduler is giving us time.
* ? ?Note that commands that may trigger a DEL as a side effect (like SET)
* ? ?are not fast commands.
flag: sflags的位掩碼,初始化全為0,在void populateCommandTable(void)方法中會進行初始化
getkeys_proc: 指針函數,通過此方法來指定key的位置
first_key_index: 第一個key的位置,為0時表示沒有key
last_key_index: 最后一個key的位置
key_step: key之間的間距
microseconds: 該命令的總調用時間,初始化都為0
calls: 該命令的總調用次數,初始化都為0
get_keys_proc和[first_key_index, last_key_index, key_step]都是指定key的位置,區別在于前者通過函數的方式返回一個int*來指定,后者則是通過指定第一個key值和最后一個key值,并告訴你key值之間的間隔step來表示。目前redis大部分的命令都是通過[first_key_index,last_key_index,key_step]來指定,因為大部分的命令的Key的位置都是有固定規律的。
最終我們的命令實現如下:
struct redisCommand redisCommandTable[] = {
{"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
......
/* 下面添加我們自己的命令 */
{"HELLOTYPE.LEN",htlenCommand,1,"r",0,NULL,1,1,1,0,0},
{"HELLOTYPE.INSERT",htinsertCommand,2,"m",0,NULL,1,1,1,0,0},
{"HELLOTYPE.RANGE",htrangeCommand,3,"r",0,NULL,1,1,1,0,0}
};
這里為了讓不了解redis命令執行過程的人有一個大致的了解,從網上找到一張圖,個人感覺畫的還不錯,我就不自己重新畫了:
7、編寫unit test
編寫unit test之前我們最好先用原生redis-cli測試一下我們新加的命令:
127.0.0.1:6379> HELLOTYPE.INSERT h1 1
(integer) 1
127.0.0.1:6379> HELLOTYPE.INSERT h1 2
(integer) 2
127.0.0.1:6379> HELLOTYPE.INSERT h1 3
(integer) 3
127.0.0.1:6379> HELLOTYPE.INSERT h1 4
(integer) 4
127.0.0.1:6379> HELLOTYPE.INSERT h1 5
(integer) 5
127.0.0.1:6379> HELLOTYPE.LEN h1
(integer) 5
127.0.0.1:6379> HELLOTYPE.RANGE h1 1 1
1) (integer) 1
127.0.0.1:6379> HELLOTYPE.RANGE h1 1 2
1) (integer) 1
2) (integer) 2
127.0.0.1:6379> HELLOTYPE.RANGE h1 1 4
1) (integer) 1
2) (integer) 2
3) (integer) 3
4) (integer) 4
一切正常之后,我們可以添加unit test,參照redis原生數據結構,我們在redis/tests/unit/type目錄下新加文件hellotype.tcl,并寫入如下內容:
start_server {tags {"hellotype"}} {
test {HELLOTYPE.INSERT key value} {
r del hellotype1
assert_equal 1 [r HELLOTYPE.INSERT hellotype1 1]
assert_equal 2 [r HELLOTYPE.INSERT hellotype1 2]
assert_equal 3 [r HELLOTYPE.INSERT hellotype1 3]
assert_equal 4 [r HELLOTYPE.INSERT hellotype1 4]
assert_equal 5 [r HELLOTYPE.INSERT hellotype1 5]
}
test { HELLOTYPE.LEN key } {
assert_equal 5 [r HELLOTYPE.LEN hellotype1 ]
}
test {HELLOTYPE.RANGE key start count } {
assert_equal 1 [r HELLOTYPE.RANGE hellotype1 1 1 ]
assert_equal {1 2} [r HELLOTYPE.RANGE hellotype1 1 2 ]
assert_equal {1 2 3 4 5} [r HELLOTYPE.RANGE hellotype1 1 5 ]
}
}
然后在test_helper.tcl加入unit/type/hellotype,執行make test就可以執行unit test了。
[ok]: HELLOTYPE.INSERT key value
[ok]: ?HELLOTYPE.LEN key
[ok]: HELLOTYPE.RANGE key start count
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的redis创建像mysql表结构_如何给redis添加新数据结构的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java的多线程实现方式_java 多线
- 下一篇: word自带公式编辑_怎样在word20