kafka协调者
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
我們先假設(shè)初始時(shí)世界是混沌的還沒(méi)有盤(pán)古的開(kāi)天辟地,協(xié)調(diào)者也是一片荒蕪人煙之地,沒(méi)有保存任何狀態(tài),因?yàn)橄M(fèi)組的初始狀態(tài)是Stable,在第一次的Rebalance時(shí),正常的還沒(méi)有向消費(fèi)組注冊(cè)過(guò)的消費(fèi)者會(huì)執(zhí)行狀態(tài)為Stable而且memberId=UNKNOWN_MEMBER_ID條件分支。在第一次Rebalance之后,每個(gè)消費(fèi)者都分配到了一個(gè)成員編號(hào),系統(tǒng)又會(huì)進(jìn)入Stable穩(wěn)定狀態(tài)(Stable穩(wěn)定狀態(tài)包括兩種:一種是沒(méi)有任何消費(fèi)者的穩(wěn)定狀態(tài),一種是有消費(fèi)者的穩(wěn)定狀態(tài))。因?yàn)樗邢M(fèi)者在執(zhí)行一次JoinGroup后并不是說(shuō)系統(tǒng)就一直保持這種不變的狀態(tài),有可能因?yàn)檫@樣或那樣的事件導(dǎo)致消費(fèi)者要重新進(jìn)行JoinGroup,這個(gè)時(shí)候因?yàn)橹癑oinGroup過(guò)了每個(gè)消費(fèi)者都是有成員編號(hào)的,處理方式肯定是不一樣的。
所以定義一種事件驅(qū)動(dòng)的狀態(tài)機(jī)就很有必要了,這世界看起來(lái)是雜亂無(wú)章的,不過(guò)只要遵循著狀態(tài)機(jī)的規(guī)則(萬(wàn)物生長(zhǎng)的理論),任何事件都是有跡可循有路可走有條不紊地進(jìn)行著。
?
private def doJoinGroup(group: GroupMetadata,memberId: String,clientId: String,clientHost: String,sessionTimeoutMs: Int,protocolType: String,protocols: List[(String, Array[Byte])],responseCallback: JoinCallback) {if (group.protocolType!=protocolType||!group.supportsProtocols(protocols.map(_._1).toSet)) {//protocolType對(duì)于消費(fèi)者是consumer,注意這里的協(xié)議類型和PartitionAssignor協(xié)議不同哦//協(xié)議類型目前總共就兩種消費(fèi)者和Worker,而協(xié)議是PartitionAssignor分配算法responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))} else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {//如果當(dāng)前組沒(méi)有記錄該消費(fèi)者,而該消費(fèi)者卻被分配了成員編號(hào),則重置為未知成員,并讓消費(fèi)者重試responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))} else { group.currentState match {case Dead =>responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))case PreparingRebalance =>if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //2.第二個(gè)消費(fèi)者在這里了!addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)} else {val member = group.get(memberId)updateMemberAndRebalance(group, member, protocols, responseCallback)}case Stable =>if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //1.初始時(shí)第一個(gè)消費(fèi)者在這里!//如果消費(fèi)者成員編號(hào)是未知的,則向GroupMetadata注冊(cè)并被記錄下來(lái)addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)} else { //3.第二次Rebalance時(shí)第一個(gè)消費(fèi)者在這里,此時(shí)要分Leader還是普通的消費(fèi)者了val member = group.get(memberId)if (memberId == group.leaderId || !member.matches(protocols)) {updateMemberAndRebalance(group, member, protocols, responseCallback)} else {responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId,generationId = group.generationId,subProtocol = group.protocol,leaderId = group.leaderId,errorCode = Errors.NONE.code))}}}if (group.is(PreparingRebalance))joinPurgatory.checkAndComplete(GroupKey(group.groupId))} }addMemberAndRebalance和updateMemberAndRebalance會(huì)創(chuàng)建或更新MemberMetadata,并且會(huì)嘗試調(diào)用prepareRebalance,消費(fèi)組中只有一個(gè)消費(fèi)者有機(jī)會(huì)調(diào)用prepareRebalance,并且一旦調(diào)用該方法,會(huì)將消費(fèi)組狀態(tài)更改為PreparingRebalance,就會(huì)使得下一個(gè)消費(fèi)者只能從case PreparingRebalance入口進(jìn)去了,假設(shè)第一個(gè)消費(fèi)者是從Stable進(jìn)入的,它更改了狀態(tài)為PreparingRebalance,下一個(gè)消費(fèi)者就不會(huì)從Stable進(jìn)來(lái)的。不過(guò)進(jìn)入Stable狀態(tài)還要判斷消費(fèi)者是不是已經(jīng)有了成員編號(hào),通常是之前已經(jīng)發(fā)生了Rebalance,這種影響也是比較巨大的,每個(gè)消費(fèi)者走的路徑跟第一次的Rebalance是完全不同的迷宮地圖了。
1)第一次Rebalance如圖6-18的上半部分:
?
圖6-18 第一次和第二次Rebalance
2)第二次Rebalance,對(duì)于之前加入過(guò)的消費(fèi)者都要成員編號(hào)如圖6-18的下半部分:
3)不過(guò)如果有消費(fèi)者在Leader之前發(fā)送又有點(diǎn)不一樣了如圖6-19:
?
圖6-19 Leader非第一個(gè)發(fā)送JoinGroup請(qǐng)求
4)如果第一個(gè)消費(fèi)者不是Leader,也沒(méi)有編號(hào),說(shuō)明這是一個(gè)新增的消費(fèi)者,流程又不同了如圖6-20:
?
圖6-20 新增消費(fèi)組第一個(gè)發(fā)送JoinGroup請(qǐng)求
根據(jù)上面的幾種場(chǎng)景總結(jié)下來(lái)狀態(tài)機(jī)的規(guī)則和一些結(jié)論如下:
轉(zhuǎn)載于:https://my.oschina.net/u/2371517/blog/1142949
總結(jié)
- 上一篇: D1net阅闻:IBM宣布推出全新存储技
- 下一篇: 能源结构进入变革时代 光伏业趋于壮大转型