MongoDB源码阅读之ReplSet源码分析
1. ReplSet源碼結構
| 文件 | 說明 |
| --- | --- |
| rs_config.h | replSet間同步設置(配置)的工具類 |
| rs_member.h | 心跳檢測類和replSet成員狀態的定義 |
| rs_sync.h | 同步數據類 |
| rs.h | 定義了幾乎所有replSet相關的類(Member:replSet中的節點成員;GhostSync:備份同步類;ReplSet:管理所有Member的核心類) |
2. ReplSet的結構分析
RSBase定義了線程鎖。
ReplSetImpl完成了大部分Replication Set的操作:心跳檢測,選舉,設置主節點,查詢節點信息。
Member記錄其他節點的信息,Manager查詢節點信息,ReplSetHealthPollTask負責心跳檢測,ReplSetConfig負責同步設置(配置),Consensus負責選舉主節點,StateBox記錄並設置當前的主節點。
ReplSet封裝了很多方便的操作。
3. 創建并設置ReplSet
mongod開始運行後(mongoDBMain),在啟動監聽程序的同時會調用startReplication;startReplication根據運行參數中是否指定了“--replSet”,決定是否創建ReplSet。
void?startReplication()?{????????/*?if?we?are?going?to?be?a?replica?set,?we?aren't?doing?other?forms?of?replication.?*/
????????if(?!cmdLine._replSet.empty()?)?{??//看看參數里面有沒有--replSet
????????????if(?replSettings.slave?||?replSettings.master?)?{???//這個參數不能與—slave與—master共存
????????????????log()?<<?"***"?<<?endl;
????????????????log()?<<?"ERROR:?can't?use?--slave?or?--master?replication?options?with?--replSet"?<<?endl;
????????????????log()?<<?"***"?<<?endl;
????????????}
????????????newRepl();?//將自己標示成為一個新的Replication節點
????????????replSet?=?true;
????????????ReplSetCmdline?*replSetCmdline?=?new?ReplSetCmdline(cmdLine._replSet);
????????????boost::thread?t(?boost::bind(?&startReplSets,?replSetCmdline)?);??//啟動線程完成真正的ReplSet創建
????????????return;
????????}
????????//……
???}
????void?startReplSets(ReplSetCmdline?*replSetCmdline)?{??//線程中執行的函數
????????Client::initThread("rsStart");
????????try?{
????????????verify(?theReplSet?==?0?);
????????????if(?replSetCmdline?==?0?)?{
????????????????verify(!replSet);
????????????????return;
????????????}
????????????replLocalAuth();
????????????(theReplSet?=?new?ReplSet(*replSetCmdline))->go();?//創建一個ReplSet賦值到一個全局的指針中,隨即調用go啟動
????????}
????????catch(std::exception&?e)?{
????????????log()?<<?"replSet?caught?exception?in?startReplSets?thread:?"?<<?e.what()?<<?rsLog;
????????????if(?theReplSet?)
????????????????theReplSet->fatal();
????????}
????????cc().shutdown();
????}
?
在構造ReplSetImpl時,會調用ReplSetImpl::loadConfig去設置ReplSet。
?
void?ReplSetImpl::loadConfig()?{????????startupStatus?=?LOADINGCONFIG;?//標注狀態為讀取config
????????startupStatusMsg.set("loading?"?+?rsConfigNs?+?"?config?(LOADINGCONFIG)");
????????LOG(1)?<<?"loadConfig()?"?<<?rsConfigNs?<<?endl;
????????while(?1?)?{???//循環獲取配置
????????????try?{
????????????????vector<ReplSetConfig>?configs;
????????????????try?{
????????????????????configs.push_back(?ReplSetConfig(HostAndPort::me())?);??//嘗試從本機端口獲取配置
????????????????}
????????????????catch(DBException&?e)?{
????????????????????log()?<<?"replSet?exception?loading?our?local?replset?configuration?object?:?"?<<?e.toString()?<<?rsLog;
????????????????}
????????????????//這里的種子是用戶配置的
????????????????for(?vector<HostAndPort>::const_iterator?i?=?_seeds->begin();?i?!=?_seeds->end();?i++?)?{???
????????????????????try?{
????????????????????????configs.push_back(?ReplSetConfig(*i)?);???//嘗試從其他設備上獲取配置
????????????????????}
????????????????????catch(?DBException&?e?)?{
????????????????????????log()?<<?"replSet?exception?trying?to?load?config?from?"?<<?*i?<<?"?:?"?<<?e.toString()?<<?rsLog;
????????????????????}
????????????????}
????????????????{
????????????????????scoped_lock?lck(?replSettings.discoveredSeeds_mx?);
????????????????????if(?replSettings.discoveredSeeds.size()?>?0?)?{????//其他線程搜索的其他設備(心跳檢測中提供的節點)
????????????????????????for?(set<string>::iterator?i?=?replSettings.discoveredSeeds.begin();?
?????????????????????????????i?!=?replSettings.discoveredSeeds.end();?
?????????????????????????????i++)?{
????????????????????????????try?{
????????????????????????????????configs.push_back(?ReplSetConfig(HostAndPort(*i))?);???//從新搜索的設備中獲取配置
????????????????????????????}
????????????????????????????catch(?DBException&?)?{
????????????????????????????????log(1)?<<?"replSet?exception?trying?to?load?config?from?discovered?seed?"?<<?*i?<<?rsLog;
????????????????????????????????replSettings.discoveredSeeds.erase(*i);
????????????????????????????}
????????????????????????}
????????????????????}
????????????????}
????????????????if?(!replSettings.reconfig.isEmpty())?{
????????????????????try?{
????????????????????????configs.push_back(ReplSetConfig(replSettings.reconfig,?true));?//從shell獲取配置
????????????????????}
????????????????????catch(?DBException&?re)?{
????????????????????????log()?<<?"replSet?couldn't?load?reconfig:?"?<<?re.what()?<<?rsLog;
????????????????????????replSettings.reconfig?=?BSONObj();
????????????????????}
????????????????}
????????????????int?nok?=?0;
????????????????int?nempty?=?0;
????????????????for(?vector<ReplSetConfig>::iterator?i?=?configs.begin();?i?!=?configs.end();?i++?)?{
????????????????????if(?i->ok()?)???//判斷得到的設置是否可用
????????????????????????nok++;
????????????????????if(?i->empty()?)???//判斷得到配置是否為空
????????????????????????nempty++;????
????????????????}
????????????????if(?nok?==?0?)?{
????????????????????if(?nempty?==?(int)?configs.size()?)?{???//如果沒有獲取到配置信息
????????????????????????startupStatus?=?EMPTYCONFIG;???//標注狀態
????????????????????????startupStatusMsg.set("can't?get?"?+?rsConfigNs?+?"?config?from?self?or?any?seed?(EMPTYCONFIG)");
????????????????????????log()?<<?"replSet?can't?get?"?<<?rsConfigNs?<<?"?config?from?self?or?any?seed?(EMPTYCONFIG)"?<<?rsLog;
????????????????????????static?unsigned?once;
????????????????????????if(?++once?==?1?)?{
????????????????????????????log()?<<?"replSet?info?you?may?need?to?run?replSetInitiate?--?rs.initiate()?in?the?shell?--?if?that?is?not?already?done"?<<?rsLog;
????????????????????????}
????????????????????????if(?_seeds->size()?==?0?)?{
????????????????????????????LOG(1)?<<?"replSet?info?no?seed?hosts?were?specified?on?the?--replSet?command?line"?<<?rsLog;
????????????????????????}
????????????????????}
????????????????????else?{
????????????????????????startupStatus?=?EMPTYUNREACHABLE;
????????????????????????startupStatusMsg.set("can't?currently?get?"?+?rsConfigNs?+?"?config?from?self?or?any?seed?(EMPTYUNREACHABLE)");
????????????????????????log()?<<?"replSet?can't?get?"?<<?rsConfigNs?<<?"?config?from?self?or?any?seed?(yet)"?<<?rsLog;
????????????????????}
????????????????????sleepsecs(10);??//十秒后重試
????????????????????continue;
????????????????}
????????????????if(?!_loadConfigFinish(configs)?)?{?????//找到了配置信息但是配置失敗
????????????????????log()?<<?"replSet?info?Couldn't?load?config?yet.?Sleeping?20sec?and?will?try?again."?<<?rsLog;
????????????????????sleepsecs(20);??//二十秒后重試
????????????????????continue;
????????????????}
????????????}
????????????catch(DBException&?e)?{
????????????????startupStatus?=?BADCONFIG;
????????????????startupStatusMsg.set("replSet?error?loading?set?config?(BADCONFIG)");
????????????????log()?<<?"replSet?error?loading?configurations?"?<<?e.toString()?<<?rsLog;
????????????????log()?<<?"replSet?error?replication?will?not?start"?<<?rsLog;
????????????????sethbmsg("error?loading?set?config");
????????????????_fatal();
????????????????throw;
????????????}
????????????break;???//設置成功跳出循環
????????}
????????startupStatusMsg.set("??started");
????????startupStatus?=?STARTED;???//標注狀態為開始運行
????}
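從代碼上看,設置過程為:先從各個地方收集配置——本機、命令行中指定的種子節點、其他線程發現的節點(discoveredSeeds),以及shell提交的reconfig。之後統計其中可用的配置。如果沒有可用的配置,且所有配置都為空,則將狀態標為EMPTYCONFIG,並在log中提示用戶執行rs.initiate()。如果有配置但都不可用,則標為EMPTYUNREACHABLE。兩種情況都會在10秒後重試。找到可用的配置之後,將整個配置數組交給_loadConfigFinish進行設置,若設置失敗則20秒後重試。
ReplSet的設置工作(_loadConfigFinish):從所有可用配置中選出版本號最高的一份進行初始化;若它比本機的配置更新,則保存到本地。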
//?Our?own?config?must?be?the?first?one.????bool?ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>&?cfgs)?{
????????int?v?=?-1;????//當前版本號
????????ReplSetConfig?*highest?=?0;????
????????int?myVersion?=?-2000;????
????????int?n?=?0;
????????for(?vector<ReplSetConfig>::iterator?i?=?cfgs.begin();?i?!=?cfgs.end();?i++?)?{???//循環取出配置對象
????????????ReplSetConfig&?cfg?=?*i;
????????????DEV?log(1)?<<?n+1?<<?"?config?shows?version?"?<<?cfg.version?<<?rsLog;?
????????????if(?++n?==?1?)?myVersion?=?cfg.version;
????????????if(?cfg.ok()?&&?cfg.version?>?v?)?{????//如果可用,并且配置版本比當前高,則存儲起來
????????????????highest?=?&cfg;
????????????????v?=?cfg.version;
????????????}
????????}
????????verify(?highest?);
????????if(?!initFromConfig(*highest)?)???//將最高配置對象,進行初始化
????????????return?false;
????????if(?highest->version?>?myVersion?&&?highest->version?>=?0?)?{
????????????log()?<<?"replSet?got?config?version?"?<<?highest->version?<<?"?from?a?remote,?saving?locally"?<<?rsLog;
????????????highest->saveConfigLocally(BSONObj());
????????}
????????return?true;
????}
?
4. ReplSet間的心跳檢測
在配置ReplSet的過程中,會通過調用initFromConfig來更新配置。這個函數會先找出新添加的節點:如果有新節點,就為新節點添加心跳檢測task;如果沒有新節點,則認為是更新配置,於是停掉之前所有的心跳檢測task,並重新為各節點開啟心跳檢測task。
心跳檢測過程的示意圖(原圖未能保留)。開啟心跳檢測task的代碼如下:
//開啟一個心跳檢測task????void?ReplSetImpl::startHealthTaskFor(Member?*m)?{
????????DEV?log()?<<?"starting?rsHealthPoll?for?"?<<?m->fullName()?<<?endl;
????????ReplSetHealthPollTask?*task?=?new?ReplSetHealthPollTask(m->h(),?m->hbinfo());???//建立心跳檢測task
????????healthTasks.insert(task);?????//插入心跳檢測task集合
????????task::repeat(task,?2000);?????//每隔兩秒心跳檢測一次?(在一次檢測完成以后等待兩秒后再次檢測)
????}
?
以下從發送請求的一方來說明:
心跳檢測的目的是查看replSet中的其他節點是否存活。
節點存活時,調用up函數,代碼如下:
?
void?ReplSetHealthPollTask::up(const?BSONObj&?info,?HeartbeatInfo&?mem)?{????????????HeartbeatInfo::numPings++;
????????????mem.authIssue?=?false;
????????????if(?mem.upSince?==?0?)?{
????????????????log()?<<?"replSet?member?"?<<?h.toString()?<<?"?is?up"?<<?rsLog;
????????????????mem.upSince?=?mem.lastHeartbeat;
????????????}
????????????mem.health?=?1.0;
????????????mem.lastHeartbeatMsg?=?info["hbmsg"].String();
????????????if(?info.hasElement("opTime")?)
????????????????mem.opTime?=?info["opTime"].Date();
????????????//?see?if?this?member?is?in?the?electable?set
????????????//如果返回的心跳包含投票信息做下面的操作
????????????if(?info["e"].eoo()?)?{????
????????????????//?for?backwards?compatibility
????????????????const?Member?*member?=?theReplSet->findById(mem.id());
????????????????if?(member?&&?member->config().potentiallyHot())?{???//對方是否為活著的節點
????????????????????theReplSet->addToElectable(mem.id());?????//目標節點獲得投票權
????????????????}
????????????????else?{
????????????????????theReplSet->rmFromElectable(mem.id());????//目標節點失去投票權
????????????????}
????????????}
????????????//?add?this?server?to?the?electable?set?if?it?is?within?10
????????????//?seconds?of?the?latest?optime?we?know?of
????????????else?if(?info["e"].trueValue()?&&
?????????????????????mem.opTime?>=?theReplSet->lastOpTimeWritten.getSecs()?-?10)?{
????????????????unsigned?lastOp?=?theReplSet->lastOtherOpTime().getSecs();
????????????????if?(lastOp?>?0?&&?mem.opTime?>=?lastOp?-?10)?{
????????????????????theReplSet->addToElectable(mem.id());
????????????????}
????????????}
????????????else?{
????????????????theReplSet->rmFromElectable(mem.id());
????????????}
????????????//如果返回的信息中包含設置信息,做同步設置的工作
????????????be?cfg?=?info["config"];
????????????if(?cfg.ok()?)?{
????????????????//?received?a?new?config
????????????????boost::function<void()>?f?=
????????????????????boost::bind(&Manager::msgReceivedNewConfig,?theReplSet->mgr,?cfg.Obj().copy());
????????????????theReplSet->mgr->send(f);
????????????}
????????}
????};
連接失敗時,認為目標節點已離線(或響應過慢),調用down函數:
?
void?ReplSetHealthPollTask::down(HeartbeatInfo&?mem,?string?msg)?{????????????//設置各種健康狀態
????????????mem.authIssue?=?false;
????????????mem.health?=?0.0;
????????????mem.ping?=?0;
????????????//打印log信息
????????????if(?mem.upSince?||?mem.downSince?==?0?)?{
????????????????mem.upSince?=?0;
????????????????mem.downSince?=?jsTime();
????????????????mem.hbstate?=?MemberState::RS_DOWN;
????????????????log()?<<?"replSet?info?"?<<?h.toString()?<<?"?is?down?(or?slow?to?respond):?"?<<?msg?<<?rsLog;
????????????}
????????????//剝奪節點投票權
????????????mem.lastHeartbeatMsg?=?msg;
????????????theReplSet->rmFromElectable(mem.id());
????????}
如果在配置階段接收到了不是自己列表中節點發出的心跳檢測請求,可以檢測新節點的配置信息。具體代碼在heartbeat.cpp中的CmdReplSetHeartbeat(line:104)中實現。
5. ReplSet間的投票過程
投票過程伴隨著心跳檢測和消息傳遞進行。下面的代碼閱讀只關注與投票相關的過程,其他枝節部分就略去了。
投票過程的序列圖(原圖未能保留)。
a) 消息傳遞
消息傳遞的過程是由心跳檢測來完成的,在心跳檢測時,單個節點既是發送方也是接收方。
作為發送方,向其他節點發出:
?
//heartbeat.cpp?line?134????bool?requestHeartbeat(string?setName,?string?from,?string?memberFullName,?BSONObj&?result,
??????????????????????????int?myCfgVersion,?int&?theirCfgVersion,?bool?checkEmpty)?{
????????if(?replSetBlind?)?{
????????????return?false;
????????}
????????//從這段代碼中看出,節點向外傳出的是節點名稱、節點的設置版本號和路徑
????????BSONObj?cmd?=?BSON(?"replSetHeartbeat"?<<?setName?<<
????????????????????????????"v"?<<?myCfgVersion?<<
????????????????????????????"pv"?<<?1?<<
????????????????????????????"checkEmpty"?<<?checkEmpty?<<
????????????????????????????"from"?<<?from?);
????????//?generally?not?a?great?idea?to?do?outbound?waiting?calls?in?a
????????//?write?lock.?heartbeats?can?be?slow?(multisecond?to?respond),?so
????????//?generally?we?don't?want?to?be?locked,?at?least?not?without
????????//?thinking?acarefully?about?it?first.
????????massert(15900,?"can't?heartbeat:?too?much?lock",
????????????????!Lock::somethingWriteLocked()?||?theReplSet?==?0?||?!theReplSet->lockedByMe()?);
????????ScopedConn?conn(memberFullName);
????????return?conn.runCommand("admin",
???????????????????????????????cmd,
???????????????????????????????result,
???????????????????????????????0,
???????????????????????????????&AuthenticationTable::getInternalSecurityAuthenticationTable());
????}
????????bool?ReplSetHealthPollTask::_requestHeartbeat(HeartbeatInfo&?mem,?BSONObj&?info,?int&?theirConfigVersion)?{
????????????if?(tries++?%?threshold?==?(threshold?-?1))?{
????????????????ScopedConn?conn(h.toString());
????????????????conn.reconnect();
????????????}
????????????Timer?timer;
????????????time_t?before?=?curTimeMicros64()?/?1000000;
????????????//調用函數傳出消息
????????????bool?ok?=?requestHeartbeat(theReplSet->name(),?theReplSet->selfFullName(),
???????????????????????????????????????h.toString(),?info,?theReplSet->config().version,?theirConfigVersion);
????????//info是傳出心跳檢測后的返回值
???????//根據返回值同步記錄同步的偏差時間
????????????mem.ping?=?(unsigned?int)timer.millis();
????????????//?we?set?this?on?any?response?-?we?don't?get?this?far?if
????????????//?couldn't?connect?because?exception?is?thrown
????????????time_t?after?=?mem.lastHeartbeat?=?before?+?(mem.ping?/?1000);
????????????if?(?info["time"].isNumber()?)?{
????????????????long?long?t?=?info["time"].numberLong();
????????????????if(?t?>?after?)
????????????????????mem.skew?=?(int)?(t?-?after);
????????????????else?if(?t?<?before?)
????????????????????mem.skew?=?(int)?(t?-?before);?//?negative
????????????}
????????????else?{
????????????????//?it?won't?be?there?if?remote?hasn't?initialized?yet
????????????????if(?info.hasElement("time")?)
????????????????????warning()?<<?"heatbeat.time?isn't?a?number:?"?<<?info?<<?endl;
????????????????mem.skew?=?INT_MIN;
????????????}
????????????{
????????????????//記錄下其他節點的狀態
????????????????be?state?=?info["state"];
????????????????if(?state.ok()?)
????????????????????mem.hbstate?=?MemberState(state.Int());
????????????}
????????????return?ok;
????????}
?
?
//消息返回后的另一部分狀態處理????????void?ReplSetHealthPollTask::doWork()?{
????????????if?(?!theReplSet?)?{
????????????????LOG(2)?<<?"replSet?not?initialized?yet,?skipping?health?poll?this?round"?<<?rsLog;
????????????????return;
????????????}
????????????HeartbeatInfo?mem?=?m;
????????????HeartbeatInfo?old?=?mem;
????????????try?{
????????????????BSONObj?info;
????????????????int?theirConfigVersion?=?-10000;
????????????????bool?ok?=?_requestHeartbeat(mem,?info,?theirConfigVersion);
????????????????//?weight?new?ping?with?old?pings
????????????????//?on?the?first?ping,?just?use?the?ping?value
????????????????if?(old.ping?!=?0)?{
????????????????????mem.ping?=?(unsigned?int)((old.ping?*?.8)?+?(mem.ping?*?.2));
????????????????}
????????????????//……
????????????}
????????????catch(...)?{
????????????????//……
????????????}
????????????m?=?mem;
????????????//通知更新節點信息
????????????theReplSet->mgr->send(?boost::bind(&ReplSet::msgUpdateHBInfo,?theReplSet,?mem)?);
????????????static?time_t?last?=?0;
????????????time_t?now?=?time(0);
????????????//判斷節點信息是否被改變
????????????bool?changed?=?mem.changed(old);
????????????if(?changed?)?{
????????????????if(?old.hbstate?!=?mem.hbstate?)
????????????????????log()?<<?"replSet?member?"?<<?h.toString()?<<?"?is?now?in?state?"?<<?mem.hbstate.toString()?<<?rsLog;
????????????}
????????????//當其他節點信息信息改變或者前后兩次連接服務器的時間大于4秒,則更新一下自己節點的狀態(在函數中發出投票請求)
????????????if(?changed?||?now-last>4?)?{
????????????????last?=?now;
????????????????theReplSet->mgr->send(?boost::bind(&Manager::msgCheckNewState,?theReplSet->mgr)?);
????????????}
????????}
作為接收方:
//接收到數據后會調用這個函數????????virtual?bool?CmdReplSetHeartbeat::run(const?string&?,?BSONObj&?cmdObj,?int,?string&?errmsg,?BSONObjBuilder&?result,?bool?fromRepl)?{
????????????//略過鑒權等工作
????????????//……
????????????result.append("set",?theReplSet->name());???????//名稱
????????????result.append("state",?theReplSet->state().s);?????//節點狀態
????????????result.append("e",?theReplSet->iAmElectable());??//是否可以參加投票
????????????result.append("hbmsg",?theReplSet->hbmsg());???//心跳的一些信息(具體還沒看太明白)
????????????result.append("time",?(long?long)?time(0));???????//當前的服務器時間
????????????result.appendDate("opTime",?theReplSet->lastOpTimeWritten.asDate());?//最后一次寫操作的時間
????????????int?v?=?theReplSet->config().version;????//設置的版本信息
????????????result.append("v",?v);
????????????if(?v?>?cmdObj["v"].Int()?)
????????????????result?<<?"config"?<<?theReplSet->config().asBson();
????????????return?true;
????????}
?
b) 檢測哪些節點可被選舉
結合之前的心跳檢測和後面的消息傳遞,ReplSet會根據心跳檢測的結果調用addToElectable和rmFromElectable,把節點加入或移出可被選舉為主節點的集合(electable set)。
值得一提的是,在Manager::msgCheckNewState()被調用時,會去判斷當前節點自己是否可被選舉:
void?Manager::checkElectableSet()?{????????unsigned?otherOp?=?rs->lastOtherOpTime().getSecs();
????????
????????//?make?sure?the?electable?set?is?up-to-date
????????if?(rs->elect.aMajoritySeemsToBeUp()?&&??????????//看看是否有選上的可能
????????????rs->iAmPotentiallyHot()?&&??????????????????//看看自己是不是活躍節點
????????????(otherOp?==?0?||?rs->lastOpTimeWritten.getSecs()?>=?otherOp?-?10))?{???????//上次寫操作是否在10秒以內
????????????theReplSet->addToElectable(rs->selfId());
????????}
????????else?{
????????????theReplSet->rmFromElectable(rs->selfId());
????????}
//……
}
c) 確定是否要發起選舉
確定了哪些節點可被選舉以後,就要判斷自己是否應該發起選舉了。
/**?called?as?the?health?threads?get?new?results?*/????void?Manager::msgCheckNewState()?{
????????{
????????????//判斷之前的節點狀態
????????????//……
????????????const?Member?*p?=?rs->box.getPrimary();?????????//
????????????if(?p?&&?p?!=?rs->_self?)?{
????????????????if(?!p->hbinfo().up()?||
????????????????????????!p->hbinfo().hbstate.primary()?)?{
????????????????????p?=?0;
????????????????????rs->box.setOtherPrimary(0);
????????????????}
????????????}
????????????const?Member?*p2;
????????????{
????????????????bool?two;
????????????????//判斷一下是不是還有別的主節點
????????????????//如果有兩個主節點說明就等待其他節點自己解決誰是主節點的問題
????????????????//返回的節點是確定的主節點
????????????????p2?=?findOtherPrimary(two);
????????????????if(?two?)?{
????????????????????/*?two?other?nodes?think?they?are?primary?(asynchronously?polled)?--?wait?for?things?to?settle?down.?*/
????????????????????log()?<<?"replSet?info?two?primaries?(transiently)"?<<?rsLog;
????????????????????return;
????????????????}
????????????}
????????????if(?p2?)?{
????????????????noteARemoteIsPrimary(p2);
????????????????return;
????????????}
????????????/*?didn't?find?anyone?who?wants?to?be?primary?*/
???????????//如果包含一個主節點
????????????if(?p?)?{
????????????????/*?we?are?already?primary?*/
????????????????//主節點不是自己,說明有人能做主
????????????????if(?p?!=?rs->_self?)?{
????????????????????rs->sethbmsg("error?p?!=?rs->self?in?checkNewState");
????????????????????log()?<<?"replSet?"?<<?p->fullName()?<<?rsLog;
????????????????????log()?<<?"replSet?"?<<?rs->_self->fullName()?<<?rsLog;
????????????????????return;
????????????????}
????????????????
????????????????//如果自己是主節點,需要將自己降級
????????????????if(?rs->elect.shouldRelinquish()?)?{
????????????????????log()?<<?"can't?see?a?majority?of?the?set,?relinquishing?primary"?<<?rsLog;
????????????????????rs->relinquish();
????????????????}
????????????????return;
????????????}
????????????if(?!rs->iAmPotentiallyHot()?)?{?//?if?not?we?never?try?to?be?primary
????????????????OCCASIONALLY?log()?<<?"replSet?I?don't?see?a?primary?and?I?can't?elect?myself"?<<?endl;
????????????????return;
????????????}
????????????//看自己有沒有可能成為主節點?
????????????/*?no?one?seems?to?be?primary.??shall?we?try?to?elect?ourself??*/
????????????if(?!rs->elect.aMajoritySeemsToBeUp()?)?{
????????????????static?time_t?last;
????????????????static?int?n;
????????????????int?ll?=?0;
????????????????if(?++n?>?5?)?ll++;
????????????????if(?last?+?60?>?time(0?)?)?ll++;
????????????????log(ll)?<<?"replSet?can't?see?a?majority,?will?not?try?to?elect?self"?<<?rsLog;
????????????????last?=?time(0);
????????????????return;
????????????}
????????????if(?!rs->iAmElectable()?)?{
????????????????return;
????????????}
????????????busyWithElectSelf?=?true;?//?don't?try?to?do?further?elections?&?such?while?we?are?already?working?on?one.
????????}
????????try?{
????????????//開始投票
????????????rs->elect.electSelf();
????????}
????????catch(RetryAfterSleepException&)?{
????????????/*?we?want?to?process?new?inbounds?before?trying?this?again.??so?we?just?put?a?checkNewstate?in?the?queue?for?eval?later.?*/
????????????requeue();
????????}
????????catch(...)?{
????????????log()?<<?"replSet?error?unexpected?assertion?in?rs?manager"?<<?rsLog;
????????}
????????busyWithElectSelf?=?false;
????}
?
d) 投票
作為心跳檢測的發送方(見ReplSetHealthPollTask::doWork),當某個節點的心跳信息發生了改變,或者距上次檢查已超過4秒時,節點就會調用Manager::msgCheckNewState更新自己的狀態,並有可能發起選舉。
與其說是投票,不如說是一次詢問的過程:節點通過replSetFresh命令向其他節點詢問,自己的狀態是否符合成為主節點的條件。
//詢問其他節點????bool?Consensus::weAreFreshest(bool&?allUp,?int&?nTies)?{
????????const?OpTime?ord?=?theReplSet->lastOpTimeWritten;
????????nTies?=?0;
????????verify(?!ord.isNull()?);
????????//組織請求數據
????????BSONObj?cmd?=?BSON(
??????????????????????????"replSetFresh"?<<?1?<<??????????
??????????????????????????"set"?<<?rs.name()?<<?????????????????//當前節點名稱
??????????????????????????"opTime"?<<?Date_t(ord.asDate())?<<????//最后一次寫入的時間
??????????????????????????"who"?<<?rs._self->fullName()?<<???????//當前節點路徑
??????????????????????????"cfgver"?<<?rs._cfg->version?<<?????????//當前節點設置的版本號
??????????????????????????"id"?<<?rs._self->id());?????????????????//當前節點的id
????????list<Target>?L;
????????int?ver;
????????/*?the?following?queries?arbiters,?even?though?they?are?never?fresh.??wonder?if?that?makes?sense.
???????????it?doesn't,?but?it?could,?if?they?"know"?what?freshness?it?one?day.??so?consider?removing
???????????arbiters?from?getTargets()?here.??although?getTargets?is?used?elsewhere?for?elections;?there
???????????arbiters?are?certainly?targets?-?so?a?"includeArbs"?bool?would?be?necessary?if?we?want?to?make
???????????not?fetching?them?herein?happen.
???????????*/
????????//獲得投票列表
????????rs.getTargets(L,?ver);
????????//發出請求
????????multiCommand(cmd,?L);
????????int?nok?=?0;
????????allUp?=?true;
????????//處理請求(稍后分析)
????????//……
????}//收到請求后的處理,忽略了驗證處理和比較版本配置版本號的代碼
????????bool?CmdReplSetFresh::shouldVeto(const?BSONObj&?cmdObj,?string&?errmsg)?{
????????????//?don't?veto?older?versions
????????????if?(cmdObj["id"].eoo())?{
????????????????//?they?won't?be?looking?for?the?veto?field
????????????????return?false;
????????????}
????????????unsigned?id?=?cmdObj["id"].Int();
????????????const?Member*?primary?=?theReplSet->box.getPrimary();???????????//當前的主服務器
????????????const?Member*?hopeful?=?theReplSet->findById(id);???????????????//希望成為主機的服務器(從上面的代碼看是發送請求方服務器)
????????????const?Member?*highestPriority?=?theReplSet->getMostElectable();????//當前節點心目中的主機
????????????//以下判斷發現不符合條件的就否決投票
????????????if(?!hopeful?)?{??//沒有目標服務器
????????????????errmsg?=?str::stream()?<<?"replSet?couldn't?find?member?with?id?"?<<?id;
????????????????return?true;
????????????}
????????????//如果當前的服務器是主機,而自己剛剛進行級別的調整
????????????else?if(?theReplSet->isPrimary()?&&?theReplSet->lastOpTimeWritten?>=?hopeful->hbinfo().opTime?)?{??
????????????????//?hbinfo?is?not?updated,?so?we?have?to?check?the?primary's?last?optime?separately
????????????????errmsg?=?str::stream()?<<?"I?am?already?primary,?"?<<?hopeful->fullName()?<<
????????????????????"?can?try?again?once?I've?stepped?down";
????????????????return?true;
????????????}
????????????//如果主服務器,剛剛進行級別調整
????????????else?if(?primary?&&?primary->hbinfo().opTime?>=?hopeful->hbinfo().opTime?)?{
????????????????//?other?members?might?be?aware?of?more?up-to-date?nodes
????????????????errmsg?=?str::stream()?<<?hopeful->fullName()?<<?"?is?trying?to?elect?itself?but?"?<<
????????????????????primary->fullName()?<<?"?is?already?primary?and?more?up-to-date";
????????????????return?true;
????????????}
????????????//當前節點記錄的最高優先級節點的優先級比目標節點高
????????????else?if(?highestPriority?&&?highestPriority->config().priority?>?hopeful->config().priority)?{
????????????????errmsg?=?str::stream()?<<?hopeful->fullName()?<<?"?has?lower?priority?than?"?<<?highestPriority->fullName();
????????????????return?true;
????????????}
????????????//當前節點不允許投票或者當前節點記錄的最高優先級節點的優先級比目標節點高(不明白為什么又判斷一遍)
????????????if?(?!theReplSet->isElectable(id)?||
????????????????(highestPriority?&&?highestPriority->config().priority?>?hopeful->config().priority))?{
????????????????return?true;
????????????}
????????????return?false;
????????}//得到投票數據以后解析
bool?Consensus::weAreFreshest(bool&?allUp,?int&?nTies)?{
???????//省略發送請求的部分
???????//請求返回后存儲在list<Target>中
????????int?nok?=?0;
????????allUp?=?true;
????????for(?list<Target>::iterator?i?=?L.begin();?i?!=?L.end();?i++?)?{
????????//i存儲這其他節點返回的結果
????????????if(?i->ok?)?{
????????????????nok++;
????????????????if(?i->result["fresher"].trueValue()?)?{??????//當前服務器不是最新的
????????????????????log()?<<?"not?electing?self,?we?are?not?freshest"?<<?rsLog;
????????????????????return?false;
????????????????}
????????????????//根據返回的操作時間,統計與自己時間相同的節點(這部分從代碼上是這個意思,不過后面的解析沒有太懂,從其他地方的注釋看,是個正在開發的功能)
????????????????OpTime?remoteOrd(?i->result["opTime"].Date()?);
????????????????if(?remoteOrd?==?ord?)
????????????????????nTies++;?????
????????????????verify(?remoteOrd?<=?ord?);
????????????????//查看是當前節點是否否決自己成為主節點
????????????????if(?i->result["veto"].trueValue()?)?{
????????????????????BSONElement?msg?=?i->result["errmsg"];
????????????????????if?(!msg.eoo())?{
????????????????????????log()?<<?"not?electing?self,?"?<<?i->toHost?<<?"?would?veto?with?'"?<<
????????????????????????????msg.String()?<<?"'"?<<?rsLog;
????????????????????}
????????????????????else?{
????????????????????????log()?<<?"not?electing?self,?"?<<?i->toHost?<<?"?would?veto"?<<?rsLog;
????????????????????}
????????????????????return?false;
????????????????}
????????????}
????????????else?{
????????????????DEV?log()?<<?"replSet?freshest?returns?"?<<?i->result.toString()?<<?rsLog;
????????????????allUp?=?false;
????????????}
????????}
????????LOG(1)?<<?"replSet?dev?we?are?freshest?of?up?nodes,?nok:"?<<?nok?<<?"?nTies:"?<<?nTies?<<?rsLog;
????????verify(?ord?<=?theReplSet->lastOpTimeWritten?);?//?<=?as?this?may?change?while?we?are?working...
????????return?true;
?????}
如果沒有節點的數據比自己新,也沒有節點投否決票(veto),節點就向其他節點發送replSetElect請求,宣告自己參選。其他節點根據自身狀態返回投票:反對時為-10000,因自身配置版本過舊而不參與時為0,贊成時為yea()計算出的票數。自己當選為主節點需要同時滿足三個條件:贊成票之和超過總票數的一半,整個選舉耗時不超過30秒,且期間配置版本沒有變化。
void?Consensus::_electSelf()?{???????????//略去投票部分
????????????//……
?????????????//分發投票結果的確認部分
????????????BSONObj?electCmd?=?BSON(
???????????????????????????????????"replSetElect"?<<?1?<<
???????????????????????????????????"set"?<<?rs.name()?<<??????????????//節點名稱
???????????????????????????????????"who"?<<?me.fullName()?<<????????//節點路徑
???????????????????????????????????"whoid"?<<?me.hbinfo().id()?<<??????//節點的id
???????????????????????????????????"cfgver"?<<?rs._cfg->version?<<??????//節點的配置信息
???????????????????????????????????"round"?<<?OID::gen()?/*?this?is?just?for?diagnostics?*/
???????????????????????????????);
????????????int?configVersion;
????????????list<Target>?L;
????????????rs.getTargets(L,?configVersion);
????????????multiCommand(electCmd,?L);????//請求發送
?
//處理確認請求????void?Consensus::electCmdReceived(BSONObj?cmd,?BSONObjBuilder*?_b)?{
????????BSONObjBuilder&?b?=?*_b;
????????DEV?log()?<<?"replSet?received?elect?msg?"?<<?cmd.toString()?<<?rsLog;
????????else?LOG(2)?<<?"replSet?received?elect?msg?"?<<?cmd.toString()?<<?rsLog;
????????string?set?=?cmd["set"].String();
????????unsigned?whoid?=?cmd["whoid"].Int();
????????int?cfgver?=?cmd["cfgver"].Int();
????????OID?round?=?cmd["round"].OID();
????????int?myver?=?rs.config().version;
????????const?Member*?primary?=?rs.box.getPrimary();
????????const?Member*?hopeful?=?rs.findById(whoid);
????????const?Member*?highestPriority?=?rs.getMostElectable();
????????int?vote?=?0;
????????if(?set?!=?rs.name()?)?{
????????????log()?<<?"replSet?error?received?an?elect?request?for?'"?<<?set?<<?"'?but?our?set?name?is?'"?<<?rs.name()?<<?"'"?<<?rsLog;
????????}
????????else?if(?myver?<?cfgver?)?{????//如果接收方的版本號小于目標節點的版本號
????????????//?we?are?stale.??don't?vote??不做其他處理
????????}
????????else?if(?myver?>?cfgver?)?{????//如果接收方的版本號大于目標節點版本號,對方版本號過期,不贊成
????????????//?they?are?stale!
????????????log()?<<?"replSet?electCmdReceived?info?got?stale?version?#?during?election"?<<?rsLog;
????????????vote?=?-10000;
????????}
????????else?if(?!hopeful?)?{????//目標節點id沒有在接收方節點中掛名,不贊成
????????????log()?<<?"replSet?electCmdReceived?couldn't?find?member?with?id?"?<<?whoid?<<?rsLog;
????????????vote?=?-10000;
????????}
????????//如果主節點是自己,并且自己的最后寫入時間比目標節點新,不贊成
????????else?if(?primary?&&?primary?==?rs._self?&&?rs.lastOpTimeWritten?>=?hopeful->hbinfo().opTime?)?{
????????????//?hbinfo?is?not?updated,?so?we?have?to?check?the?primary's?last?optime?separately
????????????log()?<<?"I?am?already?primary,?"?<<?hopeful->fullName()
??????????????????<<?"?can?try?again?once?I've?stepped?down"?<<?rsLog;
????????????vote?=?-10000;
????????}
????????//如果接收方認為的主節點的寫入時間比目標節點的寫入時間新,不贊成
????????else?if(?primary?&&?primary->hbinfo().opTime?>=?hopeful->hbinfo().opTime?)?{
????????????//?other?members?might?be?aware?of?more?up-to-date?nodes
????????????log()?<<?hopeful->fullName()?<<?"?is?trying?to?elect?itself?but?"?<<
??????????????????primary->fullName()?<<?"?is?already?primary?and?more?up-to-date"?<<?rsLog;
????????????vote?=?-10000;
????????}
????????//接收方認為當選呼聲最高的節點比目標節點的當選呼聲高,不贊成
????????else?if(?highestPriority?&&?highestPriority->config().priority?>?hopeful->config().priority)?{
????????????log()?<<?hopeful->fullName()?<<?"?has?lower?priority?than?"?<<?highestPriority->fullName();
????????????vote?=?-10000;
????????}
????????else?{
????????????//如果滿足上面的所有條件
????????????try?{
????????????????vote?=?yea(whoid);?????//算出贊成度
????????????????dassert(?hopeful->id()?==?whoid?);
????????????????rs.relinquish();
????????????????log()?<<?"replSet?info?voting?yea?for?"?<<??hopeful->fullName()?<<?"?("?<<?whoid?<<?')'?<<?rsLog;
????????????}
????????????catch(VoteException&)?{
????????????????log()?<<?"replSet?voting?no?for?"?<<?hopeful->fullName()?<<?"?already?voted?for?another"?<<?rsLog;
????????????}
????????}
????????//組織返回數據
????????b.append("vote",?vote);??
????????b.append("round",?round);
????} //處理返回結果?
???????????{
????????????????for(?list<Target>::iterator?i?=?L.begin();?i?!=?L.end();?i++?)?{
????????????????????DEV?log()?<<?"replSet?elect?res:?"?<<?i->result.toString()?<<?rsLog;
????????????????????if(?i->ok?)?{
????????????????????????int?v?=?i->result["vote"].Int();
????????????????????????tally?+=?v;???//統計贊成結果
????????????????????}
????????????????}
????????????????if(?tally*2?<=?totalVotes()?)?{??//贊成結果小于投票總數的一半,終止成為主節點
????????????????????log()?<<?"replSet?couldn't?elect?self,?only?received?"?<<?tally?<<?"?votes"?<<?rsLog;
????????????????}
????????????????else?if(?time(0)?-?start?>?30?)?{??//投票時間大于三十秒,終止成為主節點
????????????????????//?defensive;?should?never?happen?as?we?have?timeouts?on?connection?and?operation?for?our?conn
????????????????????log()?<<?"replSet?too?much?time?passed?during?our?election,?ignoring?result"?<<?rsLog;
????????????????}
????????????????else?if(?configVersion?!=?rs.config().version?)?{???//傳出的版本號與返回的版本號不一致(說明投票過程中有版本修改),終止成為主節點
????????????????????log()?<<?"replSet?config?version?changed?during?our?election,?ignoring?result"?<<?rsLog;
????????????????}
????????????????else?{
????????????????????/*?succeeded.?*/
????????????????????log(1)?<<?"replSet?election?succeeded,?assuming?primary?role"?<<?rsLog;
????????????????????success?=?true;
????????????????????rs.assumePrimary();??//當選為主節點
????????????????}
????????????}
e) 總結投票過程
在心跳檢測中,節點之間互相傳遞著信息。通過這些信息,節點能了解到其他節點的情況(配置版本、是否能連接上、最後一次寫操作的時間等等)。每個節點統計著這些信息。當某個節點的心跳信息發生改變,或者距上次檢查已超過4秒時,節點就會檢查自身狀態,並可能發起選舉。
選舉時,首先根據心跳檢測記錄的信息(節點是否在線、數據是否足夠新等)維護可被選舉的節點集合(electable set)。之後,候選節點通過replSetFresh向其他節點詢問自己能否成為主節點;其他節點若掌握更新的數據或認為它不符合條件,就投否決票。如果沒有任何否決,候選節點進行下一步:通過replSetElect向其他節點請求投票,其他節點根據情況投票。如果贊成票數超過投票總數的一半,該節點當選。
轉載于:https://www.cnblogs.com/biosli/archive/2012/10/19/2730776.html
總結
以上是生活随笔為你收集整理的MongoDB源码阅读之ReplSet源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 消息称三星Galaxy Z Fold4价
- 下一篇: 阿里巴巴员工人数超24万:上半年减少13