azkaban获取上游的节点结果_Flink任务实时获取并更新规则
背景
Flink通常被用來處理流式數據,有著眾多的應用場景,比方說實時的ETL、檢測報警等業務場景。這些場景通常會涉及到規則的更新,比如對解析規則和報警規則進行更改后,流任務應能夠實時感知到,并用新的規則繼續檢測,避免因為規則更改而重啟任務造成的開銷。一般來說流式任務的重啟是比較重的。
方案選擇
接下來分別介紹下兩種可行的方案與選型
1.廣播變量與廣播流
廣播變量通常被運用到以下場景中:一個流中的一些數據需要被廣播到所有的下游任務,被下游任務緩存在本地并用于處理另一個流上的所有傳入數據。例如,一個低吞吐量的流包含了一組規則,我們希望根據這些規則對另一個流的所有數據進行檢測。因此,廣播變量(broadcast state)和其他的state相比有以下不同:(1)目前只支持map格式(2)算子需要同時包含廣播流和普通的數據流才可用(3)一個算子可以使用多個廣播變量并用名稱進行區分
2.異步IO
流計算中經常會需要跟外部存儲系統交互,如mysql、redis等。眾所周知,在流處理中查詢外部存儲系統并等結果返回等待時間相對來說是比較長的,若數據量較大則導致會吞吐量會大大降低,使流任務基本處于阻塞不可用狀態。
同步處理與異步處理Flink的異步IO可以使得流任務在不阻塞運算的情況下異步請求外部系統,并且支持超時處理,以及返回結果有序或無序處理等。關于異步IO的詳細原理介紹已存在較多資料不在展開。通過異步IO獲取規則的原理就是在數據到來之后查詢外部系統獲取規則,并根據規則檢測或解析數據。比方將規則存放在redis、mysql等等。
理論上兩種方式都是可以完成從外部系統獲取規則并進行更新的。現分析下量兩種方式的差別與特點。對于異步IO來說若對于每條數據查一次外部系統,當數據量比較大的時候,外部系統的查詢性能會比較容易出現瓶頸,導致流處理能力達到上限。通常的優化方法是設置本地內存緩存規則,并設置過期時間,每次處理數據時判斷規則是否過期,若過期則重新查詢規則,這也導致了規則的獲取可能有一定的延遲性,也就是說需要在數據量較大的情況下需要對處理性能和規則實時性作出一定的平衡,另外若任務運行在分布式架構上,同一算子可能在不同的機器或者容器中運行,則可能導致多個節點查詢同一外部數據源的情況。
接下來分析下廣播變量和廣播流的方法,該方法通常定義一個數據源作為規則流,該數據源可以利用flink已有的connector,如kafka或者其他的消息隊列,也可以繼承flink的RichSourceFunction自定義數據源,比如利用Mysql作為數據源并設置線程定期獲取規則。若將變更的規則放入消息隊列作為規則流則可以做到實時更新,若利用外部存儲如Mysql、Redis作為數據源定時刷新,可以準實時的更新規則,無論利用那種方式,都可以只設置并行度為1的規則流獲取算子,并將獲取到的規則廣播到下游所有算子。
因為筆者遇到的情況如下:需要更新規則的flink任務數大概在300個以上,并且還會持續增多,另外用戶對于規則的更新最多可以容忍分鐘級別的延遲,但最好是可以實時更新,對規則更新的實時性要求不是特別高,另外流任務的數據量比較大,單個任務每秒處理數據條數可能達到幾萬或幾十萬。因此使用廣播變量并將Mysql作為數據源的方式獲取規則,這樣可以滿足目前的需求,而且利于后期利用消息隊列作為規則流實時更新規則擴展。
實踐
背景及方案介紹完了,接下來直接上代碼吧,talk is cheap, show me the code.
首先是繼承RichSourceFunction類并自定義規則流,從msyql定時拉取配置。
public class GetJobConfig extends RichSourceFunction<Map<String,JobConfig>> {private volatile boolean isRunning = true;/*** 規則配置id*/private Long jobHistoryId;public GetJobConfig(Long jobHistoryId) {this.jobHistoryId = jobHistoryId;}/*** 解析規則查詢周期為1分鐘*/private Long duration = 1 * 60 * 1000L;@Overridepublic void run(SourceContext<Map<String,JobConfig>> ctx) throws Exception {while (isRunning) {//從Mysql數據庫獲取配置String jobConfigStr = DBService.getInstance().getJobConfig(jobHistoryId);//解析規則與業務邏輯相關請忽略if (!StringUtils.isEmpty(jobConfigStr)) {JobConfig jobConfig = JsonUtil.string2Object(jobConfigStr, JobConfig.class);Map<String,JobConfig> jobConfigMap = new HashMap<>(12);jobConfigMap.put("jobConfig",jobConfig);//輸出規則流ctx.collect(jobConfigMap);}//線程睡眠Thread.sleep(duration);}}@Overridepublic void cancel() {isRunning = false;} }接下來根據規則的數據類型定義MapState,用來描述如何存儲規則流。之后將規則流通過broadcast方法廣播出去。
//定義MapState MapStateDescriptor<String, JobConfig> etlRules = new MapStateDescriptor<String, JobConfig>("etl_rules",//Key類型BasicTypeInfo.STRING_TYPE_INFO,//Value類型TypeInformation.of(new TypeHint<JobConfig>() {})); //將規則流廣播到下游算子 BroadcastStream<Map<String, JobConfig>> etlRuleBroadcastStream = jobConfigStream.broadcast(etlRules);然后在下游算子將規則流和數據流結合起來作檢測或報警匹配,利用connect實現規則流和數據流的結合,并在process方法中定義具體處理邏輯,其中source為從kafka獲取的要處理的數據流。
//規則流和數據流的結合 DataStream<Tuple2<Integer, Map<String, Object>>> outputStream = source.connect(etlRuleBroadcastStream).process(new EtlBroadcastProcessFunction()).setParallelism(etlParallelism).name("etl").uid("etl");最后看一下具體的處理邏輯,通過繼承BroadcastProcessFunction類并重寫processBroadcastElement與processElement方法,其中processBroadcastElement為處理規則流的邏輯,從該方法中可以獲取到上游廣播出的規則數據,processElement用來處理數據流并將獲取到的規則作用在數據流上。
public class EtlBroadcastProcessFunction extends BroadcastProcessFunction<String,Map<String, JobConfig>,Tuple2<Integer, Map<String, Object>>> {//解析規則private JobConfig jobConfig;/*** 處理數據流* @param record* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(String record, ReadOnlyContext ctx, Collector<Tuple2<Integer, Map<String, Object>>> out) throws Exception {//record為要處理的數據,可利用獲取到的規則jobConfig來檢測收到的數據}/*** 獲取規則流并緩存* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processBroadcastElement(Map<String, JobConfig> value, Context ctx, Collector<Tuple2<Integer, Map<String, Object>>> out) throws Exception {//value中為獲取到的規則數據//緩存規則數據this.jobConfig = value.get(“jobConfig“);}}最終提交到yarn上運行,可以看到DAG圖如下所示。可以清楚的看到該任務存在兩個輸入流,其中一個為規則數據源mysql,另外一個為消息隊列kafka。
踩坑
在測試的過程中出現了一點小問題,運行過程中使用的時間類型為EventTime,卻發現加上規則流之后下游的時間窗口始終未觸發,而不加規則流則可以正常觸發窗口計算。最后通過觀察watermark找到了問題所在,發現加了規則流之后下游的watermark都為空,于是不由的想到了flink watermark的原理。
watermark機制若一個算子存在兩個上游,則該算子watermark會選擇較小的一個,若一個上游不存在watermark則該算子會獲取不到。由此問題的原因就很清晰了,是由上游規則流未定義watermark導致的,因此將assignTimestampsAndWatermarks方法后置,放在兩個流結合之后,則下游的算子可以正常得到watermark并觸發窗口計算了。
參考資料
1.
The Broadcast State Pattern?ci.apache.org2.
Generating Timestamps / Watermarks?ci.apache.org3.
Asynchronous I/O for External Data Access?ci.apache.org總結
以上是生活随笔為你收集整理的azkaban获取上游的节点结果_Flink任务实时获取并更新规则的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 捕捉不可控iframe的close事件_
- 下一篇: mysql 交集_MySQL 查询结果取