Flink触发器Triggers
觸發器(Triggers)
觸發器確定窗口(由窗口分配器形成)何時準備好由窗口功能處理。每個WindowAssigner都帶有一個默認觸發器。如果默認觸發器不適合您的需求,則可以使用trigger(...)指定自定義觸發器。
trigger觸發器接口有五個方法允許trigger對不同的事件做出反應:
-
onElement()進入窗口的每個元素都會調用該方法。
-
onEventTime()事件時間timer觸發的時候被調用。
-
onProcessingTime()處理時間timer觸發的時候會被調用。
-
onMerge()有狀態的觸發器相關,并在它們相應的窗口合并時合并兩個觸發器的狀態,例如使用會話窗口。
-
clear()該方法主要是執行窗口的刪除操作。
關于上述方法需要注意兩點:
1).前三方法決定著如何通過返回一個TriggerResult來操作輸入事件。
CONTINUE:什么都不做。
FIRE:觸發計算。
PURE:清除窗口的元素。
FIRE_AND_PURE:觸發計算和清除窗口元素。
2). 這些方法中的任何一個都可用于為將來的操作注冊處理或事件時間計時器
Fire和Purge
一旦觸發器確定窗口已準備好進行處理,它將觸發,即返回FIRE或FIRE_AND_PURGE。這是窗口操作員發出當前窗口結果的信號。給定一個帶有ProcessWindowFunction的窗口,所有元素都將傳遞給ProcessWindowFunction(可能在將它們傳遞給逐出者之后)。具有ReduceFunction,AggregateFunction或FoldFunction的Windows只會發出其急切的聚合結果。
當觸發器觸發時,它可以是FIRE或FIRE_AND_PURGE。在FIRE保留窗口內容的同時,FIRE_AND_PURGE刪除其內容。默認情況下,預實現的觸發器僅觸發FIRE,而不會清除窗口狀態。
注意??: 清除將僅刪除窗口的內容,并將保留有關該窗口的任何潛在元信息以及任何觸發狀態。默認觸發器
WindowAssigner的默認觸發器適用于許多用例。例如,所有事件時間窗口分配器都有一個EventTimeTrigger作為默認觸發器。一旦WaterMark通過窗口的末端,該觸發器便會觸發。
注意??: GlobalWindow的默認觸發器是NeverTrigger,它從不觸發。因此,在使用GlobalWindow時,您始終必須定義一個自定義觸發器。 通過使用trigger()指定觸發器,您將覆蓋WindowAssigner的默認觸發器。例如,如果為TumblingEventTimeWindows指定CountTrigger, 則將不再基于時間進度而是僅通過計數來獲取窗口觸發。現在,如果要基于時間和計數做出反應,則必須編寫自己的自定義觸發器。內置和自定義觸發器
Flink帶有一些內置觸發器。
-
EventTimeTrigger基于事件時間和watermark機制來對窗口進行觸發計算。
-
ProcessingTimeTrigger基于處理時間觸發。
-
CountTrigger窗口元素數超過預先給定的限制值的話會觸發計算。
-
PurgingTrigger作為其它trigger的參數,將其轉化為一個purging觸發器。
如果需要實現自定義觸發器,則應該實現Trigger類。請注意,API仍在不斷發展,并可能在Flink的未來版本中更改。
import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** <p/>* <li>title: DataStream 觸發器</li>* <li>@author: li.pan</li>* <li>Date: 2019/12/29 5:00 下午</li>* <li>Version: V1.0</li>* <li>Description: 自定義元素個數觸發器</li>*/ public class CustomProcessingTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private CustomProcessingTimeTrigger() {}private static int flag = 0;@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {ctx.registerProcessingTimeTimer(window.maxTimestamp());// CONTINUE是代表不做輸出,也即是,此時我們想要實現比如100條輸出一次,// 而不是窗口結束再輸出就可以在這里實現。if(flag > 9){flag = 0;return TriggerResult.FIRE;}else{flag++;}System.out.println("onElement : "+element);return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.FIRE_AND_PURGE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteProcessingTimeTimer(window.maxTimestamp());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window, OnMergeContext ctx) {// only register a timer if the time is not yet past the end of the merged window// this is in line with the logic in onElement(). If the time is past the end of// the window onElement() will fire and setting a timer here would fire the window twice.long windowMaxTimestamp = window.maxTimestamp();if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {ctx.registerProcessingTimeTimer(windowMaxTimestamp);}}@Overridepublic String toString() {return "ProcessingTimeTrigger()";}/*** 創建一個自定義觸發器對象*/public static CustomProcessingTimeTrigger create() {return new CustomProcessingTimeTrigger();}}?
?
總結
以上是生活随笔為你收集整理的Flink触发器Triggers的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql 表中触发器 TRIGGERS
- 下一篇: WPF-Interaction.Trig