MIT6.824-lab4B-Sharded Key/Value Server(基于Raft的Shard KV数据库-分片KV存储器)
文章目錄
- 4B(Sharded Key/Value Server,分片KV存儲器)
- 任務(wù)
- 任務(wù)須知
- 代碼
- server
- 數(shù)據(jù)結(jié)構(gòu)
- 初始化代碼
- 主要邏輯
- 定時獲取config信息
- server_op(普通命令rpc接收處理)
- 數(shù)據(jù)結(jié)構(gòu)
- Get、PutAppend rpc處理
- server_shard(shard遷移相關(guān))
- 定時獲取shard信息
- 請求刪除shard數(shù)據(jù)
- RPC處理
- server_apply(applyCh處理)
- applyCh處理
- 客戶端普通命令處理
- 更新配置命令處理
- 遷入的shard數(shù)據(jù)保存命令
- 遷出的shard數(shù)據(jù)清除命令
- server_snapshot(快照處理)
- 讀取快照
- 生成快照
- Common
- 測試結(jié)果
所有資料:👉 https://github.com/1345414527/MIT6.824-2022
4B(Sharded Key/Value Server,分片KV存儲器)
每個 shardkv 服務(wù)器都作為副本組的一部分運行。每個副本組為某些鍵空間分片提供 Get、Put 和 Append 操作。在 client.go 中使用 key2shard() 來查找 key 屬于哪個 shard。多個副本組合作為完整的分片集提供服務(wù)。 shardctrler 服務(wù)的單個實例將分片分配給副本組;當(dāng)這個分配發(fā)生變化時,副本組必須相互傳遞分片,同時確保客戶端不會看到不一致的響應(yīng)。
我們的存儲系統(tǒng)必須為使用其客戶端接口的應(yīng)用程序提供可線性化的接口。也就是說,對 shardkv/client.go 中的 Clerk.Get()、Clerk.Put() 和 Clerk.Append() 方法的完整應(yīng)用程序調(diào)用必須以相同的順序影響所有副本。 Clerk.Get() 應(yīng)該看到最近的 Put/Append 寫入同一個鍵的值。即使 Gets 和 Puts 與配置更改幾乎同時到達,也必須如此。
這一部分我覺的是整個MIT6.824中最難的一部分,lab2雖然也很難,但是只要按照論文寫基本不會出現(xiàn)很大的問題,而這個部分如果沒有一些分布式數(shù)據(jù)庫源碼和設(shè)計基礎(chǔ)的,我覺得很難上手和理解。
任務(wù)
這里shardKV實現(xiàn)有很多種,我按我的理解和實現(xiàn)簡述的任務(wù):
- 接收客戶端發(fā)來的Get、PutAppend RPC并進行處理和調(diào)用Start;
- 定期通過shardctrler.Clerk獲取下一個config信息,config不能跳躍獲取并且要完成當(dāng)前config的新shard 數(shù)據(jù)遷移,才能調(diào)用Start達到多數(shù)派情況應(yīng)用該config;
- 完成在一個新配置中,shard的數(shù)據(jù)遷移。一個簡單的方案是:源節(jié)點保存所有configNum的未遷移完成shard數(shù)據(jù),目標(biāo)節(jié)點定期向新shard的源節(jié)點獲取數(shù)據(jù)(如果有新shard的話);
- 接收ApplyCh中的命令進行處理,命令分別為:安裝快照命令、客戶端Op命令、更新config命令、接收新shard數(shù)據(jù)、清除舊shard數(shù)據(jù);
- 完成兩個RPC:目標(biāo)節(jié)點向源節(jié)點發(fā)送的獲取shard數(shù)據(jù)RPC,目標(biāo)節(jié)點向源節(jié)點發(fā)送的清除shard數(shù)據(jù)RPC。
任務(wù)須知
-
shardKV與shardctrler之間的通信是要通過每一個shardKV內(nèi)部的client來進行;不同group的shardKV通過自建RPC來進行通信;同一個group的shardKV通過start來進行通信并達到一致;
-
每一個shardkv不應(yīng)調(diào)用分片控制器的 Join() 處理程序。測試代碼將在適當(dāng)?shù)臅r候調(diào)用 Join();
-
Shardkv將需要定期調(diào)用Query()輪詢 shardctrler 以了解新配置。測試預(yù)計我們的代碼大約每 100 毫秒輪詢一次;更頻繁是可以的,但少得多可能會導(dǎo)致問題;
-
ShardKV定期從 shardctrler 獲取最新配置,并添加代碼以在接收組不負(fù)責(zé)客戶端key的分片時拒絕客戶端請求;
-
在配置更改期間,一對組可能需要在它們之間雙向移動分片。如果我們看到死鎖,這是一個可能的來源。
-
配置更改可能會涉及部分shard的重新分配,在新配置中,目標(biāo)節(jié)點會請求源節(jié)點獲取該shard的數(shù)據(jù),而在請求并完成數(shù)據(jù)的導(dǎo)入這個過程中,就涉及到了數(shù)據(jù)的一致性問題,一個簡單的方法就是:在這個過程中,目標(biāo)結(jié)果不會處理涉及尚未遷移完成的新shard的命令。雖然概念上很簡單,但這種方法在生產(chǎn)級系統(tǒng)中是不可行的,每當(dāng)機器被帶入或取出時,它都會導(dǎo)致所有客戶端長時間停頓,最好繼續(xù)為不受正在進行的配置更改影響的分片提供服務(wù),這種方法其實我也有一個想法:請求目標(biāo)節(jié)點,如果發(fā)現(xiàn)該shard還沒有遷移完成,就返回源節(jié)點group+一個重定向Move標(biāo)識,使得客戶端可以帶上標(biāo)識請求源節(jié)點,源節(jié)點如果該shard還沒有進行遷移,則進行運行并保存,等目標(biāo)節(jié)點進行遷移時,就會獲取已經(jīng)執(zhí)行好命令的數(shù)據(jù),這樣子的話實現(xiàn)就會很復(fù)雜了;
-
configNum其實類似于Redis中的configEpoch,因為網(wǎng)絡(luò)中因為一致性的問題,不可能在同一個時間點所有的節(jié)點的configNum都是一致的,因為不同的configNum,同一個shard可能會分配給不同得節(jié)點,因此以configNum為依據(jù),可以找到指定的shard處理節(jié)點;
-
這里遇到一個很坑的點,在這個lab中,每當(dāng)調(diào)用readPersister讀取快照時,因為要對Raft的快照狀態(tài)進行修改,因此就必須調(diào)用CondInstallSnapshot函數(shù),但是這樣就會導(dǎo)致測試一直阻塞,就算該函數(shù)為空也沒辦法通過,只能不調(diào)用該函數(shù),將CondInstallSnapshot的邏輯寫到InstallSnapshot RPC的處理代碼中才能通過測試,不知道什么問題,太離譜了。(我在寫kvraft這個lab的時候,readPersister的實現(xiàn)基本相同,但是那個調(diào)用CondInstallSnapshot可以,在這個lab中就是不行)
而且在本lab中,我發(fā)現(xiàn)頻繁的上鎖和解鎖不僅會導(dǎo)致運行變慢,還會出現(xiàn)很多奇怪的現(xiàn)象,但絕不是死鎖的問題。(待解決)
-
Config中存儲的Groups是每一個gid下的所有servername,通過servername可以獲取對應(yīng)的endpoint從而進行不同group間的通信:
func(servername string) *labrpc.ClientEnd {name := randstring(20)end := cfg.net.MakeEnd(name)cfg.net.Connect(name, servername)cfg.net.Enable(name, true)return end } -
在進行數(shù)據(jù)遷移時,源節(jié)點可以向目標(biāo)節(jié)點的任何一個節(jié)點獲取要遷移的shard數(shù)據(jù),因為這部分?jǐn)?shù)據(jù)都是一致的,而不用特定的從leader獲取,減少網(wǎng)絡(luò)中的請求包。
代碼
這部分主要代碼分為如下部分:
- Server:shardkv的主要數(shù)據(jù)結(jié)構(gòu)定義、初始化以及主要邏輯,主要邏輯其實就是生成三個協(xié)程分別進行:處理applyCh、定期獲取config、定期獲取待遷入shard數(shù)據(jù);
- Server_op:針對客戶端發(fā)來的Get、PutAppend RPC進行接收、處理、返回;
- Server_shard:①定期查看待遷入shard,有的話就向源group(主、從節(jié)點都行)發(fā)送RPC獲取數(shù)據(jù)并Start進行提交、同步和應(yīng)用;②當(dāng)節(jié)點某個shard遷入完成后,向源group(主節(jié)點)發(fā)送RPC請求刪除保存的shard數(shù)據(jù);③處理目標(biāo)節(jié)點請求獲取某個shard數(shù)據(jù)的RPC、處理目標(biāo)節(jié)點請求刪除某個shard數(shù)據(jù)的RPC;
- Server_apply:主要邏輯就是一個for循環(huán),獲取applyCh的命令并進行應(yīng)用。針對五種類別命令分別進行處理:①快照命令;②客戶端普通命令;③更新配置命令;④遷入的shard數(shù)據(jù)保存命令;⑤遷出的shard數(shù)據(jù)清除命令;
- Server_snapshot:和kvraft的快照類似,readPersist讀取快照、saveSnapshot保存快照;
- Common:主要是一些RPC參數(shù)和回復(fù)結(jié)構(gòu)定義和回復(fù)狀態(tài)碼定義;
- Client:自帶的client客戶端,幾乎不需要進行代碼修改,修改的幾處:結(jié)構(gòu)體定義加上用于實現(xiàn)冪等性的clinetId、初始化賦值clientId、請求RPC前定義好args
基本就這些吧,在下面一些簡單的功能性函數(shù)我就不貼出來了。
server
數(shù)據(jù)結(jié)構(gòu)
const (PullConfigInterval = time.Millisecond * 100PullShardsInterval = time.Millisecond * 200WaitCmdTimeOut = time.Millisecond * 500CallPeerFetchShardDataTimeOut = time.Millisecond * 500CallPeerCleanShardDataTimeOut = time.Millisecond * 500MaxLockTime = time.Millisecond * 10 // debug )type ShardKV struct {mu sync.Mutexme intrf *raft.RaftapplyCh chan raft.ApplyMsgmake_end func(string) *labrpc.ClientEndgid intctrlers []*labrpc.ClientEndmaxraftstate int // snapshot if log grows this big// Your definitions here.stopCh chan struct{}commandNotifyCh map[int64]chan CommandResult //用于命令apply后的喚醒lastApplies [shardctrler.NShards]map[int64]int64 //k-v:ClientId-CommandIdconfig shardctrler.Config //記錄當(dāng)前的configoldConfig shardctrler.Config //保存上一個config,進行shard遷移時,目標(biāo)節(jié)點根據(jù)這個config來獲取源節(jié)點,從而獲取shard數(shù)據(jù)和請求清除shard數(shù)據(jù)meShards map[int]bool //記錄自己分配到的sharddata [shardctrler.NShards]map[string]stringinputShards map[int]bool //當(dāng)前這個config相較于上一個config新指派的shard,只有input為空了才能更新下一個configoutputShards map[int]map[int]MergeShardData // configNum -> shard -> data。當(dāng)某一個config,當(dāng)前節(jié)點的shard移除,則記錄當(dāng)前config的所有移除shard的mergeShardData//cleanOutputDataNotifyCh map[string]chan struct{} //用來通知等待協(xié)程clean完成scc *shardctrler.Clerk //保存一個shardctrler的客戶端,因為要向shardctrler發(fā)送query獲取配置信息//持久化persister *raft.Persister//定時任務(wù)計時器pullConfigTimer *time.Timer //定期獲取configpullShardsTimer *time.Timer //定期檢查inputShard并請求數(shù)據(jù)//用于互斥鎖lockStartTime time.TimelockEndTime time.TimelockMsg string }沒什么好介紹的。。。
初始化代碼
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, ctrlers []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {// call labgob.Register on structures you want// Go's RPC library to marshall/unmarshall.labgob.Register(Op{})kv := new(ShardKV)kv.me = mekv.maxraftstate = maxraftstatekv.make_end = make_endkv.gid = gidkv.ctrlers = ctrlers// Your initialization code here.kv.persister = persisterkv.scc = shardctrler.MakeClerk(kv.ctrlers)// Use something like this to talk to the shardctrler:// kv.mck = shardctrler.MakeClerk(kv.ctrlers)kv.applyCh = make(chan raft.ApplyMsg)kv.stopCh = make(chan struct{})kv.rf = raft.Make(servers, me, persister, kv.applyCh)//初始化自身數(shù)據(jù)kv.data = [shardctrler.NShards]map[string]string{}for i, _ := range kv.data {kv.data[i] = make(map[string]string)}kv.lastApplies = [shardctrler.NShards]map[int64]int64{}for i, _ := range kv.lastApplies {kv.lastApplies[i] = make(map[int64]int64)}kv.inputShards = make(map[int]bool)kv.outputShards = make(map[int]map[int]MergeShardData)//kv.cleanOutputDataNotifyCh = make(map[string]chan struct{})config := shardctrler.Config{Num: 0,Shards: [shardctrler.NShards]int{},Groups: map[int][]string{},}kv.config = configkv.oldConfig = config//讀取快照內(nèi)容kv.readPersist(true, 0, 0, kv.persister.ReadSnapshot())kv.commandNotifyCh = make(map[int64]chan CommandResult)//設(shè)置定時器kv.pullConfigTimer = time.NewTimer(PullConfigInterval)kv.pullShardsTimer = time.NewTimer(PullShardsInterval)kv.ticker()return kv }主要邏輯
主要邏輯就是三個協(xié)程的處理,后面會依次介紹:
func (kv *ShardKV) ticker() {//處理applyChgo kv.handleApplyCh()//定時獲取config信息go kv.pullConfig()//定時獲取input shard(如果有的話)go kv.fetchShards() }定時獲取config信息
func (kv *ShardKV) pullConfig() {for {select {case <-kv.stopCh:returncase <-kv.pullConfigTimer.C://只有l(wèi)eader才能獲取_, isLeader := kv.rf.GetState()if !isLeader {kv.pullConfigTimer.Reset(PullConfigInterval)break}kv.lock("pullconfig")lastNum := kv.config.Numkv.log("pull config,last: %d", lastNum)kv.unlock("pullconfig")config := kv.scc.Query(lastNum + 1)if config.Num == lastNum+1 {//找到新的configkv.log("pull config,new config:%+v", config)kv.lock("pullconfig")//這一個判斷很關(guān)鍵,必須當(dāng)前shard全部遷移完成才能獲取下一個configif len(kv.inputShards) == 0 && kv.config.Num+1 == config.Num {kv.log("pull config,start config:%+v", config)kv.unlock("pullconfig")//請求該命令kv.rf.Start(config.Copy())} else {kv.unlock("pullconfig")}}kv.pullConfigTimer.Reset(PullConfigInterval)}} }這一部分就是定時從shardctrler獲取config的邏輯,注意幾點:
- 只有l(wèi)eader才能向shardctrler請求查詢config;
- config不能跳躍獲取,只能依照順序獲取;
- 當(dāng)前config如果沒有穩(wěn)定,即當(dāng)前config還有shard數(shù)據(jù)沒有遷移完成,就不能獲取該config;
- 同一個group的config同步借助raft實現(xiàn),調(diào)用Start來進行同步和應(yīng)用,leader沒必要阻塞等待Start函數(shù)的結(jié)果。
server_op(普通命令rpc接收處理)
數(shù)據(jù)結(jié)構(gòu)
type Op struct {// Your definitions here.// Field names must start with capital letters,// otherwise RPC will break.ReqId int64 //用來標(biāo)識commandNotifyCommandId int64ClientId int64Key stringValue stringMethod stringConfigNum int }type CommandResult struct {Err ErrValue string }Op和CommandResult和之前l(fā)ab中的變化不是很大。
Op中的ConfigNum是用來記錄客戶端請求時args中的configNum,當(dāng)Op從applyCh中獲取并應(yīng)用時,只有configNum和當(dāng)前節(jié)點的configNum相同,才能執(zhí)行。
Get、PutAppend rpc處理
這一部分和kvraft區(qū)別不大:
func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {// Your code here.res := kv.waitCommand(args.ClientId, args.CommandId, "Get", args.Key, "", args.ConfigNum)reply.Err = res.Errreply.Value = res.Value }func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {// Your code here.res := kv.waitCommand(args.ClientId, args.CommandId, args.Op, args.Key, args.Value, args.ConfigNum)reply.Err = res.Err }func (kv *ShardKV) waitCommand(clientId int64, commandId int64, method, key, value string, configNum int) (res CommandResult) {kv.log("wait cmd start,clientId:%d,commandId: %d,method: %s,key-value:%s %s,configNum %d", clientId, commandId, method, key, value, configNum)op := Op{ReqId: nrand(),ClientId: clientId,CommandId: commandId,Method: method,Key: key,ConfigNum: configNum,Value: value,}index, term, isLeader := kv.rf.Start(op)if !isLeader {res.Err = ErrWrongLeaderkv.log("wait cmd NOT LEADER.")return}kv.lock("waitCommand")ch := make(chan CommandResult, 1)kv.commandNotifyCh[op.ReqId] = chkv.unlock("waitCommand")kv.log("wait cmd notify,index: %v,term: %v,op: %+v", index, term, op)t := time.NewTimer(WaitCmdTimeOut)defer t.Stop()select {case <-t.C:res.Err = ErrTimeOutcase res = <-ch:case <-kv.stopCh:res.Err = ErrServer}kv.removeCh(op.ReqId)kv.log("wait cmd end,Op: %+v.res:%+v", op, res)return}waitCommand主要的處理步驟:
- 根據(jù)命令信息封裝一個Op命令;
- 調(diào)用Start提交該命令;
- 創(chuàng)建一個用于處理該命令的喚醒ch;
- 阻塞等待ch的返回,不管是哪個ch返回,都要刪除前一步創(chuàng)建的ch,防止內(nèi)存泄漏。
server_shard(shard遷移相關(guān))
定時獲取shard信息
//定時獲取shard func (kv *ShardKV) fetchShards() {for {select {case <-kv.stopCh:returncase <-kv.pullShardsTimer.C://判斷是否有要input的shard_, isLeader := kv.rf.GetState()if isLeader {kv.lock("pullshards")for shardId, _ := range kv.inputShards {//注意要從上一個config中請求shard的源節(jié)點go kv.fetchShard(shardId, kv.oldConfig)}kv.unlock("pullshards")}kv.pullShardsTimer.Reset(PullShardsInterval)}} }主要就是主節(jié)點檢查inputShards,如果有還沒有遷移完成的shard,就調(diào)用fetchShard函數(shù)進行遷移:
//獲取指定的shard func (kv *ShardKV) fetchShard(shardId int, config shardctrler.Config) {args := FetchShardDataArgs{ConfigNum: config.Num,ShardNum: shardId,}t := time.NewTimer(CallPeerFetchShardDataTimeOut)defer t.Stop()for {//依次請求group中的每個節(jié)點,但只要獲取一個就好了for _, s := range config.Groups[config.Shards[shardId]] {reply := FetchShardDataReply{}srv := kv.make_end(s)done := make(chan bool, 1)go func(args *FetchShardDataArgs, reply *FetchShardDataReply) {done <- srv.Call("ShardKV.FetchShardData", args, reply)}(&args, &reply)t.Reset(CallPeerFetchShardDataTimeOut)select {case <-kv.stopCh:returncase <-t.C:case isDone := <-done:if isDone && reply.Success == true {kv.lock("pullShard")if _, ok := kv.inputShards[shardId]; ok && kv.config.Num == config.Num+1 {replyCopy := reply.Copy()mergeShardData := MergeShardData{ConfigNum: args.ConfigNum,ShardNum: args.ShardNum,Data: replyCopy.Data,CommandIndexes: replyCopy.CommandIndexes,}kv.log("pullShard get data:%+v", mergeShardData)kv.unlock("pullShard")kv.rf.Start(mergeShardData)//不管是不是leader都返回return} else {kv.unlock("pullshard")}}}}}}因為新shard所在的源group每一個節(jié)點都有相同的數(shù)據(jù),因此這里可以依次請求每一個節(jié)點,只要從一個節(jié)點得到了數(shù)據(jù)(不一定是leader節(jié)點),就可以結(jié)束請求了。
而獲取新shard的數(shù)據(jù)也需要調(diào)用Start通過raft來進行同步,這里并不需要阻塞等待Start的結(jié)果,因為沒必要。
調(diào)用的RPC是ShardKV.FetchShardData,后面會介紹。
請求刪除shard數(shù)據(jù)
當(dāng)進行shard遷移時,目標(biāo)節(jié)點如果已經(jīng)導(dǎo)入了該shard的數(shù)據(jù),就可以調(diào)用該函數(shù)請求源節(jié)點刪除該shard的數(shù)據(jù)了,因為遷移完成后,源節(jié)點沒必要保存舊的shard數(shù)據(jù)了。
//發(fā)送給shard源節(jié)點,可以刪除shard數(shù)據(jù)了 //一般在apply command中處理好input的shard,發(fā)送給源節(jié)點刪除保存的shard數(shù)據(jù) func (kv *ShardKV) callPeerCleanShardData(config shardctrler.Config, shardId int) {args := CleanShardDataArgs{ConfigNum: config.Num,ShardNum: shardId,}t := time.NewTimer(CallPeerCleanShardDataTimeOut)defer t.Stop()for {//因為并不知道哪一個節(jié)點是leader,因此群發(fā)吧for _, group := range config.Groups[config.Shards[shardId]] {reply := CleanShardDataReply{}srv := kv.make_end(group)done := make(chan bool, 1)go func(args *CleanShardDataArgs, reply *CleanShardDataReply) {done <- srv.Call("ShardKV.CleanShardData", args, reply)}(&args, &reply)t.Reset(CallPeerCleanShardDataTimeOut)select {case <-kv.stopCh:returncase <-t.C:case isDone := <-done:if isDone && reply.Success == true {return}}}kv.lock("callPeerCleanShardData")if kv.config.Num != config.Num+1 || len(kv.inputShards) == 0 {kv.unlock("callPeerCleanShardData")break}kv.unlock("callPeerCleanShardData")} }會輪詢源group的所有節(jié)點發(fā)送RPC請求,因為并不知道哪一個節(jié)點時主節(jié)點,只要某一個節(jié)點返回成功,就可以了。
調(diào)用的RPC是ShardKV.CleanShardData,后面會介紹。
RPC處理
主要是兩種RPC的處理:
- FetchShardData:根據(jù)configNum和shardId獲取數(shù)據(jù);
- CleanShardData:根據(jù)configNum和shardId刪除記錄的數(shù)據(jù)。
- 如果請求的configNum不是歷史的configNum,則直接返回;
- 如果outputShards沒有請求的數(shù)據(jù),則可能是:①該shard沒有進行遷移;②該shard已經(jīng)遷移完成,并接收到目標(biāo)節(jié)點的CleanShardData RPC刪除了;
- 否則,就可以將shard中的數(shù)據(jù)和用于冪等性的數(shù)據(jù)結(jié)構(gòu)(記錄每一個客戶端的最后一個應(yīng)用命令)發(fā)送給請求方。
- 如果請求的configNum不是歷史的configNum,則直接返回;
- 調(diào)用Start同步該命令,如果該節(jié)點不是leader直接返回;(這里要同步,是因為所有節(jié)點都有這一份數(shù)據(jù))
- 接下來有兩種方法:①復(fù)雜的方法就是借助ch,當(dāng)命令應(yīng)用完成后,通知該阻塞協(xié)程,但這樣子寫最終測試的結(jié)果時間會長一些;②簡單的進行多次循環(huán),如果在循環(huán)中,發(fā)現(xiàn)數(shù)據(jù)被刪除了,就可以返回成功了。這樣子看似不嚴(yán)謹(jǐn),其實是可行的,因為就算本次成功刪除了但RPC沒有成功返回,下一次就可以成功返回了。
server_apply(applyCh處理)
applyCh處理
func (kv *ShardKV) handleApplyCh() {for {select {case <-kv.stopCh:kv.log("get from stopCh,server-%v stop!", kv.me)returncase cmd := <-kv.applyCh://處理快照命令,讀取快照的內(nèi)容if cmd.SnapshotValid {kv.log("%v get install sn,%v %v", kv.me, cmd.SnapshotIndex, cmd.SnapshotTerm)kv.lock("waitApplyCh_sn")kv.readPersist(false, cmd.SnapshotTerm, cmd.SnapshotIndex, cmd.Snapshot)kv.unlock("waitApplyCh_sn")continue}//處理普通命令if !cmd.CommandValid {continue}cmdIdx := cmd.CommandIndex//處理不同的命令if op, ok := cmd.Command.(Op); ok {kv.handleOpCommand(cmdIdx, op)} else if config, ok := cmd.Command.(shardctrler.Config); ok {kv.handleConfigCommand(cmdIdx, config)} else if mergeData, ok := cmd.Command.(MergeShardData); ok {kv.handleMergeShardDataCommand(cmdIdx, mergeData)} else if cleanData, ok := cmd.Command.(CleanShardDataArgs); ok {kv.handleCleanShardDataCommand(cmdIdx, cleanData)} else {panic("apply command,NOT FOUND COMMDN!")}}}}這個函數(shù)和kvraft、shardctrler并沒有多大的不同,處理邏輯是在一個for循環(huán)中,從applyCh中會獲取五種命令:①快照命令;②客戶端普通命令;③更新配置命令;④遷入的shard數(shù)據(jù)保存命令;⑤遷出的shard數(shù)據(jù)清除命令。
其實也就分別是調(diào)用五種函數(shù)來進行處理,會依次進行介紹。
客戶端普通命令處理
//處理get、put、append命令 func (kv *ShardKV) handleOpCommand(cmdIdx int, op Op) {kv.log("start apply command %v:%+v", cmdIdx, op)kv.lock("handleApplyCh")defer kv.unlock("handleApplyCh")shardId := key2shard(op.Key)//判斷能夠執(zhí)行該命令if err := kv.ProcessKeyReady(op.ConfigNum, op.Key); err != OK {kv.notifyWaitCommand(op.ReqId, err, "")return}if op.Method == "Get" {//處理讀e, v := kv.getValueByKey(op.Key)kv.notifyWaitCommand(op.ReqId, e, v)} else if op.Method == "Put" || op.Method == "Append" {//處理寫//判斷命令是否重復(fù)isRepeated := falseif v, ok := kv.lastApplies[shardId][op.ClientId]; ok {if v == op.CommandId {isRepeated = true}}if !isRepeated {switch op.Method {case "Put":kv.data[shardId][op.Key] = op.Valuekv.lastApplies[shardId][op.ClientId] = op.CommandIdcase "Append":e, v := kv.getValueByKey(op.Key)if e == ErrNoKey {//按put處理kv.data[shardId][op.Key] = op.Valuekv.lastApplies[shardId][op.ClientId] = op.CommandId} else {//追加kv.data[shardId][op.Key] = v + op.Valuekv.lastApplies[shardId][op.ClientId] = op.CommandId}default:panic("unknown method " + op.Method)}}//命令處理成功kv.notifyWaitCommand(op.ReqId, OK, "")} else {panic("unknown method " + op.Method)}kv.log("apply op: cmdId:%d, op: %+v, data:%v", cmdIdx, op, kv.data[shardId][op.Key])//每應(yīng)用一條命令,就判斷是否進行持久化kv.saveSnapshot(cmdIdx) }- 如果是Get操作,簡單的根據(jù)key獲取value,并喚醒等待的協(xié)程;
- 如果是Put、Append操作,先要判斷命令是否重復(fù),如果不重復(fù),表明可以執(zhí)行,因此根據(jù)Put和Append分別進行處理。這里要注意的一點就是Append命令中如果key不存在,就是一個Put操作。最后喚醒等待的協(xié)程。
- 最后嘗試進行一次持久化(滿足一定條件才能持久化)。
這里主要下ProcessKeyReady函數(shù):
//判斷能否執(zhí)行客戶端發(fā)來的命令 func (kv *ShardKV) ProcessKeyReady(configNum int, key string) Err {//config不對if configNum == 0 || configNum != kv.config.Num {kv.log("process key ready err config.")return ErrWrongGroup}shardId := key2shard(key)//沒有分配該shardif _, ok := kv.meShards[shardId]; !ok {kv.log("process key ready err shard.")return ErrWrongGroup}//正在遷移,這里有優(yōu)化的空間,如果沒有遷移完成,可以直接請求目標(biāo)節(jié)點完成操作并返回,但是這樣就太復(fù)雜了,這里簡略了if _, ok := kv.inputShards[shardId]; ok {kv.log("process key ready err waitShard.")return ErrWrongGroup}return OK }只要不滿足任意一個條件,都不能執(zhí)行客戶端發(fā)來的普通命令。
更新配置命令處理
//處理config命令,即更新config //主要是處理meshard、inputshard、outputshard func (kv *ShardKV) handleConfigCommand(cmdIdx int, config shardctrler.Config) {kv.log("start handle config %v:%+v", cmdIdx, config)kv.lock("handleApplyCh")defer kv.unlock("handleApplyCh")if config.Num <= kv.config.Num {kv.saveSnapshot(cmdIdx)return}if config.Num != kv.config.Num+1 {panic("applyConfig err")}oldConfig := kv.config.Copy()outputShards := make([]int, 0, shardctrler.NShards)inputShards := make([]int, 0, shardctrler.NShards)meShards := make([]int, 0, shardctrler.NShards)for i := 0; i < shardctrler.NShards; i++ {if config.Shards[i] == kv.gid {meShards = append(meShards, i)if oldConfig.Shards[i] != kv.gid {inputShards = append(inputShards, i)}} else {if oldConfig.Shards[i] == kv.gid {outputShards = append(outputShards, i)}}}//處理當(dāng)前的shardkv.meShards = make(map[int]bool)for _, shardId := range meShards {kv.meShards[shardId] = true}//處理移出的shard//保存當(dāng)前所處配置的所有移除的shard數(shù)據(jù)d := make(map[int]MergeShardData)for _, shardId := range outputShards {mergeShardData := MergeShardData{ConfigNum: oldConfig.Num,ShardNum: shardId,Data: kv.data[shardId],CommandIndexes: kv.lastApplies[shardId],}d[shardId] = mergeShardData//初始化數(shù)據(jù)kv.data[shardId] = make(map[string]string)kv.lastApplies[shardId] = make(map[int64]int64)}kv.outputShards[oldConfig.Num] = d//處理移入的shardkv.inputShards = make(map[int]bool)if oldConfig.Num != 0 {for _, shardId := range inputShards {kv.inputShards[shardId] = true}}kv.config = configkv.oldConfig = oldConfigkv.log("apply op: cmdId:%d, config:%+v", cmdIdx, config)kv.saveSnapshot(cmdIdx) }主要邏輯:
-
如果configNum小于等于當(dāng)前配置的configNum,直接返回;
-
如果configNum不是當(dāng)前配置的configNum+1,報錯,理論上是不會出現(xiàn)報錯的,因為在leader query到新的config后會進行判斷的;
-
根據(jù)新的config的shard信息更新三個數(shù)據(jù)結(jié)構(gòu):
①meshard:當(dāng)前config負(fù)責(zé)的shard;
②outputShard:當(dāng)前config相比于上一個config不再負(fù)責(zé)的shard信息,主要用于供遷移的目標(biāo)節(jié)點獲取shard數(shù)據(jù);
③inputShard:當(dāng)前config相比于上一個config新負(fù)責(zé)的shard。
后兩個數(shù)據(jù)結(jié)構(gòu)主要用于shard數(shù)據(jù)遷移。
-
保存olgConfig,并嘗試進行一次快照。
遷入的shard數(shù)據(jù)保存命令
//處理新的shard數(shù)據(jù),即input shard func (kv *ShardKV) handleMergeShardDataCommand(cmdIdx int, data MergeShardData) {kv.log("start merge Shard Data %v:%+v", cmdIdx, data)kv.lock("handleApplyCh")defer kv.unlock("handleApplyCh")if kv.config.Num != data.ConfigNum+1 {return}if _, ok := kv.inputShards[data.ShardNum]; !ok {return}kv.data[data.ShardNum] = make(map[string]string)kv.lastApplies[data.ShardNum] = make(map[int64]int64)for k, v := range data.Data {kv.data[data.ShardNum][k] = v}for k, v := range data.CommandIndexes {kv.lastApplies[data.ShardNum][k] = v}delete(kv.inputShards, data.ShardNum)kv.log("apply op: cmdId:%d, mergeShardData:%+v", cmdIdx, data)kv.saveSnapshot(cmdIdx)go kv.callPeerCleanShardData(kv.oldConfig, data.ShardNum) }- 如果待遷移的shard數(shù)據(jù)不是來自上一個config,直接返回;
- 如果當(dāng)前shard數(shù)據(jù)已經(jīng)完成了遷移,直接返回;
- 保存該shard的數(shù)據(jù),以及該shard的用于處理冪等性的數(shù)據(jù)結(jié)構(gòu)CommandIndexes(保存該shard的每個客戶端的最后一條apply的命令);
- 在inputShards中刪除待遷移的shard;
- 嘗試進行一次快照;
- 調(diào)用callPeerCleanShardData請求刪除源節(jié)點保存的該shard數(shù)據(jù)。
遷出的shard數(shù)據(jù)清除命令
//處理已經(jīng)遷移走的shard,即output shard func (kv *ShardKV) handleCleanShardDataCommand(cmdIdx int, data CleanShardDataArgs) {kv.log("start clean shard data %v:%+v", cmdIdx, data)kv.lock("handleApplyCh")defer kv.unlock("handleApplyCh")//如果要清除的shard確實是在outputShard中,且沒有被清除,則需要清除if kv.OutputDataExist(data.ConfigNum, data.ShardNum) {delete(kv.outputShards[data.ConfigNum], data.ShardNum)}//通知等待協(xié)程//if ch, ok := kv.cleanOutputDataNotifyCh[fmt.Sprintf("%d%d", data.ConfigNum, data.ShardNum)]; ok {// ch <- struct{}{}//}kv.saveSnapshot(cmdIdx) }當(dāng)shard數(shù)據(jù)遷移中,目標(biāo)節(jié)點在handleMergeShardDataCommand中完成了數(shù)據(jù)遷移,會調(diào)用callPeerCleanShardData向源節(jié)點發(fā)送RPC請求清除該shard的數(shù)據(jù),源節(jié)點接收到該RPC,就會提交同步一條命令,而這條命令的處理就是handleCleanShardDataCommand。
主要就是從outputShards中清除某個config-shard的數(shù)據(jù)。
server_snapshot(快照處理)
讀取快照
//讀取快照 //兩處調(diào)用:初始化階段;收到Snapshot命令,即接收了leader的Snapshot func (kv *ShardKV) readPersist(isInit bool, snapshotTerm, snapshotIndex int, data []byte) {if data == nil || len(data) < 1 {return}//只要不是初始化調(diào)用,即如果收到一個Snapshot命令,就要執(zhí)行該函數(shù)//不知道為什么,只要在ShardKV中調(diào)用該函數(shù),就會導(dǎo)致測試一直阻塞,就算該函數(shù)為空也沒辦法通過,只能注釋掉,將CondInstallSnapshot的邏輯寫到Raft中的InstallSnapshot RPC的處理代碼中//if !isInit {// res := kv.rf.CondInstallSnapshot(snapshotTerm, snapshotIndex, data)// if !res {// log.Panicln("kv read persist err in CondInstallSnapshot!")// return// }//}//對數(shù)據(jù)進行同步r := bytes.NewBuffer(data)d := labgob.NewDecoder(r)var kvData [shardctrler.NShards]map[string]stringvar lastApplies [shardctrler.NShards]map[int64]int64var inputShards map[int]boolvar outputShards map[int]map[int]MergeShardDatavar config shardctrler.Configvar oldConfig shardctrler.Configvar meShards map[int]boolif d.Decode(&kvData) != nil ||d.Decode(&lastApplies) != nil ||d.Decode(&inputShards) != nil ||d.Decode(&outputShards) != nil ||d.Decode(&config) != nil ||d.Decode(&oldConfig) != nil ||d.Decode(&meShards) != nil {log.Fatal("kv read persist err")} else {kv.data = kvDatakv.lastApplies = lastApplieskv.inputShards = inputShardskv.outputShards = outputShardskv.config = configkv.oldConfig = oldConfigkv.meShards = meShards} }沒什么好說的,只是有一點很離譜,只要我調(diào)用了CondInstallSnapshot函數(shù),TestConcurrent3就會導(dǎo)致測試一直阻塞,就算該函數(shù)為空也沒辦法通過,只能注釋掉,將CondInstallSnapshot的邏輯寫到Raft中的InstallSnapshot RPC的處理代碼中。
生成快照
//保存快照 func (kv *ShardKV) saveSnapshot(logIndex int) {//判斷條件,滿足一定的日志量才能進行持久化if kv.maxraftstate == -1 || kv.persister.RaftStateSize() < kv.maxraftstate {return}//生成快照數(shù)據(jù)w := new(bytes.Buffer)e := labgob.NewEncoder(w)if e.Encode(kv.data) != nil ||e.Encode(kv.lastApplies) != nil ||e.Encode(kv.inputShards) != nil ||e.Encode(kv.outputShards) != nil ||e.Encode(kv.config) != nil ||e.Encode(kv.oldConfig) != nil ||e.Encode(kv.meShards) != nil {panic("gen snapshot data encode err")}data := w.Bytes()kv.rf.Snapshot(logIndex, data) }Common
這部分代碼沒什么好說的,主要是一些RPC參數(shù)和回復(fù)結(jié)構(gòu)定義和回復(fù)狀態(tài)碼定義,
//回復(fù)狀態(tài)碼 const (OK = "OK"ErrNoKey = "ErrNoKey"ErrWrongGroup = "ErrWrongGroup"ErrWrongLeader = "ErrWrongLeader"ErrTimeOut = "ErrTimeOut"ErrServer = "ErrServer" )type Err string//主要是applyCh的處理中,ApplyMsg的Command是一個interface,因此要向labgob注冊具體實現(xiàn)才能進行編解碼 func init() {//labgob.Register(PutAppendArgs{})//labgob.Register(PutAppendReply{})//labgob.Register(GetArgs{})//labgob.Register(GetReply{})//labgob.Register(FetchShardDataArgs{})//labgob.Register(FetchShardDataReply{})labgob.Register(CleanShardDataArgs{})//labgob.Register(CleanShardDataReply{})labgob.Register(MergeShardData{}) }// Put or Append type PutAppendArgs struct {// You'll have to add definitions here.Key stringValue stringOp string // "Put" or "Append"// You'll have to add definitions here.// Field names must start with capital letters,// otherwise RPC will break.ClientId int64CommandId int64ConfigNum int }type PutAppendReply struct {Err Err }func (c *PutAppendArgs) copy() PutAppendArgs {r := PutAppendArgs{Key: c.Key,Value: c.Value,Op: c.Op,ClientId: c.ClientId,CommandId: c.CommandId,ConfigNum: c.ConfigNum,}return r }type GetArgs struct {Key string// You'll have to add definitions here.ClientId int64CommandId int64ConfigNum int }type GetReply struct {Err ErrValue string }func (c *GetArgs) copy() GetArgs {r := GetArgs{Key: c.Key,ClientId: c.ClientId,CommandId: c.CommandId,ConfigNum: c.ConfigNum,}return r }//用于向目標(biāo)節(jié)點獲取input shard type FetchShardDataArgs struct {ConfigNum intShardNum int }type FetchShardDataReply struct {Success boolCommandIndexes map[int64]int64Data map[string]string }func (reply *FetchShardDataReply) Copy() FetchShardDataReply {res := FetchShardDataReply{Success: reply.Success,Data: make(map[string]string),CommandIndexes: make(map[int64]int64),}for k, v := range reply.Data {res.Data[k] = v}for k, v := range reply.CommandIndexes {res.CommandIndexes[k] = v}return res }//用于請求目標(biāo)節(jié)點清除指定的output shard type CleanShardDataArgs struct {ConfigNum intShardNum int }type CleanShardDataReply struct {Success bool }//用于存儲output shard的數(shù)據(jù),以及充當(dāng)input shard在apply的命令 type MergeShardData struct {ConfigNum intShardNum intCommandIndexes map[int64]int64 //當(dāng)前shard的所有客戶端的最后一條命令idData map[string]string }測試結(jié)果
總結(jié)
以上是生活随笔為你收集整理的MIT6.824-lab4B-Sharded Key/Value Server(基于Raft的Shard KV数据库-分片KV存储器)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 双倍体遗传算法
- 下一篇: XCTF Leaking