flink如何设置以每天零点到第二天零点为区间的window进行计算
生活随笔
收集整理的這篇文章主要介紹了
flink如何设置以每天零点到第二天零点为区间的window进行计算
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
環境
flink1.11.2
JAVA
stream API
timewindow
背景
公司之前的指標是以分鐘為單位的滾動窗口進行檢查,然后在查詢系統里查詢的時候,對該天所有的分鐘數據進行聚合統計。
?當前需要在flink中添加以天為單位的Job進行額外指標檢查。指標出來之后和發現數據口徑不一致,flink中默認是timeWindow按天進行滾動統計的數據是每天八點到第二天八點的數據。
導致統計指標的含義對不上,沒有參考意義和進行不同數據間的join。
解決方案
使用window配置自定義的窗口分隔TumblingEventTimeWindows對象(因為現在處理數據基本都使用的flink? eventTime作為數據時間進行處理,所以例子中需要數據流的時間用的是eventtime, 使用processtime的話可以使用TumblingProcessTimeWindows處理,講道理應該配置都一樣)
話不多說直接上代碼吧。
默認情況8點->8點的時間統計的代碼:
// 原始數據流 DataStream<RawDataEvent> gyhUserRegisterStream = StreamTransformCommon.preprocessingLogData(rawDataStreamMap.get("gyhUserRegister"));// 進行數據清洗統計的邏輯 DataStream<Object> targetData = rawWebLogData.filter(x -> x.userId != null).map(StreamTransformCommon::renameAppInfoName).filter(x -> x != null).keyBy("page").timeWindow(Time.days(1)) // 默認情況下 以天為單位的滾動窗口.aggregate(new StreamTransformCommon.CountAgg(), new StreamTransformCommon.WindowResultFuction());每天0點->0點的時間窗口統計代碼(實際上可以舉一反三搞出任意想要的時間的規則):
// 原始數據流 DataStream<RawDataEvent> gyhUserRegisterStream = StreamTransformCommon.preprocessingLogData(rawDataStreamMap.get("gyhUserRegister"));// 進行數據清洗統計的邏輯 DataStream<Object> targetData = rawWebLogData.filter(x -> x.userId != null).map(StreamTransformCommon::renameAppInfoName).filter(x -> x != null).keyBy("page").window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16))) // 改改參數,就可以調整到自己想要的時間窗口統計規則.aggregate(new StreamTransformCommon.CountAgg(), new StreamTransformCommon.WindowResultFuction());結果
大家可以在操作windowFunction的時候打印一下apply方法參數中的TimeWindow對象的起止時間驗證一下。我這邊屢試不爽,問題解決了記錄一下這個過程。
總結
以上是生活随笔為你收集整理的flink如何设置以每天零点到第二天零点为区间的window进行计算的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 最常用、最好用的vue服务端渲染框架
- 下一篇: pthread_cancel 退出线程引