kafka 脚本发送_Kafka笔记归纳(第五部分:一致性保证,消息重复消费场景及解决方式)...
寫(xiě)在開(kāi)頭:
本章是Kafka學(xué)習(xí)歸納第五部分,著重于強(qiáng)調(diào)Kafka的事一致性保證,消息重復(fù)消費(fèi)場(chǎng)景及解決方式,記錄偏移量的主題,延時(shí)隊(duì)列的知識(shí)點(diǎn)。
文章內(nèi)容輸出來(lái)源:拉勾教育大數(shù)據(jù)高薪訓(xùn)練營(yíng)。
一致性保證
水位標(biāo)記
水位或水印(watermark)一詞,表示位置信息,即位移(offset)。Kafka源碼中使用的名字是高水位,HW(high watermark)。
LEO和HW
每個(gè)分區(qū)副本對(duì)象都有兩個(gè)重要的屬性:LEO和HW
LEO:即日志末端位移(log end offset),記錄了該副本日志中下一條消息的位移值。如果 LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。另外,Leader LEO和 Follower LEO的更新是有區(qū)別的。
HW:即上面提到的水位值。對(duì)于同一個(gè)副本對(duì)象而言,其HW值不會(huì)大于LEO值。小于等于 HW值的所有消息都被認(rèn)為是“已備份”的(replicated)。Leader副本和Follower副本的HW更新不同
上圖中,HW值是7,表示位移是 0~7 的所有消息都已經(jīng)處于“已提交狀態(tài)”(committed),而LEO值是14,8~13的消息就是未完全備份(fully replicated)——為什么沒(méi)有14?LEO指向的是下一條消息到來(lái)時(shí)的位移。
消費(fèi)者無(wú)法消費(fèi)分區(qū)下Leader副本中位移大于分區(qū)HW的消息
Follower副本何時(shí)更新LEO
Follower副本不停地向Leader副本所在的broker發(fā)送FETCH請(qǐng)求,一旦獲取消息后寫(xiě)入自己的日志中進(jìn)行備份。那么Follower副本的LEO是何時(shí)更新的呢?首先我必須言明,Kafka有兩套Follower副本
LEO:
1. 一套LEO保存在Follower副本所在Broker的副本管理機(jī)中;
2. 另一套LEO保存在Leader副本所在Broker的副本管理機(jī)中。Leader副本機(jī)器上保存了所有的follower副本的LEO。
Kafka使用前者幫助Follower副本更新其HW值;利用后者幫助Leader副本更新其HW。
1. Follower副本的本地LEO何時(shí)更新? Follower副本的LEO值就是日志的LEO值,每當(dāng)新寫(xiě)入一條消息,LEO值就會(huì)被更新。當(dāng)Follower發(fā)送FETCH請(qǐng)求后,Leader將數(shù)據(jù)返回給Follower,此時(shí)Follower開(kāi)始Log寫(xiě)數(shù)據(jù),從而自動(dòng)更新LEO值。
2. Leader端Follower的LEO何時(shí)更新? Leader端的Follower的LEO更新發(fā)生在Leader在處理 Follower FETCH請(qǐng)求時(shí)。一旦Leader接收到Follower發(fā)送的FETCH請(qǐng)求,它先從Log中讀取 相應(yīng)的數(shù)據(jù),給Follower返回?cái)?shù)據(jù)前,先更新Follower的LEO。
Follower副本何時(shí)更新HW
Follower更新HW發(fā)生在其更新LEO之后,一旦Follower向Log寫(xiě)完數(shù)據(jù),嘗試更新自己的HW值。
比較當(dāng)前LEO值與FETCH響應(yīng)中Leader的HW值,取兩者的小者作為新的HW值。
即:如果Follower的LEO大于Leader的HW,Follower HW值不會(huì)大于Leader的HW值。
Leader副本何時(shí)更新LEO
和Follower更新LEO相同,Leader寫(xiě)Log時(shí)自動(dòng)更新自己的LEO值。
Leader副本何時(shí)更新HW值
Leader的HW值就是分區(qū)HW值,直接影響分區(qū)數(shù)據(jù)對(duì)消費(fèi)者的可見(jiàn)性
Leader會(huì)嘗試去更新分區(qū)HW的四種情況:
1. Follower副本成為L(zhǎng)eader副本時(shí):Kafka會(huì)嘗試去更新分區(qū)HW。
2. Broker崩潰導(dǎo)致副本被踢出ISR時(shí):檢查下分區(qū)HW值是否需要更新是有必要的。
3. 生產(chǎn)者向Leader副本寫(xiě)消息時(shí):因?yàn)閷?xiě)入消息會(huì)更新Leader的LEO,有必要檢查HW值是否需要更新
4. Leader處理Follower FETCH請(qǐng)求時(shí):首先從Log讀取數(shù)據(jù),之后嘗試更新分區(qū)HW值
結(jié)論:
當(dāng)Kafka broker都正常工作時(shí),分區(qū)HW值的更新時(shí)機(jī)有兩個(gè):
1. Leader處理PRODUCE請(qǐng)求時(shí)
2. Leader處理FETCH請(qǐng)求時(shí)。
Leader如何更新自己的HW值?Leader broker上保存了一套Follower副本的LEO以及自己的LEO。當(dāng)嘗試確定分區(qū)HW時(shí),它會(huì)選出所有滿(mǎn)足條件的副本,比較它們的LEO(包括Leader的LEO),并選擇最小的LEO值作為HW值。
需要滿(mǎn)足的條件,(二選一):
1. 處于ISR中
2. 副本LEO落后于Leader LEO的時(shí)長(zhǎng)不大于 replica.lag.time.max.ms 參數(shù)值(默認(rèn)是10s)
如果Kafka只判斷第一個(gè)條件的話(huà),確定分區(qū)HW值時(shí)就不會(huì)考慮這些未在ISR中的副本,但這些副本已經(jīng)具備了“立刻進(jìn)入ISR”的資格,因此就可能出現(xiàn)分區(qū)HW值越過(guò)ISR中副本LEO的情況——不允許。因?yàn)榉謪^(qū)HW定義就是ISR中所有副本LEO的最小值
消息重復(fù)的場(chǎng)景及解決方案
消息重復(fù)和丟失是kafka中很常見(jiàn)的問(wèn)題,主要發(fā)生在以下三個(gè)階段:
1. 生產(chǎn)者階段
2. broke階段
3. 消費(fèi)者階段
生產(chǎn)者階段重復(fù)場(chǎng)景
生產(chǎn)發(fā)送的消息沒(méi)有收到正確的broke響應(yīng),導(dǎo)致生產(chǎn)者重試。
生產(chǎn)者發(fā)出一條消息,broke落盤(pán)以后因?yàn)榫W(wǎng)絡(luò)等種種原因發(fā)送端得到一個(gè)發(fā)送失敗的響應(yīng)或者網(wǎng)絡(luò)中斷,然后生產(chǎn)者收到一個(gè)可恢復(fù)的Exception重試消息導(dǎo)致消息重復(fù)。
生產(chǎn)者發(fā)送重復(fù)解決方案
啟動(dòng)kafka的冪等性
要啟動(dòng)kafka的冪等性,設(shè)置: enable.idempotence=true ,以及 ack=all 以及 retries > 1
ack=0,不重試。 可能會(huì)丟消息,適用于吞吐量指標(biāo)重要性高于數(shù)據(jù)丟失,例如:日志收集。
生產(chǎn)者和broke階段消息丟失場(chǎng)景
ack=0,不重試
生產(chǎn)者發(fā)送消息完,不管結(jié)果了,如果發(fā)送失敗也就丟失了。
ack=1,leader crash
生產(chǎn)者發(fā)送消息完,只等待Leader寫(xiě)入成功就返回了,Leader分區(qū)丟失了,此時(shí)Follower沒(méi)來(lái)及同步,消息丟失
unclean.leader.election.enable 配置true
允許選舉ISR以外的副本作為leader,會(huì)導(dǎo)致數(shù)據(jù)丟失,默認(rèn)為false。生產(chǎn)者發(fā)送異步消息,只等待Lead寫(xiě)入成功就返回,Leader分區(qū)丟失,此時(shí)ISR中沒(méi)有Follower,Leader從OSR中選舉,因?yàn)镺SR中本來(lái)落后于Leader造成消息丟失
解決生產(chǎn)者和broke階段消息丟失
禁用unclean選舉,ack=all
ack=all / -1,tries > 1,unclean.leader.election.enable=false生產(chǎn)者發(fā)完消息,等待Follower同步完再返回,如果異常則重試。副本的數(shù)量可能影響吞吐量,不超過(guò)5個(gè),一般三個(gè)。 不允許unclean Leader選舉。
配置:min.insync.replicas > 1
當(dāng)生產(chǎn)者將 acks 設(shè)置為 all (或 -1 )時(shí), min.insync.replicas>1 。指定確認(rèn)消息寫(xiě)成功需要的最小副本數(shù)量。達(dá)不到這個(gè)最小值,生產(chǎn)者將引發(fā)一個(gè)異常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。
當(dāng)一起使用時(shí), min.insync.replicas 和 ack 允許執(zhí)行更大的持久性保證。一個(gè)典型的場(chǎng)景是創(chuàng)建一個(gè)復(fù)制因子為3的主題,設(shè)置min.insync復(fù)制到2個(gè),用 all 配置發(fā)送。將確保如果大多數(shù)副本沒(méi)有收到寫(xiě)操作,則生產(chǎn)者將引發(fā)異常。
失敗的offset單獨(dú)記錄
生產(chǎn)者發(fā)送消息,會(huì)自動(dòng)重試,遇到不可恢復(fù)異常會(huì)拋出,這時(shí)可以捕獲異常記錄到數(shù)據(jù)庫(kù)或緩存,進(jìn)行單獨(dú)處理。
消費(fèi)者數(shù)據(jù)重復(fù)場(chǎng)景及解決方案
數(shù)據(jù)消費(fèi)完沒(méi)有及時(shí)提交offset到broker。
消息消費(fèi)端在消費(fèi)過(guò)程中掛掉沒(méi)有及時(shí)提交offset到broke,另一個(gè)消費(fèi)端啟動(dòng)拿之前記錄的offset開(kāi)始消費(fèi),由于offset的滯后性可能會(huì)導(dǎo)致新啟動(dòng)的客戶(hù)端有少量重復(fù)消費(fèi)。
解決方案
取消自動(dòng)提交
每次消費(fèi)完或者程序退出時(shí)手動(dòng)提交。這可能也沒(méi)法保證一條重復(fù)。
下游做冪等
一般是讓下游做冪等或者盡量每消費(fèi)一條消息都記錄offset,對(duì)于少數(shù)嚴(yán)格的場(chǎng)景可能需要把 offset或唯一ID(例如訂單ID)和下游狀態(tài)更新放在同一個(gè)數(shù)據(jù)庫(kù)里面做事務(wù)來(lái)保證精確的一次更新或者在下游數(shù)據(jù)表里面同時(shí)記錄消費(fèi)offset,然后更新下游數(shù)據(jù)的時(shí)候用消費(fèi)位移做樂(lè)觀鎖拒絕舊位移的數(shù)據(jù)更新。
__consumer_offsets
Kafka 1.0.2將consumer的位移信息保存在Kafka內(nèi)部的topic中,即__consumer_offsets主題,并且默認(rèn)提供了kafka_consumer_groups.sh腳本供用戶(hù)查看consumer信息。
創(chuàng)建topic “tp_test_01”
kafka-topics.sh --zookeeper node1:2181/myKafka --create -- topic tp_test_01 --partitions 5 --replication-factor 1使用kafka-console-producer.sh腳本生產(chǎn)消息
[root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> messages.txt; done [root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt由于默認(rèn)沒(méi)有指定key,所以根據(jù)round-robin方式,消息分布到不同的分區(qū)上。 (本例中生產(chǎn)了60條消息)
驗(yàn)證消息生產(chǎn)成功
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092 --topic tp_test_01 --time -1創(chuàng)建一個(gè)console consumer group
kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic tp_test_01 --from-beginning獲取該consumer group的group id(后面需要根據(jù)該id查詢(xún)它的位移信息)
kafka-consumer-groups.sh --bootstrap-server linux121:9092 --list查詢(xún)__consumer_offsets topic所有內(nèi)容
注意:運(yùn)行下面命令前先要在consumer.properties中設(shè)置exclude.internal.topics=false
kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning默認(rèn)情況下__consumer_offsets有50個(gè)分區(qū),如果你的系統(tǒng)中consumer group也很多的話(huà),那么這個(gè)命令的輸出結(jié)果會(huì)很多。
計(jì)算指定consumer group在__consumer_offsets topic中分區(qū)信息
這時(shí)候就用到了group.id :console-consumer-77682
Kafka會(huì)使用下面公式計(jì)算該group位移保存在__consumer_offsets的哪個(gè)分區(qū)上:
Math.abs(groupID.hashCode()) % numPartitions即
__consumer_offsets的分區(qū)41保存了這個(gè)consumer group的位移信息。
獲取指定consumer group的位移信息
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 41 --broker-list linux121:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"可以看到__consumer_offsets topic的每一日志項(xiàng)的格式都是:
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
延時(shí)隊(duì)列
兩個(gè)follower副本都已經(jīng)拉取到了leader副本的最新位置,此時(shí)又向leader副本發(fā)送拉取請(qǐng)求,而leader副本并沒(méi)有新的消息寫(xiě)入,那么此時(shí)leader副本該如何處理呢?可以直接返回空的拉取結(jié)果給follower副本,不過(guò)在leader副本一直沒(méi)有新消息寫(xiě)入的情況下,follower副本會(huì)一直發(fā)送拉取請(qǐng)求,并且總收到空的拉取結(jié)果,消耗資源。
Kafka在處理拉取請(qǐng)求時(shí),會(huì)先讀取一次日志文件,如果收集不到足夠多(fetchMinBytes,由參數(shù)fetch.min.bytes配置,默認(rèn)值為1)的消息,那么就會(huì)創(chuàng)建一個(gè)延時(shí)拉取操作(DelayedFetch)以等待拉取到足夠數(shù)量的消息。當(dāng)延時(shí)拉取操作執(zhí)行時(shí),會(huì)再讀取一次日志文件,然后將拉取結(jié)果返回給follower副本。
延遲操作不只是拉取消息時(shí)的特有操作,在Kafka中有多種延時(shí)操作,比如延時(shí)數(shù)據(jù)刪除、延時(shí)生產(chǎn)等。
對(duì)于延時(shí)生產(chǎn)(消息)而言,如果在使用生產(chǎn)者客戶(hù)端發(fā)送消息的時(shí)候?qū)cks參數(shù)設(shè)置為-1,那么就意味著需要等待ISR集合中的所有副本都確認(rèn)收到消息之后才能正確地收到響應(yīng)的結(jié)果,或者捕獲超時(shí)異常。
假設(shè)某個(gè)分區(qū)有3個(gè)副本:leader、follower1和follower2,它們都在分區(qū)的ISR集合中。不考慮ISR變動(dòng)的情況,Kafka在收到客戶(hù)端的生產(chǎn)請(qǐng)求后,將消息3和消息4寫(xiě)入leader副本的本地日志文件。
由于客戶(hù)端設(shè)置了acks為-1,那么需要等到follower1和follower2兩個(gè)副本都收到消息3和消息4后才能告知客戶(hù)端正確地接收了所發(fā)送的消息。如果在一定的時(shí)間內(nèi),follower1副本或follower2副本沒(méi)能夠完全拉取到消息3和消息4,那么就需要返回超時(shí)異常給客戶(hù)端。生產(chǎn)請(qǐng)求的超時(shí)時(shí)間由參數(shù)request.timeout.ms配置,默認(rèn)值為30000,即30s。
那么這里等待消息3和消息4寫(xiě)入follower1副本和follower2副本,并返回相應(yīng)的響應(yīng)結(jié)果給客戶(hù)端的動(dòng)作是由誰(shuí)來(lái)執(zhí)行的呢?在將消息寫(xiě)入leader副本的本地日志文件之后,Kafka會(huì)創(chuàng)建一個(gè)延時(shí)的生產(chǎn)操作(DelayedProduce),用來(lái)處理消息正常寫(xiě)入所有副本或超時(shí)的情況,以返回相應(yīng)的響應(yīng)結(jié)果給客戶(hù)端。
延時(shí)操作需要延時(shí)返回響應(yīng)的結(jié)果,首先它必須有一個(gè)超時(shí)時(shí)間(delayMs),如果在這個(gè)超時(shí)時(shí)間內(nèi)沒(méi)有完成既定的任務(wù),那么就需要強(qiáng)制完成以返回響應(yīng)結(jié)果給客戶(hù)端。其次,延時(shí)操作不同于定時(shí)操作,定時(shí)操作是指在特定時(shí)間之后執(zhí)行的操作,而延時(shí)操作可以在所設(shè)定的超時(shí)時(shí)間之前完成,所以延時(shí)操作能夠支持外部事件的觸發(fā)。
就延時(shí)生產(chǎn)操作而言,它的外部事件是所要寫(xiě)入消息的某個(gè)分區(qū)的HW(高水位)發(fā)生增長(zhǎng)。也就是說(shuō),隨著follower副本不斷地與leader副本進(jìn)行消息同步,進(jìn)而促使HW進(jìn)一步增長(zhǎng),HW每增長(zhǎng)一次都會(huì)檢測(cè)是否能夠完成此次延時(shí)生產(chǎn)操作,如果可以就執(zhí)行以此返回響應(yīng)結(jié)果給客戶(hù)端;如果在超時(shí)時(shí)間內(nèi)始終無(wú)法完成,則強(qiáng)制執(zhí)行。
延時(shí)拉取操作,是由超時(shí)觸發(fā)或外部事件觸發(fā)而被執(zhí)行的。超時(shí)觸發(fā)很好理解,就是等到超時(shí)時(shí)間之后觸發(fā)第二次讀取日志文件的操作。外部事件觸發(fā)就稍復(fù)雜了一些,因?yàn)槔≌?qǐng)求不單單由follower副本發(fā)起,也可以由消費(fèi)者客戶(hù)端發(fā)起,兩種情況所對(duì)應(yīng)的外部事件也是不同的。如果是follower副本的延時(shí)拉取,它的外部事件就是消息追加到了leader副本的本地日志文件中;如果是消費(fèi)者客戶(hù)端的延時(shí)拉取,它的外部事件可以簡(jiǎn)單地理解為HW的增長(zhǎng)。
總結(jié)
以上是生活随笔為你收集整理的kafka 脚本发送_Kafka笔记归纳(第五部分:一致性保证,消息重复消费场景及解决方式)...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 深圳2020年生肖纪念币在哪里可以预约
- 下一篇: 华硕k550v系统怎么安装 华硕K550