2021年大数据Flink(二十八):Flink 容错机制 自动重启策略和恢复
目錄
自動重啟策略和恢復
重啟策略配置方式
重啟策略分類
代碼演示
手動重啟并恢復-了解
1.把程序打包
2.啟動Flink集群(本地單機版,集群版都可以)
3.訪問webUI
4.使用FlinkWebUI提交
5.取消任務
6.重新啟動任務并指定從哪恢復
7.關閉/取消任務
8.關閉集群
自動重啟策略和恢復
重啟策略配置方式
- 配置文件中
在flink-conf.yml中可以進行配置,示例如下:
restart-strategy: fixed-delayrestart-strategy.fixed-delay.attempts: 3restart-strategy.fixed-delay.delay: 10 s
- 代碼中
還可以在代碼中針對該任務進行配置,示例如下:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重啟次數Time.of(10, TimeUnit.SECONDS) // 延遲時間間隔))
重啟策略分類
- 默認重啟策略
如果配置了Checkpoint,而沒有配置重啟策略,那么代碼中出現了非致命錯誤時,程序會無限重啟
- 無重啟策略
?Job直接失敗,不會嘗試進行重啟
?設置方式1:flink-conf.yaml
?restart-strategy: none
??
?設置方式2:
?無重啟策略也可以在程序中設置
?val env = ExecutionEnvironment.getExecutionEnvironment()
?env.setRestartStrategy(RestartStrategies.noRestart())
- 固定延遲重啟策略--開發中使用
?設置方式1:
?重啟策略可以配置flink-conf.yaml的下面配置參數來啟用,作為默認的重啟策略:
?例子:
?restart-strategy: fixed-delay
?restart-strategy.fixed-delay.attempts: 3
?restart-strategy.fixed-delay.delay: 10 s
??
?設置方式2:
?也可以在程序中設置:
?val env = ExecutionEnvironment.getExecutionEnvironment()
?env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
? ?3, // 最多重啟3次數
? ?Time.of(10, TimeUnit.SECONDS) // 重啟時間間隔
?))
?上面的設置表示:如果job失敗,重啟3次, 每次間隔10
- 失敗率重啟策略--開發偶爾使用
?設置方式1:
?失敗率重啟策略可以在flink-conf.yaml中設置下面的配置參數來啟用:
?例子:
?restart-strategy:failure-rate
?restart-strategy.failure-rate.max-failures-per-interval: 3
?restart-strategy.failure-rate.failure-rate-interval: 5 min
?restart-strategy.failure-rate.delay: 10 s
??
?設置方式2:
?失敗率重啟策略也可以在程序中設置:
?val env = ExecutionEnvironment.getExecutionEnvironment()
?env.setRestartStrategy(RestartStrategies.failureRateRestart(
? ?3, // 每個測量時間間隔最大失敗次數
? ?Time.of(5, TimeUnit.MINUTES), //失敗率測量的時間間隔
? ?Time.of(10, TimeUnit.SECONDS) // 兩次連續重啟的時間間隔
?))
?上面的設置表示:如果5分鐘內job失敗不超過三次,自動重啟, 每次間隔10s (如果5分鐘內程序失敗超過3次,則程序退出)
代碼演示
package cn.it.checkpoint;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.concurrent.TimeUnit;/*** Author lanson* Desc 演示Checkpoint+重啟策略*/
public class CheckpointDemo02_RestartStrategy {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//===========Checkpoint參數設置====//===========類型1:必須參數=============//設置Checkpoint的時間間隔為1000ms做一次Checkpoint/其實就是每隔1000ms發一次Barrier!env.enableCheckpointing(1000);//設置State狀態存儲介質/*if(args.length > 0){env.setStateBackend(new FsStateBackend(args[0]));}else {env.setStateBackend(new FsStateBackend("file:///D:/ckp"));}*/if(SystemUtils.IS_OS_WINDOWS){env.setStateBackend(new FsStateBackend("file:///D:/ckp"));}else{env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));}//===========類型2:建議參數===========//設置兩個Checkpoint 之間最少等待時間,如設置Checkpoint之間最少是要等?500ms(為了避免每隔1000ms做一次Checkpoint的時候,前一次太慢和后一次重疊到一起去了)//如:高速公路上,每隔1s關口放行一輛車,但是規定了兩車之前的最小車距為500menv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默認是0//設置如果在做Checkpoint過程中出現錯誤,是否讓整體任務失敗:true是??false不是//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默認是trueenv.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默認值為0,表示不容忍任何檢查點失敗//設置是否清理檢查點,表示?Cancel 時是否需要保留當前的?Checkpoint,默認?Checkpoint會在作業被Cancel時被刪除//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,當作業被取消時,刪除外部的checkpoint(默認值)//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,當作業被取消時,保留外部的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//===========類型3:直接使用默認的即可===============//設置checkpoint的執行模式為EXACTLY_ONCE(默認)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//設置checkpoint的超時時間,如果?Checkpoint在?60s內尚未完成說明該次Checkpoint失敗,則丟棄。env.getCheckpointConfig().setCheckpointTimeout(60000);//默認10分鐘//設置同一時間有多少個checkpoint可以同時執行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默認為1//=============重啟策略===========//-1.默認策略:配置了Checkpoint而沒有配置重啟策略默認使用無限重啟//-2.配置無重啟策略//env.setRestartStrategy(RestartStrategies.noRestart());//-3.固定延遲重啟策略--開發中使用!//重啟3次,每次間隔10s/*env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, //嘗試重啟3次Time.of(10, TimeUnit.SECONDS))//每次重啟間隔10s);*///-4.失敗率重啟--偶爾使用//5分鐘內重啟3次(第3次不包括,也就是最多重啟2次),每次間隔10s/*env.setRestartStrategy(RestartStrategies.failureRateRestart(3, // 每個測量時間間隔最大失敗次數Time.of(5, TimeUnit.MINUTES), //失敗率測量的時間間隔Time.of(10, TimeUnit.SECONDS) // 每次重啟的時間間隔));*///上面的能看懂就行,開發中使用下面的代碼即可env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2.SourceDataStream<String> linesDS = env.socketTextStream("node1", 9999);//3.Transformation//3.1切割出每個單詞并直接記為1SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//value就是每一行String[] words = value.split(" ");for (String word : words) {if(word.equals("bug")){System.out.println("手動模擬的bug...");throw new RuntimeException("手動模擬的bug...");}out.collect(Tuple2.of(word, 1));}}});//3.2分組//注意:批處理的分組是groupBy,流處理的分組是keyByKeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);//3.3聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);//4.sinkresult.print();//5.executeenv.execute();}
}
手動重啟并恢復-了解
1.把程序打包
2.啟動Flink集群(本地單機版,集群版都可以)
/export/server/flink/bin/start-cluster.sh
3.訪問webUI
http://node1:8081/#/overview
http://node2:8081/#/overview
4.使用FlinkWebUI提交
cn.checkpoint.CheckpointDemo01
5.取消任務
6.重新啟動任務并指定從哪恢復
cn.it.checkpoint.CheckpointDemo01
hdfs://node1:8020/flink-checkpoint/checkpoint/9e8ce00dcd557dc03a678732f1552c3a/chk-34
?
7.關閉/取消任務
8.關閉集群
/export/server/flink/bin/stop-cluster.sh
總結
以上是生活随笔為你收集整理的2021年大数据Flink(二十八):Flink 容错机制 自动重启策略和恢复的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Flink(二十七):F
- 下一篇: 2021年大数据Flink(二十九):F