2021年大数据Flink(二十七):Flink 容错机制 Checkpoint
目錄
Flink 容錯(cuò)機(jī)制?Checkpoint
State Vs Checkpoint
Checkpoint執(zhí)行流程
簡(jiǎn)單流程
復(fù)雜流程
State狀態(tài)后端/State存儲(chǔ)介質(zhì)
MemStateBackend[了解]
FsStateBackend
RocksDBStateBackend
Checkpoint配置方式
全局配置
在代碼中配置
代碼演示
Flink 容錯(cuò)機(jī)制?Checkpoint
State Vs Checkpoint
- State:
維護(hù)/存儲(chǔ)的是某一個(gè)Operator的運(yùn)行的狀態(tài)/歷史值,是維護(hù)在內(nèi)存中!
一般指一個(gè)具體的Operator的狀態(tài)(operator的狀態(tài)表示一些算子在運(yùn)行的過(guò)程中會(huì)產(chǎn)生的一些歷史結(jié)果,如前面的maxBy底層會(huì)維護(hù)當(dāng)前的最大值,也就是會(huì)維護(hù)一個(gè)keyedOperator,這個(gè)State里面存放就是maxBy這個(gè)Operator中的最大值)
State數(shù)據(jù)默認(rèn)保存在Java的堆內(nèi)存中/TaskManage節(jié)點(diǎn)的內(nèi)存中
State可以被記錄,在失敗的情況下數(shù)據(jù)還可以恢復(fù)
- Checkpoint:
某一時(shí)刻,Flink中所有的Operator的當(dāng)前State的全局快照,一般存在磁盤(pán)上
表示了一個(gè)Flink Job在一個(gè)特定時(shí)刻的一份全局狀態(tài)快照,即包含了所有Operator的狀態(tài)
可以理解為Checkpoint是把State數(shù)據(jù)定時(shí)持久化存儲(chǔ)了
比如KafkaConsumer算子中維護(hù)的Offset狀態(tài),當(dāng)任務(wù)重新恢復(fù)的時(shí)候可以從Checkpoint中獲取
- 注意:
Flink中的Checkpoint底層使用了Chandy-Lamport algorithm分布式快照算法可以保證數(shù)據(jù)的在分布式環(huán)境下的一致性!
分布式快照算法: Chandy-Lamport 算法 - 知乎
Chandy-Lamport algorithm算法的作者也是ZK中Paxos 一致性算法的作者
zookeeper學(xué)習(xí)系列:四、Paxos算法和zookeeper的關(guān)系 - 堅(jiān)毅的刀刀 - 博客園
Flink中使用Chandy-Lamport algorithm分布式快照算法取得了成功,后續(xù)Spark的StructuredStreaming也借鑒了該算法
Checkpoint執(zhí)行流程
簡(jiǎn)單流程
- Flink的JobManager創(chuàng)建CheckpointCoordinator
- Coordinator向所有的SourceOperator發(fā)送Barrier柵欄(理解為執(zhí)行Checkpoint的信號(hào))
- SourceOperator接收到Barrier之后,暫停當(dāng)前的操作(暫停的時(shí)間很短,因?yàn)楹罄m(xù)的寫(xiě)快照是異步的),并制作State快照, 然后將自己的快照保存到指定的介質(zhì)中(如HDFS), 一切 ok之后向Coordinator匯報(bào)并將Barrier發(fā)送給下游的其他Operator
- 其他的如TransformationOperator接收到Barrier,重復(fù)第2步,最后將Barrier發(fā)送給Sink
- Sink接收到Barrier之后重復(fù)第2步
- Coordinator接收到所有的Operator的執(zhí)行ok的匯報(bào)結(jié)果,認(rèn)為本次快照?qǐng)?zhí)行成功
注意:
1.在往介質(zhì)(如HDFS)中寫(xiě)入快照數(shù)據(jù)的時(shí)候是異步的(為了提高效率)
2.分布式快照?qǐng)?zhí)行時(shí)的數(shù)據(jù)一致性由Chandy-Lamport algorithm分布式快照算法保證!
復(fù)雜流程
下圖左側(cè)是 Checkpoint Coordinator,是整個(gè) Checkpoint 的發(fā)起者,中間是由兩個(gè) source,一個(gè) sink 組成的 Flink 作業(yè),最右側(cè)的是持久化存儲(chǔ),在大部分用戶場(chǎng)景中對(duì)應(yīng) HDFS。
1.Checkpoint Coordinator 向所有 source 節(jié)點(diǎn) trigger Checkpoint。
2.source 節(jié)點(diǎn)向下游廣播 barrier,這個(gè) barrier 就是實(shí)現(xiàn) Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才會(huì)執(zhí)行相應(yīng)的 Checkpoint。
3.當(dāng) task 完成 state 備份后,會(huì)將備份數(shù)據(jù)的地址(state handle)通知給 Checkpoint coordinator。
4.下游的 sink 節(jié)點(diǎn)收集齊上游兩個(gè) input 的 barrier 之后,會(huì)執(zhí)行本地快照,(柵欄對(duì)齊)
這里還展示了 RocksDB incremental Checkpoint (增量Checkpoint)的流程,首先 RocksDB 會(huì)全量刷數(shù)據(jù)到磁盤(pán)上(紅色大三角表示),然后 Flink 框架會(huì)從中選擇沒(méi)有上傳的文件進(jìn)行持久化備份(紫色小三角)。
5.同樣的,sink 節(jié)點(diǎn)在完成自己的 Checkpoint 之后,會(huì)將 state handle 返回通知 Coordinator。
6.最后,當(dāng) Checkpoint coordinator 收集齊所有 task 的 state handle,就認(rèn)為這一次的 Checkpoint 全局完成了,向持久化存儲(chǔ)中再備份一個(gè) Checkpoint meta 文件。
???????State狀態(tài)后端/State存儲(chǔ)介質(zhì)
注意:
前面學(xué)習(xí)了Checkpoint其實(shí)就是Flink中某一時(shí)刻,所有的Operator的全局快照,
那么快照應(yīng)該要有一個(gè)地方進(jìn)行存儲(chǔ),而這個(gè)存儲(chǔ)的地方叫做狀態(tài)后端
Flink中的State狀態(tài)后端有很多種:
MemStateBackend[了解]
第一種是內(nèi)存存儲(chǔ),即 MemoryStateBackend,構(gòu)造方法是設(shè)置最大的StateSize,選擇是否做異步快照,
對(duì)于State狀態(tài)存儲(chǔ)在 TaskManager 節(jié)點(diǎn)也就是執(zhí)行節(jié)點(diǎn)內(nèi)存中的,因?yàn)閮?nèi)存有容量限制,所以單個(gè) State maxStateSize 默認(rèn) 5 M,且需要注意 maxStateSize <= akka.framesize 默認(rèn) 10 M。
對(duì)于Checkpoint 存儲(chǔ)在 JobManager 內(nèi)存中,因此總大小不超過(guò) JobManager 的內(nèi)存。
推薦使用的場(chǎng)景為:本地測(cè)試、幾乎無(wú)狀態(tài)的作業(yè),比如 ETL、JobManager 不容易掛,或掛掉影響不大的情況。
不推薦在生產(chǎn)場(chǎng)景使用。
FsStateBackend
另一種就是在文件系統(tǒng)上的 FsStateBackend 構(gòu)建方法是需要傳一個(gè)文件路徑和是否異步快照。
State 依然在 TaskManager 內(nèi)存中,但不會(huì)像 MemoryStateBackend 是?5 M 的設(shè)置上限
Checkpoint 存儲(chǔ)在外部文件系統(tǒng)(本地或 HDFS),打破了總大小 Jobmanager 內(nèi)存的限制。
推薦使用的場(chǎng)景為:常規(guī)使用狀態(tài)的作業(yè)、例如分鐘級(jí)窗口聚合或 join、需要開(kāi)啟HA的作業(yè)。
如果使用HDFS,則初始化FsStateBackend時(shí),需要傳入以 “hdfs://”開(kāi)頭的路徑(即: new FsStateBackend("hdfs:///hacluster/checkpoint")),
如果使用本地文件,則需要傳入以“file://”開(kāi)頭的路徑(即:new FsStateBackend("file:///Data"))。
在分布式情況下,不推薦使用本地文件。因?yàn)槿绻硞€(gè)算子在節(jié)點(diǎn)A上失敗,在節(jié)點(diǎn)B上恢復(fù),使用本地文件時(shí),在B上無(wú)法讀取節(jié)點(diǎn) A上的數(shù)據(jù),導(dǎo)致?tīng)顟B(tài)恢復(fù)失敗。
RocksDBStateBackend
還有一種存儲(chǔ)為 RocksDBStateBackend ,
RocksDB 是一個(gè) key/value 的內(nèi)存存儲(chǔ)系統(tǒng),和其他的 key/value 一樣,先將狀態(tài)放到內(nèi)存中,如果內(nèi)存快滿時(shí),則寫(xiě)入到磁盤(pán)中,
但需要注意 RocksDB 不支持同步的 Checkpoint,構(gòu)造方法中沒(méi)有同步快照這個(gè)選項(xiàng)。
不過(guò) RocksDB 支持增量的 Checkpoint,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄,僅需要上傳新生成的 sst 文件即可。它的 Checkpoint 存儲(chǔ)在外部文件系統(tǒng)(本地或HDFS),
其容量限制只要單個(gè) TaskManager 上 State 總量不超過(guò)它的內(nèi)存+磁盤(pán),單 Key最大 2G,總大小不超過(guò)配置的文件系統(tǒng)容量即可。
推薦使用的場(chǎng)景為:超大狀態(tài)的作業(yè),例如天級(jí)窗口聚合、需要開(kāi)啟 HA 的作業(yè)、最好是對(duì)狀態(tài)讀寫(xiě)性能要求不高的作業(yè)。
Checkpoint配置方式
全局配置
修改flink-conf.yaml
#這里可以配置
#jobmanager(即MemoryStateBackend),
#filesystem(即FsStateBackend),
#rocksdb(即RocksDBStateBackend)
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
???????在代碼中配置
//1.MemoryStateBackend--開(kāi)發(fā)中不用
????env.setStateBackend(new MemoryStateBackend)
//2.FsStateBackend--開(kāi)發(fā)中可以使用--適合一般狀態(tài)--秒級(jí)/分鐘級(jí)窗口...
????env.setStateBackend(new FsStateBackend("hdfs路徑或測(cè)試時(shí)的本地路徑"))
//3.RocksDBStateBackend--開(kāi)發(fā)中可以使用--適合超大狀態(tài)--天級(jí)窗口...
env.setStateBackend(new RocksDBStateBackend(filebackend, true))
注意:RocksDBStateBackend還需要引入依賴
????<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.7.2</version></dependency>
???????代碼演示
package cn.it.checkpoint;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;import java.util.Properties;/*** Author lanson* Desc 演示Checkpoint參數(shù)設(shè)置(也就是Checkpoint執(zhí)行流程中的步驟0相關(guān)的參數(shù)設(shè)置)*/
public class CheckpointDemo01 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//===========Checkpoint參數(shù)設(shè)置====//===========類型1:必須參數(shù)=============//設(shè)置Checkpoint的時(shí)間間隔為1000ms做一次Checkpoint/其實(shí)就是每隔1000ms發(fā)一次Barrier!env.enableCheckpointing(1000);//設(shè)置State狀態(tài)存儲(chǔ)介質(zhì)/*if(args.length > 0){env.setStateBackend(new FsStateBackend(args[0]));}else {env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp"));}*/if (SystemUtils.IS_OS_WINDOWS) {env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp"));} else {env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));}//===========類型2:建議參數(shù)===========//設(shè)置兩個(gè)Checkpoint 之間最少等待時(shí)間,如設(shè)置Checkpoint之間最少是要等?500ms(為了避免每隔1000ms做一次Checkpoint的時(shí)候,前一次太慢和后一次重疊到一起去了)//如:高速公路上,每隔1s關(guān)口放行一輛車,但是規(guī)定了兩車之前的最小車距為500menv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默認(rèn)是0//設(shè)置如果在做Checkpoint過(guò)程中出現(xiàn)錯(cuò)誤,是否讓整體任務(wù)失敗:true是??false不是//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默認(rèn)是trueenv.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默認(rèn)值為0,表示不容忍任何檢查點(diǎn)失敗//設(shè)置是否清理檢查點(diǎn),表示?Cancel 時(shí)是否需要保留當(dāng)前的?Checkpoint,默認(rèn)?Checkpoint會(huì)在作業(yè)被Cancel時(shí)被刪除//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,當(dāng)作業(yè)被取消時(shí),刪除外部的checkpoint(默認(rèn)值)//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,當(dāng)作業(yè)被取消時(shí),保留外部的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//===========類型3:直接使用默認(rèn)的即可===============//設(shè)置checkpoint的執(zhí)行模式為EXACTLY_ONCE(默認(rèn))env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//設(shè)置checkpoint的超時(shí)時(shí)間,如果?Checkpoint在?60s內(nèi)尚未完成說(shuō)明該次Checkpoint失敗,則丟棄。env.getCheckpointConfig().setCheckpointTimeout(60000);//默認(rèn)10分鐘//設(shè)置同一時(shí)間有多少個(gè)checkpoint可以同時(shí)執(zhí)行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默認(rèn)為1//2.SourceDataStream<String> linesDS = env.socketTextStream("node1", 9999);//3.Transformation//3.1切割出每個(gè)單詞并直接記為1DataStream<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//value就是每一行String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//3.2分組//注意:批處理的分組是groupBy,流處理的分組是keyByKeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);//3.3聚合DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);DataStream<String> result = (SingleOutputStreamOperator<String>) aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {return value.f0 + ":::" + value.f1;}});//4.sinkresult.print();Properties props = new Properties();props.setProperty("bootstrap.servers", "node1:9092");FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);result.addSink(kafkaSink);//5.executeenv.execute();// /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka}
}
總結(jié)
以上是生活随笔為你收集整理的2021年大数据Flink(二十七):Flink 容错机制 Checkpoint的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 2021年大数据Flink(二十六):
- 下一篇: 2021年大数据Flink(二十八):F