Flink的状态一致性
1 狀態的一致性
1.1 一致性級別
??流處理操作一般分為at-most-once,at-least-once和exactly-once這3個級別。
??at-most-once:至多一次,發生故障恢復后數據可能丟失
??at-least-once:至少一次,發生故障恢復后數據可能多算,絕對不會少算
??exactly-once:精確一次,發生故障恢復后數據不會丟失也不會多算
1.2 端到端的狀態一致性
1.2.1 Spark Streaming的exactly-once
??對于 Spark Streaming 任務,設置了 checkpoint,如果發生故障并重啟,可以從上次 checkpoint 之處恢復,但是這個行為只能使得數據不丟失,可能會重復處理,不能做到恰一次處理語義。如果 Spark Streaming 消費的數據源是 kafka ,那么使用 direct Stream 的方式自己維護 offset 到 zookeeper或者其它外部系統,每次提交完結果之后再提交 offset,這樣故障恢復重啟可以利用上次提交的 offset 恢復,保證數據不丟失。但是假如故障發生在提交結果之后、提交 offset 之前會導致數據多次處理,就需要保證處理結果多次輸出不影響正常的業務。
??所以如果要保證數據恰一次處理語義,那么結果輸出和 offset 提交必須在一個事務內完成。在這里有以下兩種做法:
??(1)使用repartition(1) 將輸出的 partition設置為1,那就可以利用事務操作
Dstream.foreachRDD(rdd=>{rdd.repartition(1).foreachPartition(partition=>{ // 開啟事務partition.foreach(each=>{// 提交數據}) // 提交事務})})??(2)將結果和offset一起提交。這樣提交結果和提交 offset 就是一個操作完成,不會數據丟失,也不會重復處理。故障恢復的時候可以利用上次提交結果帶的 offset。
1.2.2 Flink的exactly-once
??Flink使用checkpoint保證其內部的exactly-once,但是我們的應用還包含了數據源和輸出,每個組件都只是保證了自己的一致性,所以端到端級別的一致性取決于所有組件中一致性最弱的組件。
要滿足端到端的狀態一致性需要滿足以下幾點:
??(1)source:需要外部數據源可以重新設置數據的讀取位置
??(2)內部:通過checkpoint保證內部的一致性
??(3)sink:從故障恢復時,數據不會重復寫入到外部系統
??如果sourced端是kafka的話,可以輕松實現重設讀取位置,Flink內部通過checkpoint就能保證內部一致性,較為復雜的是在sink端不能重復寫入,有兩種具體的實現方式:冪等寫入和事務性寫入。
??冪等寫入:重復執行多次操作,但是重復執行就只會導致一次結果更新,重復執行不起作用。冪等寫入要求外部的數據庫必須支持冪等寫入,像往es和文件里面追加寫入是不行的,像redis和mysql定義了key之后也不一定是真正的exectly-once,保證的是最終的狀態一次性,在中間短暫的恢復中是有短暫的狀態不一致的,因為有些中間狀態要重復寫入
??事務寫入:構建事務寫入外部系統,構建的事務對應checkpoint,到checkpoint真正完成的時候才把所有對應的結果寫入到sink系統。對于事務性寫入,又有兩種實現方式:預寫日志和兩階段提交。GenericWriteAheadSink模板類和TwoPhaseCommitSinkFunction 接口,可以方便地實現這兩種方式的事務性寫入
1.2.3 預寫日志
??把所有要寫入的數據保存成sink的一個狀態,相當于在sink做緩存了,收到checkpoint完成的通知的時候再一次性寫入sink系統。再往外部系統寫入的時候一批寫入如果寫到一半的時候掛了怎么辦,不能保證。不能嚴格意義達到精確一次性。DataStream API提供了GenericWriteAheadSink模板類
1.2.4 兩階段提交
??兩階段提交能真正做到exectly-once。前面說的預寫入日志是對兩次checkpoint之間的所有數據直接緩存在sink任務里面,最后收到checkpoint通知的時候一批直接寫入。兩階段事務提交還是正常的來一個寫一個,只不過和外部系統開啟了一個事務,是在事務里面提交的,假如中間某個掛了就全部回滾,等到checkpoint完成的時候就真正把這個事務提交。
2 端到端狀態一致性
??端到端的狀態一致性的實現,需要每一個組件都實現。以Kafka-Flink-Kafka的端到端的數據管道系統為例:
①source:kafka consumer作為source,可以將偏移量保存下來,如果任務出現了故障,恢復的時候可以由連接器重置偏移量,重新消費數據,即可保證一致性。②內部: 利用checkpoint機制,把狀態存盤,發生故障的時候可以恢復,保證內部的狀態一致性。③sink : kafka producer作為sink,采用兩階段提交 sink,需要實現一個 TwoPhaseCommitSinkFunction
??Flink是由JobManager協調各個TaskManager進行checkpoint存儲, Kafka-Flink-Kafka的流程如下:
checkpoint 啟動,JobManager 會將檢查點分界線barrier注入數據流;barrier往后傳遞下去,每個算子遇到barrier會對當前的狀態做個快照,保存到狀態后端。source就是當前的offset作為狀態保存起來,checkpoint恢復時重新提交偏移量,從上次保存的位置開始重新消費數據。barrier一直傳遞帶sink,當遇到 barrier 時,把狀態保存到狀態后端,并開啟新的預提交事務,sink 任務首先把數據寫入外部 kafka,這些數據都屬于預提交的事務。當所有算子任務的快照完成,也就是這次的 checkpoint 完成時,JobManager 會向所有任務發通知,確認這次 checkpoint 完成。當sink 任務收到確認通知,就會正式提交之前的事務,kafka 中未確認的數據就改為“已確認”,數據就真正可以被消費了。
??兩階段提交步驟:①預提交:第一條數據來了之后,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區日志但標記為未提交②jobmanager 觸發 checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的算子將狀態存入狀態后端,并通知 jobmanager③ sink 連接器收到 barrier,保存當前狀態,存入 checkpoint,通知 jobmanager,并開啟下一階段的事務,用于提交下個檢查點的數據④jobmanager 收到所有任務的通知,發出確認信息,表示 checkpoint 完成⑤sink 任務收到 jobmanager 的確認信息,正式提交這段時間的數據⑥外部kafka關閉事務,提交的數據可以正常消費了。
總結
以上是生活随笔為你收集整理的Flink的状态一致性的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux怎么安装32电脑上,linux
- 下一篇: java sax解析xml_【转】jav