flink event-time 和连续窗口的使用
文章目錄
- 1. flink 三種time簡介
- 1.1 Processing Time:
- 1.2 Event time:
- 1.3 Ingestion time:
- 2. flink中使用event-time
- 2.1 在stream-source中直接放入event-time
- 2.2 使用Timestamp Assigners / Watermark Generators
- 2.2.1 AssignerWithPeriodicWatermarks
- 2.2.2 AssignerWithPunctuatedWatermarks
- 3. window的兩種使用方式
- 3.1 evn設置TimeCharacteristic
- 3.2 創建window的時候指定window的類型
- 4. 使用kafka record 的timestamp作為event的timestamp
- 5. watermark的注意事項
- 5.1 . watermark要滿足遞增特性
- 5.2. 多個輸入流需要特別注意
- 6. 連續多個window的計算
1. flink 三種time簡介
flink中有三種時間: processing-time, event-time, ingestion-time
1.1 Processing Time:
Processing Time是指執行程序時對應的物理機系統時間。
當一個流程序通過處理時間來運行時,所有基于時間的操作(如: 時間窗口)將使用各自操作所在的物理機的系統時間。例如:一個每小時處理的時間窗口將包括所有在系統指定的一個小時內到達指定操作的所有記錄。
處理時間是最簡單的時間概念,不需要流和物理機之間的協調,它有最好的性能和最低的延遲。然而,在分布式或者異步環境中,因為受到記錄到達系統時間的影響,處理時間不能夠決定系統內操作之間記錄流的速度。
對于簡單的總量統計模型,則可以采用processing-time 比如統計用戶的發帖總量,實際上對數據是否有序并不敏感。
1.2 Event time:
Event Time是每個獨立事件在產生它的設備上發生的時間,這個時間通常在事件進入Flink之前就已經嵌入到事件中了,并且事件的timestamp是可以從每一個record中抽取出來的。
事件時間可以通過備份或者持久化日志獲取無序數據、延遲事件或者重試數據的正確結果。在事件時間中,時間進度依賴于數據而不是其他形式的時鐘。事件時間程序必須要指定如何產生事件時間水印(Event Time Watermarks),這是事件時間處理進度的信號機制,這個機制在下面描述。對于很多有明確時間以來的數據比較有用,比如統計用戶過去5分鐘內的發帖量,則是有必要使用event-time的,否則如果處理已經堆積的數據,使用proccessing-time則明顯會填進去很多不是對應時間段的數據。
使用event-time,需要我們做兩個工作,
1.為每個record提取他的timestamp,每個事件都要有一個timestamp
2.產生watermark,這個watermark是在整個流當中都會起作用的
1.3 Ingestion time:
攝入時間(Ingestion Time)是事件進入Flink的時間,在源操作中每個記錄都會獲得源的當前時間作為時間戳,后續基于時間的操作(如: time window)會依賴這個時間戳
攝入時間從概念上來講是在event-time和processing-time之間,與處理時間相比,成本可能會高一點,但是會提供更加可預測的結果。因為攝入時間使用的是固定的時間戳(都是在源處指定的),記錄中的不同窗口操作依賴同一個時間戳,而在處理時間中每個窗口操作可能將記錄賦給不同的窗口(根據本地的系統時鐘和傳輸時延)。
與事件時間相比,攝入時間程序不能處理任何無序事件或者延遲事件,但是程序無需指定如何產生水印。
2. flink中使用event-time
flink提供了兩種抽取event-time的方式
2.1 在stream-source中直接放入event-time
DataStreamSource<MyEvent> dataStreamSource = env.addSource(new SourceFunction<MyEvent>() {@Override public void run(SourceContext<MyType> ctx) throws Exception {while (/* condition */) {MyType next = getNext();ctx.collectWithTimestamp(next, next.getEventTimestamp());if (next.hasWatermarkTime()) {ctx.emitWatermark(new Watermark(next.getWatermarkTime()));}} }}這種需要自己對data-source進行封裝,管理起來可能比較麻煩。
2.2 使用Timestamp Assigners / Watermark Generators
這種方式也有兩種,
2.2.1 AssignerWithPeriodicWatermarks
實現這個類需要實現兩個方法
/*** This generator generates watermarks assuming that elements arrive out of order,* but only to a certain degree. The latest elements for a certain timestamp t will arrive* at most n milliseconds after the earliest elements for timestamp t.*/ //這個產生water-mark 對應的情況是相同event-time產生的多個event, //最晚的到達的event只會比最早到達的晚幾毫秒的情況 public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {private final long maxOutOfOrderness = 3500; // 3.5 secondsprivate long currentMaxTimestamp;// 實現了給每個event抽取timestamp@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {long timestamp = element.getCreationTime();currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);return timestamp;}// 實現了獲取watermark@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current highest timestamp minus the out-of-orderness boundreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);} }/*** This generator generates watermarks that are lagging behind processing time by a fixed amount.* It assumes that elements arrive in Flink after a bounded delay.*/ // 這個water-mark允許元素對應當前系統時間有一定的延遲, //感覺不好用,這種是用processing-time來度量延遲的 public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {private final long maxTimeLag = 5000; // 5 seconds@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getCreationTime();}@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current time minus the maximum time lagreturn new Watermark(System.currentTimeMillis() - maxTimeLag);} }在這里 對于每個kafka元素都會調用 extractTimestamp 方法來產生 timestamp
然后再固定的時間片段會調用 getCurrentWatermark ,就像定時任務一樣
具體的調用周期可以使用
進行設置,一般都是幾十毫秒就行了。
2.2.2 AssignerWithPunctuatedWatermarks
這個是針對特定的元素觸發water-mark
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getCreationTime();}@Overridepublic Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;} }3. window的兩種使用方式
這個實際上說的是具體的api的使用
使用這兩個的前提是先要設置env的時間線類型
3.1 evn設置TimeCharacteristic
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<MyEvent> stream = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<MyEvent> withTimestampsAndWatermarks = stream.filter( event -> event.severity() == WARNING ).assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());withTimestampsAndWatermarks.keyBy( (event) -> event.getGroup() ).timeWindow(Time.seconds(10)).reduce( (a, b) -> a.add(b) ).addSink(...);3.2 創建window的時候指定window的類型
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> input =env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<Integer> resultsPerKey = input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new Summer());4. 使用kafka record 的timestamp作為event的timestamp
flink 對kafka consumer做了一些特殊的處理
當你在代碼中進行了如下設置
對于每個kafka元素,在獲取的時候會將kafka record的timestamp作為當前元素的timestamp
所以,如果你選擇使用kafka record的timestamp作為 event-time的話,在實現AssignerWithPeriodicWatermarks 只需要重點關注產生watermark的方法就行了。
下面這個是一個使用kafka record的timestamp作為 event-time的AssignerWithPeriodicWatermarks樣例
5. watermark的注意事項
5.1 . watermark要滿足遞增特性
對于任何一個AssignerWithPeriodicWatermarks ,extractTimestamp產生的是每一個元素具體的timestamp,但是getCurrentWatermark 產生的一系列watermark必須滿足遞增特性,因為window是根據watermark創建和觸發計算以及銷毀的。如果watermark不能滿足遞增特性的話,同一個時間段的window可能會被反復創建,導致數據統計失真。所以在實現AssignerWithPeriodicWatermarks 的extractTimestamp 的時候一定需要注意,要滿足watermark的遞增特性。
下面的代碼保證了watermark是遞增的(嚴格的說是非遞減的)
5.2. 多個輸入流需要特別注意
并行度的引入可能導致可能有些窗口無法被觸發,需要注意,在union的時候,會引入多個并行度,然后window會取每個并行度的最小值來作為窗口的最小warter-mark,這樣有可能會導致water-mark一直沒有辦法觸發。
比如,統計用戶最近30天發帖量,如果使用了兩個kafka-topic,一個是init-topic(存放存量數據),一個是binlog-topic(存放增量數據),使用event-time作為時間線的話,有可能導致window無法觸發,因為init-topic對應的數據是存量,沒有增量數據,但是window中的watermark是取兩個流當中的最小值作為他的watermark,這樣的話會導致window無法觸發計算。
6. 連續多個window的計算
watermark會在產生的地方持續往下游流過去,下游的多個window都會接收到這些watermark,會按照規則觸發計算,當一個大于等于(end-timestamp - 1)的watermark到來的時候,會觸發所有的endTime 小于等于end-timestamp的窗口
可以進行兩個連續的窗口計算,從第一個窗口出來的元素的timestamp都是這個window的 endTime-1.
比如下面的樣例
上面的方式可以在第一個window window01中每隔6s計算一下過去60s的各個key的統計數據,然后再后面第二個window window02會拿到window01中每隔6s輸出的數據,做一次計算,求出topk。
總結
以上是生活随笔為你收集整理的flink event-time 和连续窗口的使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 01.java内存模型
- 下一篇: 01. elastcsearch-mon