Mongodb源码分析--Replication之主从模式--Master
生活随笔
收集整理的這篇文章主要介紹了
Mongodb源码分析--Replication之主从模式--Master
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
mongodb中提供了復制(Replication)機制,通過該機制可以幫助我們很容易實現讀寫分離方案,并支持災難恢復(服務器斷電)等意外情況下的數據安全。
?? ?? 在老版本(1.6)中,Mongo提供了兩種方式的復制:master-slave及replica pair模式(注:mongodb最新支持的replset復制集方式可看成是pair的升級版,它解決pair只能在兩個結點間同步的限制,支持多個結點同步且支持主從宕機時的自動切換, 在1.6版以后提供)。
???
?? ?? 利用前者,我們可以實現讀寫分離(主從復制模式),后者則支持當主服務器斷電情況下的集群中其它slave自動接管,并升級為主服務器。 并且如果后來的也出錯了,那么master狀態將會轉回給第一個服務器(之前宕機但后來又恢復運行的服務器)。
? ? ? 同時mongodb支持使用安全認證(enable)。不管哪種replicate方式,只要在master/slave中創建一個能為各個database認識的用戶名/密碼即可。其認證過程如下:
????slave先在local.system.users里查找一個名為"repl"的用戶,找到后用它去認證master。如果"repl"用戶沒有找到,則使用local.system.users中的第一個用戶去認證。local數據庫和admin數據庫一樣,local中的用戶可以訪問整個db?server。
?? ?下面介紹分別介紹一下這兩種復制的配置方式:
?? ?
?? ?Master-Slave(主從)模式:
?? ? 一個server可以同時為master和slave。一個slave可以有多個master(不推薦,可能會產生不可預期的結果)。
???? 配置選項:
?? ? --master? 以主服務器方式啟動
?? ? --slave?? 以從服務器方式啟動
???? --autoresync:自動重新sync,因為該操作會copy 主服務器上的所有document,比較耗時,在10分鐘內最多只會進行一次。
?? ? --oplogSize:指定master上用于存放更改的數據量,如果不指定,在32位機上最少為50M,在64位機上最少為 1G,最大為磁盤空間的5%。
?? ? --source? 主服務器地址(與--slave組合使用)
?? ? --only??? 僅限于同步指定數據庫(下面示例為test庫)
?? ? --slavedelay? 同步延時
?? ? 下面是本人在本地為了測試方便所使用的配置參數
?? ???? Master:? IP->10.0.1.103?? ????
mongod --dbpath=d:\mongodb\db?--master?--oplogSize?64 ?? ??? ?Slave:?? IP->10.0.4.210
?? ????
mongod --dbpath=d:\mongodb\db?--slave?--source?10.0.1.103:27017?--only?test?--slavedelay?100
????
??? 補充:受限的master-master復制,這種模式對插入、查詢及根據_id進行的刪除操作都是安全的。但對同一對象的并發更新無法進行。Mongo?不支持完全的master-master復制,通常情況下不推薦使用master-master模式,但在一些特定的情況下master-master也可用。master-master也只支持最終一致性。配置master-master只需運行mongod時同時加上--master選項和?--slave選項。如下:
???? mongod?--dbpath=d:\mongodb\db?--port?27017?--master?--slave?--source?localhost:27018
???? mongod?--dbpath=d:\mongodb\db?--port?27018?--master?--slave?--source?localhost:27017
???? ?
??? Replica pairs模式
???? 以這種方式啟動后,數據庫會自動協商誰是master誰是slave。一旦一個數據庫服務器斷電,另一個會自動接管,并從那一刻起起為master。萬一另一個將來也出錯了,那么master狀態將會轉回給第一個服務器。以這種復制方式啟動mongod的命令如下:
???? 配置選項:
?? ?? mongod --pairwith <remoteserver> --arbiter <arbiterserver>
?? ?? --pairwith: remoteserver是pair里的另一個server
?? ?? --arbiter:? arbiterserver是一個起仲裁作用的Mongo數據庫,用來協商pair中哪一個是master。arbiter運行在第三個機器上,利用“平分決勝制”決定在pair中的兩臺機器不能聯系上對方時讓哪一個做master,一般是能同arbiter通話的那臺機器做master。如果不加--arbiter選項,出現網絡問題時兩臺機器都作為master。
?? ?? 注:可使用db.$cmd.findOne({ismaster:1})可以檢查當前哪一個database是master。
???? 另外這種模式下的兩臺機器只能滿足最終一致性。當replica pair中的一臺機器完全掛掉時,需要用一臺新的來代替。如(n1, n2)中的n2掛掉,這時用n3來代替n2。步驟如下:
?? ?? 1. 告訴n1用n3來代替n2:db.$cmd.findOne({replacepeer:1});
?? ?? 2. 重啟n1讓它同n3對話:mongod --pairwith n3 --arbiter <arbiterserver>
?? ?? 3. 啟動n3:mongod --pairwith n1 --arbiter <arbiterserver>。
???? 在n3的數據沒有同步到n1前n3還不能做master,這個過程長短由數據量的多少決定。
?
?? ?
???? 了解了復制模式之后,還有一個問題需要介紹一下,不是就是本文中mongodb使用cap collection來存儲操作日志,并進而使用日志來復制(同步)結點間的數據,其中由主結點保存的操作的記錄叫做oplog(operation log的簡稱)。
?
?? Oplog存在一個叫local的特殊數據庫中,在oplog.$main集合。Oplog中的每一個文檔表示一個在主結點上執行的操作。文檔主要包括4塊內容,如下:
??
?Ts:操作的時間戳。時間戳類型是一個用來跟蹤操作是何時執行的一種內部類型。它由4字節的時間戳和四字節的增量計數器組成。
?Op:執行的操作的類型,大小為1字節。(例如,“i”代表insert,"u":update,?"d":delete,?"n":none無操作等)
?Ns:執行操作的命名空間(集合名)
?O:執行操作的文檔。對于插入,這是將要插入的文檔。
???? 另外這種日志只保存會“改變數據庫狀態”的操作。查詢操作不會記錄在oplog中。
? ?? 好了,了解這些知識之后,我們就來開始看一下如何調試master-slave模式的源碼,首先要在vs2010中打開mongod項目,并將啟動參數中設置如下:
????? --master --oplogSize 64?? (master IP為10.0.1.103)
???? 如下圖:
?
?? ?
???? 之后編譯該項目,啟動該主服務結點,如下:
?? ?
?? ?
???? 接著我們可以在本地或另外一臺機器上啟動一個slave結點:
? mongod?--dbpath=d:\mongodb\db?--slave?--source?10.0.1.103:27017?--only?test?--slavedelay?100
?? 下面介紹一下master(主服務端)的代碼執行流程。首先我們打開instance.cpp文件,找到下面方法:
??
??? //instance.cpp
????//?Returns?false?when?request?includes?'end'
????void?assembleResponse(?Message?&m,?DbResponse?&dbresponse,?const?SockAddr?&client?)?{
????......
???????if?(?op?==?dbQuery?)?{
????????????if?(?handlePossibleShardedMessage(?m?,?&dbresponse?)?)
????????????????return;
????????????receivedQuery(c?,?dbresponse,?m?);
????????}
????????//服務端(master)?收到message執行相關查詢操作
????????else?if?(?op?==?dbGetMore?)?{
????????????if?(?!?receivedGetMore(dbresponse,?m,?currentOp)?)
????????????????log?=?true;
????????}
????.....
????}
?? ?看過本系列開頭那幾篇BLOG的朋友,會看出上面方法其實在mongodb的crud操作中都會執行到,更多內容可以參見這篇BLOG,這里不再贅述。
?? ?當slave 從結點發送同步復制請求時,master會執行上面的dbGetMore操作,從主庫中的oplog中獲取相應日志并返回給slave結點,下面是receivedGetMore()方法的具體實現:
???
???? //instance.cpp
?????bool?receivedGetMore(DbResponse&?dbresponse,?Message&?m,?CurOp&?curop?)?{
????????StringBuilder&?ss?=?curop.debug().str;
????????bool?ok?=?true;
????????//參見:Mongodb源碼分析--消息(message)中的?查詢更多(document)消息結構相關內容
????????//http://www.cnblogs.com/daizhj/archive/2011/04/02/2003335.html
????????DbMessage?d(m);
????????//完整的集合名稱,形如:"dbname.collectionname"
????????const?char?*ns?=?d.getns();
????????//返回的document數
????????int?ntoreturn?=?d.pullInt();
????????//在REPLY消息中的Cursor標識符,其必須來自于數據庫
????????long?long?cursorid?=?d.pullInt64();
????????ss?<<?ns?<<?"?cid:"?<<?cursorid;
????????if(?ntoreturn?)
????????????ss?<<?"?ntoreturn:"?<<?ntoreturn;
????????time_t?start?=?0;
????????int?pass?=?0;
????????bool?exhaust?=?false;
????????QueryResult*?msgdata;//查詢結果
????????while(?1?)?{
????????????try?{
????????????????readlock?lk;
????????????????Client::Context?ctx(ns);
????????????????//執行GetMore查詢
????????????????msgdata?=?processGetMore(ns,?ntoreturn,?cursorid,?curop,?pass,?exhaust);
????????????}
????????????catch?(?GetMoreWaitException&?)?{
????????????????exhaust?=?false;
????????????????massert(13073,?"shutting?down",?!inShutdown()?);
????????????????if(?pass?==?0?)?{
????????????????????start?=?time(0);
????????????????}
????????????????else?{
????????????????????if(?time(0)?-?start?>=?4?)?{
????????????????????????//?after?about?4?seconds,?return.??this?is?a?sanity?check.??pass?stops?at?1000?normally
????????????????????????//?for?DEV?this?helps?and?also?if?sleep?is?highly?inaccurate?on?a?platform.??we?want?to
????????????????????????//?return?occasionally?so?slave?can?checkpoint.
????????????????????????pass?=?10000;
????????????????????}
????????????????}
????????????????pass++;
????????????????DEV
????????????????sleepmillis(20);
????????????????else
????????????????????sleepmillis(2);
????????????????continue;
????????????}
????????????catch?(?AssertionException&?e?)?{
????????????????exhaust?=?false;
????????????????ss?<<?"?exception?"?<<?e.toString();
????????????????msgdata?=?emptyMoreResult(cursorid);
????????????????ok?=?false;
????????????}
????????????break;
????????};
????????//將查詢結果集綁定到message對象
????????Message?*resp?=?new?Message();
????????resp->setData(msgdata,?true);
????????ss?<<?"?bytes:"?<<?resp->header()->dataLen();
????????ss?<<?"?nreturned:"?<<?msgdata->nReturned;
????????//將上面的消息對象指針綁定到dbresponse
????????dbresponse.response?=?resp;
????????dbresponse.responseTo?=?m.header()->id;
????????if(?exhaust?)?{
????????????ss?<<?"?exhaust?";
????????????dbresponse.exhaust?=?ns;
????????}
????????return?ok;
????} ?? ?
可以看出,通過對message的解析找出相應的cursorid,因為mongodb如果發現游標為tailable(類型)時,會cache該cursor而不是關閉它,這主要是考慮到當下次slave請求來時,直接從cache中獲取該cursor以提升效率并用它來作為繼續獲取后續oplog操作信息。上面方法在執行結束處會將獲取到的oplog結果封裝到message中并返回。但其如何獲取,就要分析下面方法了:
?? ?
????//query.cpp
?????QueryResult*?processGetMore(const?char?*ns,?int?ntoreturn,?long?long?cursorid?,?CurOp&?curop,?int?pass,?bool&?exhaust?)?{
????????exhaust?=?false;
????????//在map<CursorId,?ClientCursor*>中查詢相應游客信息
????????ClientCursor::Pointer?p(cursorid);
????????//將結果返回(可能沒找到)
????????ClientCursor?*cc?=?p.c();
????????int?bufSize?=?512;
????????if?(?cc?)?{
????????????bufSize?+=?sizeof(?QueryResult?);
????????????bufSize?+=?MaxBytesToReturnToClientAtOnce;
????????}
????????//創建收集查詢記錄結果的buf對象
????????BufBuilder?b(?bufSize?);
????????//跳過預留數據區間(QueryResult)
????????b.skip(sizeof(QueryResult));
????????int?resultFlags?=?ResultFlag_AwaitCapable;
????????int?start?=?0;
????????int?n?=?0;
????????//判斷cc是否有效(如未找到則無效)
????????if?(?!cc?)?{
????????????log()?<<?"getMore:?cursorid?not?found?"?<<?ns?<<?"?"?<<?cursorid?<<?endl;
????????????cursorid?=?0;
????????????resultFlags?=?ResultFlag_CursorNotFound;
????????}
????????else?{
????????????//更新master結點local.slaves中的相應信息(包括lastop時間戳)
????????????//注:主結點使用存儲在local.slaves中的syncedTo來跟蹤多少slave是已經更新的。
????????????if?(?pass?==?0?)
????????????????cc->updateSlaveLocation(?curop?);
????????????int?queryOptions?=?cc->queryOptions();
????????????if(?pass?==?0?)?{
????????????????StringBuilder&?ss?=?curop.debug().str;
????????????????ss?<<?"?getMore:?"?<<?cc->query().toString()?<<?"?";
????????????}
????????????//獲取相應cursor,以便while遍歷
????????????start?=?cc->pos();
????????????Cursor?*c?=?cc->c();
????????????c->checkLocation();
????????????DiskLoc?last;
????????????scoped_ptr<Projection::KeyOnly>?keyFieldsOnly;
????????????if?(?cc->modifiedKeys()?==?false?&&?cc->isMultiKey()?==?false?&&?cc->fields?)
????????????????keyFieldsOnly.reset(?cc->fields->checkKey(?cc->indexKeyPattern()?)?);
????????????//遍歷cursor,找到并封裝相應查詢結果給buf對象
????????????while?(?1?)?{
????????????????if?(?!c->ok()?)?{//到結尾
????????????????????if?(?c->tailable()?)?{//處理tailable情況
????????????????????????//Tailable?表示在返回最后一條數據后,不要關閉當前?cursor。
????????????????????????//這是因為系統考慮到稍后你可以再次使用該cursor.??
????????????????????????/*?when?a?tailable?cursor?hits?"EOF",?ok()?goes?false,?and?current()?is?null.??however
???????????????????????????advance()?can?still?be?retries?as?a?reactivation?attempt.??when?there?is?new?data,?it?will
???????????????????????????return?true.??that's?what?we?are?doing?here.
???????????????????????????*/
????????????????????????if?(?c->advance()?)
????????????????????????????continue;
????????????????????????if(?n?==?0?&&?(queryOptions?&?QueryOption_AwaitData)?&&?pass?<?1000?)?{
????????????????????????????throw?GetMoreWaitException();
????????????????????????}
????????????????????????break;
????????????????????}
????????????????????//釋放cursor資源關閉它(執行delete操作)
????????????????????p.release();
????????????????????bool?ok?=?ClientCursor::erase(cursorid);
????????????????????assert(ok);
????????????????????cursorid?=?0;
????????????????????cc?=?0;
????????????????????break;
????????????????}
????????????????//?如果是clone?collection時,則不會匹配 // If match succeeds on index key, then attempt to match full document.??????????????? if?(?c->matcher()?&&?!c->matcher()->matches(c->currKey(),?c->currLoc()?)?)?{
????????????????}
????????????????/*
??????????????????TODO
????????????????else?if?(?_chunkMatcher?&&?!?_chunkMatcher->belongsToMe(?c->currKey(),?c->currLoc()?)?){
????????????????????cout?<<?"TEMP?skipping?un-owned?chunk:?"?<<?c->current()?<<?endl;
????????????????}
????????????????*/
????????????????else?{//值是否重復
????????????????????if(?c->getsetdup(c->currLoc())?)?{
????????????????????????//out()?<<?"??but?it's?a?dup?\n";
????????????????????}
????????????????????else?{//如匹配
????????????????????????last?=?c->currLoc();
????????????????????????n++;
????????????????????????//裝填數據到buf中
????????????????????????if?(?keyFieldsOnly?)?{
????????????????????????????fillQueryResultFromObj(b,?0,?keyFieldsOnly->hydrate(?c->currKey()?)?);
????????????????????????}
????????????????????????else?{
????????????????????????????BSONObj?js?=?c->current();
????????????????????????????//?show?disk?loc?should?be?part?of?the?main?query,?not?in?an?$or?clause,?so?this?should?be?ok
????????????????????????????fillQueryResultFromObj(b,?cc->fields.get(),?js,?(?cc->pq.get()?&&?cc->pq->showDiskLoc()???&last?:?0));
????????????????????????}
????????????????????????if?(?(?ntoreturn?&&?n?>=?ntoreturn?)?||?b.len()?>?MaxBytesToReturnToClientAtOnce?)?{
????????????????????????????c->advance();
????????????????????????????cc->incPos(?n?);
????????????????????????????break;
????????????????????????}
????????????????????}
????????????????}
????????????????//指向下一條記錄
????????????????c->advance();
????????????????if?(?!?cc->yieldSometimes()?)?{
????????????????????cc?=?0;
????????????????????break;
????????????????}
????????????}
????????????if?(?cc?)?{
????????????????cc->updateLocation();
????????????????cc->mayUpgradeStorage();
????????????????//用last中的optime?更新_slaveReadTill
????????????????cc->storeOpForSlave(?last?);
????????????????exhaust?=?cc->queryOptions()?&?QueryOption_Exhaust;
????????????}
????????}
????????//將buf中的信息綁定到查詢結果集
????????QueryResult?*qr?=?(QueryResult?*)?b.buf();
????????qr->len?=?b.len();
????????qr->setOperation(opReply);
????????qr->_resultFlags()?=?resultFlags;
????????qr->cursorId?=?cursorid;
????????qr->startingFrom?=?start;
????????qr->nReturned?=?n;
????????b.decouple();
????????return?qr;
????}
?? ? 上面代碼有些長,但其目的很明確,就是針對指定的cursor進行遍歷。這里mongodb會為每個slave保存一個cursor,并且其在遍歷完成后將最后一條oplog的時間戳作為當前slave在local.slaves中的更新標識信息(syncedTo),來標識當前slave的更新情況。(注:首次同步時全部復制會執行copyDatabase,復制master db上的所有document)。該方法運行截圖如下:
???
??
?? ?http://www.snailinaturtleneck.com/blog/2010/10/14/getting-to-know-your-oplog/
?? ?http://www.snailinaturtleneck.com/blog/2010/08/02/replica-sets-part-2-what-are-replica-sets/
?? ?原文鏈接:http://www.cnblogs.com/daizhj/archive/2011/06/13/mongodb_sourcecode_repl_master_run.html
??? 作者: daizhj, 代震軍? ?
??? 微博: http://t.sina.com.cn/daizhj
??? Tags: mongodb,c++,Replica,master-slave
?? ?? 在老版本(1.6)中,Mongo提供了兩種方式的復制:master-slave及replica pair模式(注:mongodb最新支持的replset復制集方式可看成是pair的升級版,它解決pair只能在兩個結點間同步的限制,支持多個結點同步且支持主從宕機時的自動切換, 在1.6版以后提供)。
???
?? ?? 利用前者,我們可以實現讀寫分離(主從復制模式),后者則支持當主服務器斷電情況下的集群中其它slave自動接管,并升級為主服務器。 并且如果后來的也出錯了,那么master狀態將會轉回給第一個服務器(之前宕機但后來又恢復運行的服務器)。
? ? ? 同時mongodb支持使用安全認證(enable)。不管哪種replicate方式,只要在master/slave中創建一個能為各個database認識的用戶名/密碼即可。其認證過程如下:
????slave先在local.system.users里查找一個名為"repl"的用戶,找到后用它去認證master。如果"repl"用戶沒有找到,則使用local.system.users中的第一個用戶去認證。local數據庫和admin數據庫一樣,local中的用戶可以訪問整個db?server。
?? ?下面介紹分別介紹一下這兩種復制的配置方式:
?? ?
?? ?Master-Slave(主從)模式:
?? ? 一個server可以同時為master和slave。一個slave可以有多個master(不推薦,可能會產生不可預期的結果)。
???? 配置選項:
?? ? --master? 以主服務器方式啟動
?? ? --slave?? 以從服務器方式啟動
???? --autoresync:自動重新sync,因為該操作會copy 主服務器上的所有document,比較耗時,在10分鐘內最多只會進行一次。
?? ? --oplogSize:指定master上用于存放更改的數據量,如果不指定,在32位機上最少為50M,在64位機上最少為 1G,最大為磁盤空間的5%。
?? ? --source? 主服務器地址(與--slave組合使用)
?? ? --only??? 僅限于同步指定數據庫(下面示例為test庫)
?? ? --slavedelay? 同步延時
?? ? 下面是本人在本地為了測試方便所使用的配置參數
?? ???? Master:? IP->10.0.1.103?? ????
mongod --dbpath=d:\mongodb\db?--master?--oplogSize?64 ?? ??? ?Slave:?? IP->10.0.4.210
?? ????
mongod --dbpath=d:\mongodb\db?--slave?--source?10.0.1.103:27017?--only?test?--slavedelay?100
????
??? 補充:受限的master-master復制,這種模式對插入、查詢及根據_id進行的刪除操作都是安全的。但對同一對象的并發更新無法進行。Mongo?不支持完全的master-master復制,通常情況下不推薦使用master-master模式,但在一些特定的情況下master-master也可用。master-master也只支持最終一致性。配置master-master只需運行mongod時同時加上--master選項和?--slave選項。如下:
???? mongod?--dbpath=d:\mongodb\db?--port?27017?--master?--slave?--source?localhost:27018
???? mongod?--dbpath=d:\mongodb\db?--port?27018?--master?--slave?--source?localhost:27017
???? ?
??? Replica pairs模式
???? 以這種方式啟動后,數據庫會自動協商誰是master誰是slave。一旦一個數據庫服務器斷電,另一個會自動接管,并從那一刻起起為master。萬一另一個將來也出錯了,那么master狀態將會轉回給第一個服務器。以這種復制方式啟動mongod的命令如下:
???? 配置選項:
?? ?? mongod --pairwith <remoteserver> --arbiter <arbiterserver>
?? ?? --pairwith: remoteserver是pair里的另一個server
?? ?? --arbiter:? arbiterserver是一個起仲裁作用的Mongo數據庫,用來協商pair中哪一個是master。arbiter運行在第三個機器上,利用“平分決勝制”決定在pair中的兩臺機器不能聯系上對方時讓哪一個做master,一般是能同arbiter通話的那臺機器做master。如果不加--arbiter選項,出現網絡問題時兩臺機器都作為master。
?? ?? 注:可使用db.$cmd.findOne({ismaster:1})可以檢查當前哪一個database是master。
???? 另外這種模式下的兩臺機器只能滿足最終一致性。當replica pair中的一臺機器完全掛掉時,需要用一臺新的來代替。如(n1, n2)中的n2掛掉,這時用n3來代替n2。步驟如下:
?? ?? 1. 告訴n1用n3來代替n2:db.$cmd.findOne({replacepeer:1});
?? ?? 2. 重啟n1讓它同n3對話:mongod --pairwith n3 --arbiter <arbiterserver>
?? ?? 3. 啟動n3:mongod --pairwith n1 --arbiter <arbiterserver>。
???? 在n3的數據沒有同步到n1前n3還不能做master,這個過程長短由數據量的多少決定。
?
?? ?
???? 了解了復制模式之后,還有一個問題需要介紹一下,不是就是本文中mongodb使用cap collection來存儲操作日志,并進而使用日志來復制(同步)結點間的數據,其中由主結點保存的操作的記錄叫做oplog(operation log的簡稱)。
?
?? Oplog存在一個叫local的特殊數據庫中,在oplog.$main集合。Oplog中的每一個文檔表示一個在主結點上執行的操作。文檔主要包括4塊內容,如下:
??
?Ts:操作的時間戳。時間戳類型是一個用來跟蹤操作是何時執行的一種內部類型。它由4字節的時間戳和四字節的增量計數器組成。
?Op:執行的操作的類型,大小為1字節。(例如,“i”代表insert,"u":update,?"d":delete,?"n":none無操作等)
?Ns:執行操作的命名空間(集合名)
?O:執行操作的文檔。對于插入,這是將要插入的文檔。
???? 另外這種日志只保存會“改變數據庫狀態”的操作。查詢操作不會記錄在oplog中。
? ?? 好了,了解這些知識之后,我們就來開始看一下如何調試master-slave模式的源碼,首先要在vs2010中打開mongod項目,并將啟動參數中設置如下:
????? --master --oplogSize 64?? (master IP為10.0.1.103)
???? 如下圖:
?
?? ?
???? 之后編譯該項目,啟動該主服務結點,如下:
?? ?
?? ?
???? 接著我們可以在本地或另外一臺機器上啟動一個slave結點:
? mongod?--dbpath=d:\mongodb\db?--slave?--source?10.0.1.103:27017?--only?test?--slavedelay?100
?? 下面介紹一下master(主服務端)的代碼執行流程。首先我們打開instance.cpp文件,找到下面方法:
??
??? //instance.cpp
????//?Returns?false?when?request?includes?'end'
????void?assembleResponse(?Message?&m,?DbResponse?&dbresponse,?const?SockAddr?&client?)?{
????......
???????if?(?op?==?dbQuery?)?{
????????????if?(?handlePossibleShardedMessage(?m?,?&dbresponse?)?)
????????????????return;
????????????receivedQuery(c?,?dbresponse,?m?);
????????}
????????//服務端(master)?收到message執行相關查詢操作
????????else?if?(?op?==?dbGetMore?)?{
????????????if?(?!?receivedGetMore(dbresponse,?m,?currentOp)?)
????????????????log?=?true;
????????}
????.....
????}
?? ?看過本系列開頭那幾篇BLOG的朋友,會看出上面方法其實在mongodb的crud操作中都會執行到,更多內容可以參見這篇BLOG,這里不再贅述。
?? ?當slave 從結點發送同步復制請求時,master會執行上面的dbGetMore操作,從主庫中的oplog中獲取相應日志并返回給slave結點,下面是receivedGetMore()方法的具體實現:
???
???? //instance.cpp
?????bool?receivedGetMore(DbResponse&?dbresponse,?Message&?m,?CurOp&?curop?)?{
????????StringBuilder&?ss?=?curop.debug().str;
????????bool?ok?=?true;
????????//參見:Mongodb源碼分析--消息(message)中的?查詢更多(document)消息結構相關內容
????????//http://www.cnblogs.com/daizhj/archive/2011/04/02/2003335.html
????????DbMessage?d(m);
????????//完整的集合名稱,形如:"dbname.collectionname"
????????const?char?*ns?=?d.getns();
????????//返回的document數
????????int?ntoreturn?=?d.pullInt();
????????//在REPLY消息中的Cursor標識符,其必須來自于數據庫
????????long?long?cursorid?=?d.pullInt64();
????????ss?<<?ns?<<?"?cid:"?<<?cursorid;
????????if(?ntoreturn?)
????????????ss?<<?"?ntoreturn:"?<<?ntoreturn;
????????time_t?start?=?0;
????????int?pass?=?0;
????????bool?exhaust?=?false;
????????QueryResult*?msgdata;//查詢結果
????????while(?1?)?{
????????????try?{
????????????????readlock?lk;
????????????????Client::Context?ctx(ns);
????????????????//執行GetMore查詢
????????????????msgdata?=?processGetMore(ns,?ntoreturn,?cursorid,?curop,?pass,?exhaust);
????????????}
????????????catch?(?GetMoreWaitException&?)?{
????????????????exhaust?=?false;
????????????????massert(13073,?"shutting?down",?!inShutdown()?);
????????????????if(?pass?==?0?)?{
????????????????????start?=?time(0);
????????????????}
????????????????else?{
????????????????????if(?time(0)?-?start?>=?4?)?{
????????????????????????//?after?about?4?seconds,?return.??this?is?a?sanity?check.??pass?stops?at?1000?normally
????????????????????????//?for?DEV?this?helps?and?also?if?sleep?is?highly?inaccurate?on?a?platform.??we?want?to
????????????????????????//?return?occasionally?so?slave?can?checkpoint.
????????????????????????pass?=?10000;
????????????????????}
????????????????}
????????????????pass++;
????????????????DEV
????????????????sleepmillis(20);
????????????????else
????????????????????sleepmillis(2);
????????????????continue;
????????????}
????????????catch?(?AssertionException&?e?)?{
????????????????exhaust?=?false;
????????????????ss?<<?"?exception?"?<<?e.toString();
????????????????msgdata?=?emptyMoreResult(cursorid);
????????????????ok?=?false;
????????????}
????????????break;
????????};
????????//將查詢結果集綁定到message對象
????????Message?*resp?=?new?Message();
????????resp->setData(msgdata,?true);
????????ss?<<?"?bytes:"?<<?resp->header()->dataLen();
????????ss?<<?"?nreturned:"?<<?msgdata->nReturned;
????????//將上面的消息對象指針綁定到dbresponse
????????dbresponse.response?=?resp;
????????dbresponse.responseTo?=?m.header()->id;
????????if(?exhaust?)?{
????????????ss?<<?"?exhaust?";
????????????dbresponse.exhaust?=?ns;
????????}
????????return?ok;
????} ?? ?
可以看出,通過對message的解析找出相應的cursorid,因為mongodb如果發現游標為tailable(類型)時,會cache該cursor而不是關閉它,這主要是考慮到當下次slave請求來時,直接從cache中獲取該cursor以提升效率并用它來作為繼續獲取后續oplog操作信息。上面方法在執行結束處會將獲取到的oplog結果封裝到message中并返回。但其如何獲取,就要分析下面方法了:
?? ?
????//query.cpp
?????QueryResult*?processGetMore(const?char?*ns,?int?ntoreturn,?long?long?cursorid?,?CurOp&?curop,?int?pass,?bool&?exhaust?)?{
????????exhaust?=?false;
????????//在map<CursorId,?ClientCursor*>中查詢相應游客信息
????????ClientCursor::Pointer?p(cursorid);
????????//將結果返回(可能沒找到)
????????ClientCursor?*cc?=?p.c();
????????int?bufSize?=?512;
????????if?(?cc?)?{
????????????bufSize?+=?sizeof(?QueryResult?);
????????????bufSize?+=?MaxBytesToReturnToClientAtOnce;
????????}
????????//創建收集查詢記錄結果的buf對象
????????BufBuilder?b(?bufSize?);
????????//跳過預留數據區間(QueryResult)
????????b.skip(sizeof(QueryResult));
????????int?resultFlags?=?ResultFlag_AwaitCapable;
????????int?start?=?0;
????????int?n?=?0;
????????//判斷cc是否有效(如未找到則無效)
????????if?(?!cc?)?{
????????????log()?<<?"getMore:?cursorid?not?found?"?<<?ns?<<?"?"?<<?cursorid?<<?endl;
????????????cursorid?=?0;
????????????resultFlags?=?ResultFlag_CursorNotFound;
????????}
????????else?{
????????????//更新master結點local.slaves中的相應信息(包括lastop時間戳)
????????????//注:主結點使用存儲在local.slaves中的syncedTo來跟蹤多少slave是已經更新的。
????????????if?(?pass?==?0?)
????????????????cc->updateSlaveLocation(?curop?);
????????????int?queryOptions?=?cc->queryOptions();
????????????if(?pass?==?0?)?{
????????????????StringBuilder&?ss?=?curop.debug().str;
????????????????ss?<<?"?getMore:?"?<<?cc->query().toString()?<<?"?";
????????????}
????????????//獲取相應cursor,以便while遍歷
????????????start?=?cc->pos();
????????????Cursor?*c?=?cc->c();
????????????c->checkLocation();
????????????DiskLoc?last;
????????????scoped_ptr<Projection::KeyOnly>?keyFieldsOnly;
????????????if?(?cc->modifiedKeys()?==?false?&&?cc->isMultiKey()?==?false?&&?cc->fields?)
????????????????keyFieldsOnly.reset(?cc->fields->checkKey(?cc->indexKeyPattern()?)?);
????????????//遍歷cursor,找到并封裝相應查詢結果給buf對象
????????????while?(?1?)?{
????????????????if?(?!c->ok()?)?{//到結尾
????????????????????if?(?c->tailable()?)?{//處理tailable情況
????????????????????????//Tailable?表示在返回最后一條數據后,不要關閉當前?cursor。
????????????????????????//這是因為系統考慮到稍后你可以再次使用該cursor.??
????????????????????????/*?when?a?tailable?cursor?hits?"EOF",?ok()?goes?false,?and?current()?is?null.??however
???????????????????????????advance()?can?still?be?retries?as?a?reactivation?attempt.??when?there?is?new?data,?it?will
???????????????????????????return?true.??that's?what?we?are?doing?here.
???????????????????????????*/
????????????????????????if?(?c->advance()?)
????????????????????????????continue;
????????????????????????if(?n?==?0?&&?(queryOptions?&?QueryOption_AwaitData)?&&?pass?<?1000?)?{
????????????????????????????throw?GetMoreWaitException();
????????????????????????}
????????????????????????break;
????????????????????}
????????????????????//釋放cursor資源關閉它(執行delete操作)
????????????????????p.release();
????????????????????bool?ok?=?ClientCursor::erase(cursorid);
????????????????????assert(ok);
????????????????????cursorid?=?0;
????????????????????cc?=?0;
????????????????????break;
????????????????}
????????????????//?如果是clone?collection時,則不會匹配 // If match succeeds on index key, then attempt to match full document.??????????????? if?(?c->matcher()?&&?!c->matcher()->matches(c->currKey(),?c->currLoc()?)?)?{
????????????????}
????????????????/*
??????????????????TODO
????????????????else?if?(?_chunkMatcher?&&?!?_chunkMatcher->belongsToMe(?c->currKey(),?c->currLoc()?)?){
????????????????????cout?<<?"TEMP?skipping?un-owned?chunk:?"?<<?c->current()?<<?endl;
????????????????}
????????????????*/
????????????????else?{//值是否重復
????????????????????if(?c->getsetdup(c->currLoc())?)?{
????????????????????????//out()?<<?"??but?it's?a?dup?\n";
????????????????????}
????????????????????else?{//如匹配
????????????????????????last?=?c->currLoc();
????????????????????????n++;
????????????????????????//裝填數據到buf中
????????????????????????if?(?keyFieldsOnly?)?{
????????????????????????????fillQueryResultFromObj(b,?0,?keyFieldsOnly->hydrate(?c->currKey()?)?);
????????????????????????}
????????????????????????else?{
????????????????????????????BSONObj?js?=?c->current();
????????????????????????????//?show?disk?loc?should?be?part?of?the?main?query,?not?in?an?$or?clause,?so?this?should?be?ok
????????????????????????????fillQueryResultFromObj(b,?cc->fields.get(),?js,?(?cc->pq.get()?&&?cc->pq->showDiskLoc()???&last?:?0));
????????????????????????}
????????????????????????if?(?(?ntoreturn?&&?n?>=?ntoreturn?)?||?b.len()?>?MaxBytesToReturnToClientAtOnce?)?{
????????????????????????????c->advance();
????????????????????????????cc->incPos(?n?);
????????????????????????????break;
????????????????????????}
????????????????????}
????????????????}
????????????????//指向下一條記錄
????????????????c->advance();
????????????????if?(?!?cc->yieldSometimes()?)?{
????????????????????cc?=?0;
????????????????????break;
????????????????}
????????????}
????????????if?(?cc?)?{
????????????????cc->updateLocation();
????????????????cc->mayUpgradeStorage();
????????????????//用last中的optime?更新_slaveReadTill
????????????????cc->storeOpForSlave(?last?);
????????????????exhaust?=?cc->queryOptions()?&?QueryOption_Exhaust;
????????????}
????????}
????????//將buf中的信息綁定到查詢結果集
????????QueryResult?*qr?=?(QueryResult?*)?b.buf();
????????qr->len?=?b.len();
????????qr->setOperation(opReply);
????????qr->_resultFlags()?=?resultFlags;
????????qr->cursorId?=?cursorid;
????????qr->startingFrom?=?start;
????????qr->nReturned?=?n;
????????b.decouple();
????????return?qr;
????}
?? ? 上面代碼有些長,但其目的很明確,就是針對指定的cursor進行遍歷。這里mongodb會為每個slave保存一個cursor,并且其在遍歷完成后將最后一條oplog的時間戳作為當前slave在local.slaves中的更新標識信息(syncedTo),來標識當前slave的更新情況。(注:首次同步時全部復制會執行copyDatabase,復制master db上的所有document)。該方法運行截圖如下:
???
??
?
另外需要解釋的是,master結點貌似并不會使用slave發來的syncedTo來過濾capped collection中的舊oplog(指小于syncedTo時間戳)的數據,而是使用tailable類型的cursor來解決如果持續獲取后續新增oplog操作信息。前者的主觀臆測讓我在源碼中兜了一個圈子,因為我一直主觀認為mongod會執行類似查詢操作來過濾相應舊oplog的時間戳信息,并將結果集返回給slave端。現在看來master只是不斷返回后續添加到cap collection中oplog(有可能是out of sync的情況而引發slave地點執行resync操作),而最終的過濾判斷操作完全交給了slave端。這一點會在我下一篇文章中有所介紹。?
? ?? 好了,今天的內容到這里就告一段落了。在接下來的文章中,將會介紹slave端是如何發起同步操作,以及最終如何使用獲取到的oplog來構造本機數據的。
?? ?參考鏈接:
http://www.mongodb.org/display/DOCS/Replicationhttp://www.mongodb.org/display/DOCS/Master+Slave ??? http://www.snailinaturtleneck.com/blog/2010/10/12/replication-internals/
?? ?http://www.snailinaturtleneck.com/blog/2010/10/14/getting-to-know-your-oplog/
?? ?http://www.snailinaturtleneck.com/blog/2010/08/02/replica-sets-part-2-what-are-replica-sets/
?? ?原文鏈接:http://www.cnblogs.com/daizhj/archive/2011/06/13/mongodb_sourcecode_repl_master_run.html
??? 作者: daizhj, 代震軍? ?
??? 微博: http://t.sina.com.cn/daizhj
??? Tags: mongodb,c++,Replica,master-slave
?
轉載于:https://www.cnblogs.com/daizhj/archive/2011/06/13/mongodb_sourcecode_repl_master_run.html
總結
以上是生活随笔為你收集整理的Mongodb源码分析--Replication之主从模式--Master的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【小假期】反思与计划。6.9-6.10
- 下一篇: C++ 与 JAVA的不同点