Flink的ProcessFunction API
1 ProcessFunction
?? ProcessFunction是一個低階的流處理操作,可以訪問事件(event)(流元素),狀態(state)(容錯性,一致性,僅在keyed stream中),定時器(timers)(event time和processing time, 僅在keyed stream中)。也就是說可以訪問普通的轉換算子無法訪問事件的時間戳信息和Watermark的。
?? ProcessFunction可以看作是一個具有keyed state 鍵控狀態和 timers定時器訪問權的FlatMapFunction,通過對輸入流中接收的每個事件調用來處理事件。①通過RuntimeContext訪問keyed state②計時器允許應用程序對處理時間和事件時間中的更改作出響應。對processElement(…)函數的每次調用都獲得一個Context對象,該對象可以訪問元素的event time timestamp和TimerService;③TimerService可用于為將來的event/process time瞬間注冊回調。當到達計時器的特定時間時,將調用onTimer(…)方法。在該調用期間,所有狀態都再次限定在創建計時器時使用的鍵的范圍內,從而允許計時器操作鍵控狀態。總之ProcessFunction可以訪問時間戳、watermark以及注冊定時事件,輸出特定的一些事件等。Flink SQL就是使用Process Function實現的。
?? 如果要訪問鍵控狀態和計時器,則必須應用在keyedStream上
stream.keyBy(...).process(new MyProcessFunction())?? Flink提供了8個Process Function:ProcessFunction,KeyedProcessFunction,CoProcessFunction,ProcessJoinFunction,BroadcastProcessFunction,KeyedBroadcastProcessFunction,ProcessWindowFunction,ProcessAllWindowFunction。
?? 所有的Process Function都繼承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法,還額外提供了兩個方法processElement和onTimer
?? processElement:每來一個元素都會調用這個方法,調用結果將會放在Collector數據類型中輸出。獲得的Context可以訪問元素的時間戳,元素的key,以及TimerService時間服務。Context還可以將結果輸出到別的流(side outputs)。
?? onTimer:是一個回調函數,當之前注冊的定時器到達觸發時間調用。參數timestamp為定時器所設定的觸發的時間戳。Collector為輸出結果的集合。OnTimerContext和processElement的Context參數一樣,提供了上下文的一些信息,例如定時器觸發的時間信息(事件時間或者處理時間)。
2 低階join
?? 要實現對兩個輸入的低級操作,應用程序可以使用CoProcessFunction或KeyedCoProcessFunction。
?? CoProcessFunction實現對兩個輸入的低階操作,它綁定到兩個不同的輸入流,分別調用processElement1(…)和processElement2(…)對兩個輸入流的數據進行處理
?? 實現低階join通常遵循以下模式:①為一個(或兩個)輸入創建一個狀態對象②當從輸入源收到元素時,更新狀態③從另一個輸入接收元素后,檢索狀態并生成連接的結果
3 KeyedProcessFunction
?? KeyedProcessFunction作為ProcessFunction的擴展,在其onTimer(…)方法中提供對定時器對應key的訪問。
?? KeyedProcessFunction用來操作KeyedStream。KeyedProcessFunction會處理流的每一個元素,輸出為0個、1個或者多個元素。
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {var key = ctx.getCurrentKey// ... }4 Timers
?? processing-time/event-time timer都由TimerService在內部維護并排隊等待執行,僅在keyed stream中有效。
?? 由于Flink對(每個key+timestamp)只維護一個計時器。如果為相同的timestamp注冊了多個timer ,則只調用onTimer()方法一次。
?? Flink保證同步調用onTimer()和processElement() 。因此用戶不必擔心狀態的并發修改。
?? 容錯:Timer具有容錯和checkpoint能力(基于flink app的狀態)。從故障恢復或從savepoint啟動應用程序時,Timer將被恢復。大量計時器會增加檢查點時間,因為計時器是檢查點狀態的一部分。
?? 定時器合并:由于Flink對每個鍵和時間戳只維護一個計時器,因此可以通過降低計時器頻率來合并計時器,從而減少計時器的數量。 event-time timer只會在watermarks到來時觸發。
//對于1秒的定時器分辨率(事件或處理時間),可以將目標時間舍入整秒。計時器的發射時間最多提前1秒,但不遲于要求的毫秒精度。因此,每鍵最多有一個定時器和第二個定時器。 val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000 ctx.timerService.registerProcessingTimeTimer(coalescedTime)//事件時間計時器只在水印進入的情況下觸發,您還可以使用當前Watermark調度這些計時器并將其與下一個Watermark合并: val coalescedTime = ctx.timerService.currentWatermark + 1 ctx.timerService.registerEventTimeTimer(coalescedTime)//停止處理時間計時器: val timestampOfTimerToStop = ... ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)//停止事件時間計時器: val timestampOfTimerToStop = ... ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)5 官方案例
?? KeyedProcessFunction維護每個鍵的計數,并在沒有對該鍵進行更新的情況下,在一分鐘內(在事件發生時)發出一個鍵/計數對:
- 計數、鍵和最后修改時間戳存儲在ValueState,這是由Key隱式限定范圍的。
- 對于每個記錄,KeyedProcessFunction增加計數器并設置最后修改的時間戳。
- 該函數還會在以后的一分鐘內安排一個回調(在事件發生時)。
- 在每次回調時,它會檢查回調的事件時間戳和存儲計數的最后修改時間,如果它們匹配,則發出鍵/計數(也就是說,在這一分鐘內沒有發生進一步的更新)。
總結
以上是生活随笔為你收集整理的Flink的ProcessFunction API的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据分析方法-聚类算法
- 下一篇: dll侧加载_WORD打开时出现加载DL