flink checkpoint 恢复_干货:Flink+Kafka 0.11端到端精确一次处理语义实现
2017年12月Apache Flink社區(qū)發(fā)布了1.4版本。該版本正式引入了一個(gè)里程碑式的功能:兩階段提交Sink,即TwoPhaseCommitSinkFunction。該SinkFunction提取并封裝了兩階段提交協(xié)議中的公共邏輯,自此Flink搭配特定source和sink(特別是0.11版本Kafka)搭建精確一次處理語(yǔ)義(?exactly-once semantics)應(yīng)用成為了可能。作為一個(gè)抽象類(lèi)TwoPhaseCommitSinkFunction提供了一個(gè)抽象層供用戶自行實(shí)現(xiàn)特定方法來(lái)支持?exactly-once semantics。
用戶可以閱讀Java文檔來(lái)學(xué)習(xí)如何使用TwoPhaseCommitSinkFunction,或者參考Flink官網(wǎng)文檔來(lái)了解FlinkKafkaProducer011是如何支持 exactly-once semantics的,因?yàn)楹笳哒腔赥woPhaseCommitSinkFunction實(shí)現(xiàn)的。
本文將深入討論一下Flink 1.4這個(gè)新特性以及其背后的設(shè)計(jì)思想。在本文中我們將:
1. 描述Flink應(yīng)用中的checkpoint如何幫助確保exactly-once semantics
2. 展示Flink如何通過(guò)兩階段提交協(xié)議與source和sink交互以實(shí)現(xiàn)端到端的??exactly-once semantics交付保障
3. 給出一個(gè)使用TwoPhaseCommitSinkFunction實(shí)現(xiàn)?exactly-once semantics的文件Sink實(shí)例
1Flink應(yīng)用的僅一次處理當(dāng)談及僅一次處理時(shí),我們真正想表達(dá)的是每條輸入消息只會(huì)影響最終結(jié)果一次!【譯者:影響應(yīng)用狀態(tài)一次,而非被處理一次】即使出現(xiàn)機(jī)器故障或軟件崩潰,Flink也要保證不會(huì)有數(shù)據(jù)被重復(fù)處理或壓根就沒(méi)有被處理從而影響狀態(tài)。長(zhǎng)久以來(lái)Flink一直宣稱支持 exactly-once semantics是指在一個(gè)Flink應(yīng)用內(nèi)部。在過(guò)去的幾年間,Flink開(kāi)發(fā)出了checkpointing機(jī)制,而它則是提供這種應(yīng)用內(nèi)僅一次處理的基石。
在繼續(xù)之前我們簡(jiǎn)要總結(jié)一下checkpointing算法,這對(duì)于我們了解本文內(nèi)容至關(guān)重要。簡(jiǎn)單來(lái)說(shuō),一個(gè)Flink checkpoint是一個(gè)一致性快照,它包含:
1. 應(yīng)用的當(dāng)前狀態(tài)
2. 消費(fèi)的輸入流位置
Flink會(huì)定期地產(chǎn)生checkpoint并且把這些checkpoint寫(xiě)入到一個(gè)持久化存儲(chǔ)上,比如S3或HDFS。這個(gè)寫(xiě)入過(guò)程是異步的,這就意味著Flink即使在checkpointing過(guò)程中也是不斷處理輸入數(shù)據(jù)的。
如果出現(xiàn)機(jī)器或軟件故障,Flink應(yīng)用重啟后會(huì)從最新成功完成的checkpoint中恢復(fù)——重置應(yīng)用狀態(tài)并回滾狀態(tài)到checkpoint中輸入流的正確位置,之后再開(kāi)始執(zhí)行數(shù)據(jù)處理,就好像該故障或崩潰從未發(fā)生過(guò)一般。
在Flink 1.4版本之前,僅一次處理只限于Flink應(yīng)用內(nèi)。Flink處理完數(shù)據(jù)后需要將結(jié)果發(fā)送到外部系統(tǒng),這個(gè)過(guò)程中Flink并不保證僅一次處理。但是Flink應(yīng)用通常都需要接入很多下游子系統(tǒng),而開(kāi)發(fā)人員很希望能在多個(gè)系統(tǒng)上維持僅一次處理語(yǔ)義,即維持端到端的僅一次處理語(yǔ)義。
為了提供端到端的僅一次處理語(yǔ)義,僅一次處理語(yǔ)義必須也要應(yīng)用于Flink寫(xiě)入數(shù)據(jù)的外部系統(tǒng)——故這些外部系統(tǒng)必須提供一種手段允許提交或回滾這些寫(xiě)入操作,同時(shí)還要保證與Flink checkpoint能夠協(xié)調(diào)使用。
在分布式系統(tǒng)中協(xié)調(diào)提交和回滾的一個(gè)常見(jiàn)方法就是使用兩階段提交協(xié)議。下一章節(jié)中我們將討論下Flink的TwoPhaseCommitSinkFunction是如何利用兩階段提交協(xié)議來(lái)實(shí)現(xiàn)exactly-once semantics的。
2Flink實(shí)現(xiàn)僅一次語(yǔ)義的應(yīng)用下面將給出一個(gè)實(shí)例來(lái)幫助了解兩階段提交協(xié)議以及Flink如何使用它來(lái)實(shí)現(xiàn)僅一次處理語(yǔ)義。該實(shí)例從Kafka中讀取數(shù)據(jù),經(jīng)處理之后再寫(xiě)回到Kafka。Kafka是非常受歡迎的消息隊(duì)列,而Kafka 0.11.0.0版本正式發(fā)布了對(duì)于事務(wù)的支持——這是與Kafka交互的Flink應(yīng)用要實(shí)現(xiàn)端到端僅一次語(yǔ)義的必要條件。
當(dāng)然,Flink支持這種僅一次處理語(yǔ)義并不只是限于與Kafka的結(jié)合,可以使用任何source/sink,只要它們提供了必要的協(xié)調(diào)機(jī)制。舉個(gè)例子,Pravega是Dell/EMC的一個(gè)開(kāi)源流式存儲(chǔ)系統(tǒng),Flink搭配它也可以實(shí)現(xiàn)端到端的exactly-once semantics。
本例中的Flink應(yīng)用包含以下組件,如上圖所示:
1. 一個(gè)source,從Kafka中讀取數(shù)據(jù)(即KafkaConsumer)
2. 一個(gè)時(shí)間窗口化的聚會(huì)操作
3. 一個(gè)sink,將結(jié)果寫(xiě)回到Kafka(即KafkaProducer)
若要sink支持 exactly-once semantics,它必須以事務(wù)的方式寫(xiě)數(shù)據(jù)到Kafka,這樣當(dāng)提交事務(wù)時(shí)兩次checkpoint間的所有寫(xiě)入操作當(dāng)作為一個(gè)事務(wù)被提交。這確保了出現(xiàn)故障或崩潰時(shí)這些寫(xiě)入操作能夠被回滾。
當(dāng)然了,在一個(gè)分布式且含有多個(gè)并發(fā)執(zhí)行sink的應(yīng)用中,僅僅執(zhí)行單次提交或回滾是不夠的,因?yàn)樗薪M件都必須對(duì)這些提交或回滾達(dá)成共識(shí),這樣才能保證得到一個(gè)一致性的結(jié)果。Flink使用兩階段提交協(xié)議以及預(yù)提交(pre-commit)階段來(lái)解決這個(gè)問(wèn)題。
Flink checkpointing開(kāi)始時(shí)便進(jìn)入到pre-commit階段。具體來(lái)說(shuō),一旦checkpoint開(kāi)始,Flink的JobManager向輸入流中寫(xiě)入一個(gè)checkpoint barrier將流中所有消息分割成屬于本次checkpoint的消息以及屬于下次checkpoint的。barrier也會(huì)在操作算子間流轉(zhuǎn)。對(duì)于每個(gè)operator來(lái)說(shuō),該barrier會(huì)觸發(fā)operator狀態(tài)后端為該operator狀態(tài)打快照。
眾所周知,flink kafka source保存Kafka消費(fèi)offset,一旦完成位移保存,它會(huì)將checkpoint barrier傳給下一個(gè)operator。
這個(gè)方法對(duì)于opeartor只有內(nèi)部狀態(tài)的場(chǎng)景是可行的。所謂的內(nèi)部狀態(tài)就是完全由Flink狀態(tài)保存并管理的——本例中的第二個(gè)opeartor:時(shí)間窗口上保存的求和數(shù)據(jù)就是這樣的例子。當(dāng)只有內(nèi)部狀態(tài)時(shí),pre-commit階段無(wú)需執(zhí)行額外的操作,僅僅是寫(xiě)入一些已定義的狀態(tài)變量即可。當(dāng)chckpoint成功時(shí)Flink負(fù)責(zé)提交這些寫(xiě)入,否則就終止取消掉它們。
當(dāng)時(shí),一旦operator包含外部狀態(tài),事情就不一樣了。我們不能像處理內(nèi)部狀態(tài)一樣處理這些外部狀態(tài)。因?yàn)橥獠繝顟B(tài)通常都涉及到與外部系統(tǒng)的交互。如果是這樣的話,外部系統(tǒng)必須要支持可與兩階段提交協(xié)議捆綁使用的事務(wù)才能確保實(shí)現(xiàn)整體的exactly-once semantics。
顯然本例中的data sink是有外部狀態(tài)的,因?yàn)樗枰獙?xiě)入數(shù)據(jù)到Kafka。此時(shí)的pre-commit階段下data sink在保存狀態(tài)到狀態(tài)存儲(chǔ)的同時(shí)還必須預(yù)提交它的外部事務(wù),如下圖所示:
??
當(dāng)checkpoint barrier在所有operator都傳遞了一遍且對(duì)應(yīng)的快照也都成功完成之后,pre-commit階段才算完成。該過(guò)程中所有創(chuàng)建的快照都被視為是checkpoint的一部分。其實(shí),checkpoint就是整個(gè)應(yīng)用的全局狀態(tài),當(dāng)然也包含pre-commit階段提交的外部狀態(tài)。當(dāng)出現(xiàn)崩潰時(shí),我們可以回滾狀態(tài)到最新已成功完成快照時(shí)的時(shí)間點(diǎn)。
下一步就是通知所有的operator,告訴它們checkpoint已成功完成。這便是兩階段提交協(xié)議的第二個(gè)階段:commit階段。該階段中JobManager會(huì)為應(yīng)用中每個(gè)operator發(fā)起checkpoint已完成的回調(diào)邏輯。
本例中的data source和窗口操作無(wú)外部狀態(tài),因此在該階段,這兩個(gè)opeartor無(wú)需執(zhí)行任何邏輯,但是data sink是有外部狀態(tài)的,因此此時(shí)我們必須提交外部事務(wù),如下圖所示:
匯總以上所有信息,總結(jié)一下:
1. 一旦所有operator完成各自的pre-commit,它們會(huì)發(fā)起一個(gè)commit操作
2. 倘若有一個(gè)pre-commit失敗,所有其他的pre-commit必須被終止,并且Flink會(huì)回滾到最近成功完成decheckpoint
3. 一旦pre-commit完成,必須要確保commit也要成功——operator和外部系統(tǒng)都需要對(duì)此進(jìn)行保證。倘若commit失敗(比如網(wǎng)絡(luò)故障等),Flink應(yīng)用就會(huì)崩潰,然后根據(jù)用戶重啟策略執(zhí)行重啟邏輯,之后再次重試commit。這個(gè)過(guò)程至關(guān)重要,因?yàn)樘热鬰ommit無(wú)法順利執(zhí)行,就可能出現(xiàn)數(shù)據(jù)丟失的情況
因此,所有opeartor必須對(duì)checkpoint最終結(jié)果達(dá)成共識(shí):即所有operator都必須認(rèn)定數(shù)據(jù)提交要么成功執(zhí)行,要么被終止然后回滾。
3Flink中實(shí)現(xiàn)兩階段提交這種operator的管理有些復(fù)雜,這也是為什么Flink提取了公共邏輯并封裝進(jìn)TwoPhaseCommitSinkFunction抽象類(lèi)的原因。
下面討論一下如何擴(kuò)展TwoPhaseCommitSinkFunction類(lèi)來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的基于文件的sink。若要實(shí)現(xiàn)支持exactly-once semantics的文件sink,我們需要實(shí)現(xiàn)以下4個(gè)方法:
1. beginTransaction:開(kāi)啟一個(gè)事務(wù),在臨時(shí)目錄下創(chuàng)建一個(gè)臨時(shí)文件,之后,寫(xiě)入數(shù)據(jù)到該文件中
2. preCommit:在pre-commit階段,flush緩存數(shù)據(jù)塊到磁盤(pán),然后關(guān)閉該文件,確保再不寫(xiě)入新數(shù)據(jù)到該文件。同時(shí)開(kāi)啟一個(gè)新事務(wù)執(zhí)行屬于下一個(gè)checkpoint的寫(xiě)入操作3. commit:在commit階段,我們以原子性的方式將上一階段的文件寫(xiě)入真正的文件目錄下。注意:這會(huì)增加輸出數(shù)據(jù)可見(jiàn)性的延時(shí)。通俗說(shuō)就是用戶想要看到最終數(shù)據(jù)需要等會(huì),不是實(shí)時(shí)的。4. abort:一旦終止事務(wù),我們離自己刪除臨時(shí)文件
當(dāng)出現(xiàn)崩潰時(shí),Flink會(huì)恢復(fù)最新已完成快照中應(yīng)用狀態(tài)。需要注意的是在某些極偶然的場(chǎng)景下,pre-commit階段已成功完成而commit尚未開(kāi)始(也就是operator尚未來(lái)得及被告知要開(kāi)啟commit),此時(shí)倘若發(fā)生崩潰Flink會(huì)將opeartor狀態(tài)恢復(fù)到已完成pre-commit但尚未commit的狀態(tài)。
在一個(gè)checkpoint狀態(tài)中,對(duì)于已完成pre-commit的事務(wù)狀態(tài),我們必須保存足夠多的信息,這樣才能確保在重啟后要么重新發(fā)起commit亦或是終止掉事務(wù)。本例中這部分信息就是臨時(shí)文件所在的路徑以及目標(biāo)目錄。
TwoPhaseCommitSinkFunction考慮了這種場(chǎng)景,因此當(dāng)應(yīng)用從checkpoint恢復(fù)之后TwoPhaseCommitSinkFunction總是會(huì)發(fā)起一個(gè)搶占式的commit。這種commit必須是冪等性的,雖然大部分情況下這都不是問(wèn)題。本例中對(duì)應(yīng)的這種場(chǎng)景就是:臨時(shí)文件不在臨時(shí)目錄下,而是已經(jīng)被移動(dòng)到目標(biāo)目錄下。
4總結(jié)本文的一些關(guān)鍵要點(diǎn):
Flinkcheckpointing機(jī)制是實(shí)現(xiàn)兩階段提交協(xié)議以及提供僅一次語(yǔ)義的基石
與其他系統(tǒng)持久化傳輸中的數(shù)據(jù)不同,Flink不需要將計(jì)算的每個(gè)階段寫(xiě)入到磁盤(pán)中
Flink新的TwoPhaseCommitSinkFunction封裝兩階段提交協(xié)議的公共邏輯使之搭配支持事務(wù)的外部系統(tǒng)來(lái)共同構(gòu)建僅一次語(yǔ)義應(yīng)用成為可能
自1.4版本起,Flink + Pravega和Kafka 0.11 producer開(kāi)始支持僅一次語(yǔ)義
Flink Kafka 0.11 producer基于TwoPhaseCommitSinkFunction實(shí)現(xiàn),比起至少一次語(yǔ)義的producer而言開(kāi)銷(xiāo)并未顯著增加
推薦閱讀:
Flink 1.10 細(xì)粒度資源管理解析
Flink State 最佳實(shí)踐
不可不知的spark shuffle
總結(jié)
以上是生活随笔為你收集整理的flink checkpoint 恢复_干货:Flink+Kafka 0.11端到端精确一次处理语义实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: mysql2017windows安装_m
- 下一篇: c语言标准整形,C语言整形数值范围问题