flink自定义trigger详解
適用的場景解釋:
[1]中有句話是這樣的:
"其實,我們要實現基于事件時間的窗口隨意輸出,比如1000個元素觸發一次輸出,那么我們就可以通過修改這個觸發器來實現。"
這句話的意思是,默認的自帶的trigger一般是基于EventTime的。
那么這1000 個元素可能跨度是一小時,也可能跨度是兩小時,對吧
但是顯然默認的Trigger只能是盯著EventTime(時間戳)來決定是否觸發計算,并不能根據元素個數進行觸發。
也就是說,默認的Trigger盯著的跨度是"時間差"。而不是"個數差"
講人話就是:
①例如Flink的Trigger默認每隔一天輸出統計數據,
②但是不支持默認每隔一千個訂單輸出統計數據。
但是注意這里的一千個統計數據可能超過一天,甚至超過一周,耗時可能不固定。
因為你想啊,代碼都是要把邏輯寫死的對吧?
一千個訂單可能一開始耗時一周,后來耗時一個月。那程序要怎么根據變化的時間來鎖定一千個訂單觸發一次?
顯然做不到,這個時候我們就希望鎖定"個數間隔"、“個數差”,這個時候就需要自定義Trigger
?
官方文檔說明:
?
下面是官方文檔[4]中Triggers這一節的內容概括
.
| 需要override的函數 | 函數作用 |
| onElement() | 數據(element)被加入window的時候會調用該函數 |
| onEventTime()? | 當一個注冊的Event-Time定時器觸發 |
| onProcessingTime()? | 當一個注冊的Processing-Time定時器觸發 |
| onMerge() | 與有狀態觸發器(stateful triggers)和當兩個窗口整合的時候整合(merge)狀態相關。 例如使用session windows |
| clear() | window清理數據需要 |
?
前面三個用來設定調用事件(invocation event)以后如何操作,
所以這些"操作"必須是一個TriggerResult
也就是說,前三個函數返回的TriggerResult可以是下面幾種選擇:
| 返回的TriggerResult | 作用 |
| CONTINUE | 什么都不做 |
| FIRE | 觸發計算 |
| PURGE | 刪除窗口中的所有數據 |
| FIRE_AND_PURG | 觸發計算后刪除窗口中所有數據 |
然后是Fire and Purge這一節的內容:
觸發計算時,返回的一定是FIRE或者FIRE_AND_PURG(這個話僅僅是來自官方文檔的翻譯,其實Intellij提示的選項并不僅僅是上面幾個)
?
?
具體示范代碼參考[5]即可
private static Logger LOG = LoggerFactory.getLogger(CountTriggerWithTimeout.class);/*** 窗口最大數據量*/private int maxCount;/*** event time / process time*/private TimeCharacteristic timeType;/*** 用于儲存窗口當前數據量的狀態對象*/private ReducingStateDescriptor<Long> countStateDescriptor =new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE);public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) {this.maxCount = maxCount;this.timeType = timeType;}private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {clear(window, ctx);return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);countState.add(1L);if (countState.get() >= maxCount) {LOG.info("fire with count: " + countState.get());return fireAndPurge(window, ctx);}if (timestamp >= window.getEnd()) {LOG.info("fire with tiem: " + timestamp);return fireAndPurge(window, ctx);} else {return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (timeType != TimeCharacteristic.ProcessingTime) {return TriggerResult.CONTINUE;}if (time >= window.getEnd()) {return TriggerResult.CONTINUE;} else {LOG.info("fire with process tiem: " + time);return fireAndPurge(window, ctx);}}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (timeType != TimeCharacteristic.EventTime) {return TriggerResult.CONTINUE;}if (time >= window.getEnd()) {return TriggerResult.CONTINUE;} else {LOG.info("fire with event tiem: " + time);return fireAndPurge(window, ctx);}}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);countState.clear();}/*** 計數方法*/class Sum implements ReduceFunction<Long> {@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return value1 + value2;}} }?
?
Reference:
[1]flink自定義trigger-實現窗口隨意輸出
[2]Flink 自定義Trigger
[3]Flink 自定義trigger
[4]flink官方文檔-窗口
[5]Flink 自定義觸發器
總結
以上是生活随笔為你收集整理的flink自定义trigger详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 家装全屋wifi方案对比与推荐 全屋wi
- 下一篇: 分析师:2025年Threads能为Me