事件时间/处理时间/进入时间(Event Time / Processing Time / Ingestion Time)
Flink在流處理程序中支持不同的時間概念。
-
處理時間(processing time):處理時間是指執(zhí)行相應操作的機器的系統(tǒng)時間。
當流處理程序基于處理時間運行時,所有基于時間的操作(如時間窗口)將使用運行相應運算符的機器的系統(tǒng)時鐘。 每小時處理時間窗口將包括在系統(tǒng)時鐘指示整個小時之間到達特定運算符的所有記錄。 例如,如果應用程序在上午9:15開始運行,則第一個每小時處理時間窗口將包括在上午9:15到10:00之間處理的事件,下一個窗口將包括在上午10:00到11:00之間處理的事件,以此類推。
處理時間是最簡單的時間概念,不需要流和機器之間的協(xié)調(diào)。 它提供最佳性能和最低延遲。 但是,在分布式和異步環(huán)境中,處理時間不提供確定性,因為它容易受到記錄到達系統(tǒng)的速度(例如從消息隊列),記錄在系統(tǒng)內(nèi)的運算符之間流動的速度的影響,以及停電(計劃或其他)。 -
事件時間(event time):事件時間是每個事件在其生產(chǎn)設備上發(fā)生的時間。此時間通常在進入Flink之前嵌入記錄中,并且可以從每個記錄中提取該事件時間戳。 在事件時間,時間的進展取決于數(shù)據(jù),而不是任何時鐘。 事件時間程序必須指定如何生成事件時間水印,這是表示事件時間進度的機制。 該水印機制在下面的后面部分中描述。
在一個完美的世界中,事件時間處理將產(chǎn)生完全一致和確定的結(jié)果,無論事件何時到達或其它們的順序。 但是,除非事件已知按順序到達(按時間戳),否則事件時間處理會在等待無序事件時產(chǎn)生一些延遲。 由于只能等待一段有限的時間,因此限制了確定性事件時間應用程序的運行方式。
假設所有數(shù)據(jù)都已到達,事件時間操作將按預期運行,即使在處理無序或延遲事件或重新處理歷史數(shù)據(jù)時也會產(chǎn)生正確且一致的結(jié)果。 例如,每小時事件時間窗口將包含帶有落入該小時的事件時間戳的所有記錄,無論它們到達的順序如何,或者何時處理它們。 (有關(guān)更多信息,請參閱有關(guān)遲到事件的部分。)
請注意,有時基于事件時間的程序處理實時數(shù)據(jù)時,它們將使用一些處理時間(processing time)操作,以保證它們及時進行。 -
進入時間(Ingestion time): 進入時間是事件進入Flink的時間。 在源運算符處,每個記錄將源的當前時間作為時間戳,并且基于時間的操作(如時間窗口)引用該時間戳。
進入時間在概念上位于事件時間和處理時間之間。與處理時間相比,它代價稍高,但可以提供更可預測的結(jié)果。 因為進入時間使用穩(wěn)定的時間戳(在源處分配一次),所以對記錄的不同窗口操作將引用相同的時間戳,而在處理時間中,每個窗口操作符可以將記錄分配給不同的窗口(基于本地系統(tǒng)時鐘和 任何傳輸延誤)。
與事件時間相比,進入時間程序無法處理任何無序事件或延遲數(shù)據(jù),但程序不必指定如何生成水印。
在內(nèi)部,攝取時間與事件時間非常相似,但具有自動分配時間戳和自動生成水印功能。
設置時間特征(Setting a Time Characteristic)
Flink DataStream程序的第一部分通常設置基本時間特性。 該設置定義了數(shù)據(jù)流源的行為方式(例如,它們是否將分配時間戳),以及像KeyedStream.timeWindow(Time.seconds(30))這樣的窗口操作應該使用什么時間概念。
以下示例顯示了一個Flink程序,該程序在每小時時間窗口中聚合事件。 窗戶的行為適應時間特征。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// alternatively: // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));stream.keyBy( (event) -> event.getUser() ).timeWindow(Time.hours(1)).reduce( (a, b) -> a.add(b) ).addSink(...);請注意,為了基于事件時間運行此示例,程序需要使用直接定義數(shù)據(jù)事件時間的源并自己輸出水印,或者程序必須在源之后注入時間戳分配器和水印生成器。 這些函數(shù)描述了如何訪問事件時間戳,以及事件流表現(xiàn)出的無序程度。
以下部分描述了時間戳和水印背后的一般機制。 有關(guān)如何在Flink DataStream API中使用時間戳分配和水印生成的指南,請參閱Generating Timestamps / Watermarks。
事件時間和水印(Event Time and Watermarks)
注意:Flink實現(xiàn)了數(shù)據(jù)流模型中的許多技術(shù)。 有關(guān)活動時間和水印的詳細介紹,請查看以下文章。
- Streaming 101 by Tyler Akidau
- The Dataflow Model paper
支持事件時間的流處理器需要一種方法來衡量事件時間的進度。 例如,當事件時間超過一小時結(jié)束時,需要通知構(gòu)建每小時窗口的窗口運算符,以便運算符可以關(guān)閉正在進行的窗口。
事件時間可以獨立于處理時間(由時鐘測量)進行。 例如,在一個程序中,運算符的當前事件時間可能略微落后于處理時間(考慮到接收事件的延遲),而兩者都以相同的速度進行。 另一方面,通過快速轉(zhuǎn)發(fā)已經(jīng)在Kafka主題(或其它消息隊列)中緩沖的一些歷史數(shù)據(jù),另一個流程序只需幾秒鐘處理幾周的事件時間。
Flink中用于衡量事件時間進度的機制是水印。 水印作為數(shù)據(jù)流的一部分流動并帶有時間戳t。 Watermark(t)聲明事件時間已到達該流中的時間t,這意味著不應該有來自流的具有時間戳t’<= t的元素(即,具有更早或等于水印的時間戳的事件)。
下圖顯示了帶有(邏輯)時間戳的事件流,以及內(nèi)聯(lián)流動的水印。 在該示例中,事件按順序(相對于它們的時間戳),意味著水印是流中的周期性標記。
水印對于無序流是至關(guān)重要的,如下所示,其中事件不按時間戳排序。 通常,水印是一種聲明,通過流中的那一點,到達某個時間戳的所有事件都應該到達。 一旦水印到達運算符,運算符就可以將其內(nèi)部事件時鐘提前到水印的值。
請注意,事件時間由一個新生成的流元素(或多個元素)繼承,這些元素來自生成它們的事件或觸發(fā)創(chuàng)建這些元素的水印。
并行流中的水印(Watermarks in Parallel Streams)
在源函數(shù)處或之后生成水印。 源函數(shù)的每個并行子任務通常獨立地生成其水印。 這些水印定義了該特定并行源的事件時間。
當水印流過流媒處理程序時,它們會在他們到達的運算符處提前事件時間。 每當運算符提前其事件時間時,它就為其后繼運算符生成下游的新水印。
一些運算符消費多個輸入流; 例如union,或者跟隨keyBy(…)或partition(…)函數(shù)的運算符。 這樣的運算符的當前事件時間是其輸入流的事件時間的最小值。 由于其輸入流更新其事件時間,運算符也是如此。
下圖顯示了流經(jīng)并行流的事件和水印的示例,以及跟蹤事件時間的運算符。
請注意,Kafka源支持每分區(qū)水印,您可以在此處詳細了解。
晚到元素(Late Elements)
某些元素可能違反水印條件,這意味著即使在Watermark(t)發(fā)生之后,也會出現(xiàn)更多具有時間戳t’<= t的元素。 實際上,在許多現(xiàn)實世界設置中,某些元素可以被任意延遲,從而無法指定某個事件時間戳的所有元素將發(fā)生的時間。 此外,即使遲到可以被限制,通常也不希望延遲太多水印,因為它在事件時間窗口的計算中引起太大延遲。
出于這個原因,流程序可能明確地預料一些晚到元素。 晚到元素是在系統(tǒng)的事件時鐘(水印產(chǎn)生)之后到達的元素,事件時鐘已經(jīng)超過了晚到元素的時間戳。 有關(guān)如何在事件時間窗口中使用延遲元素的更多信息,請參閱Allowed Lateness。
空閑源(Idling sources)
目前,對于純事件時間水印生成器,如果沒有要處理的元素,則水印不能產(chǎn)出。 這意味著在輸入數(shù)據(jù)存在間隙的情況下,事件時間將不會進行,例如窗口操作符將不會被觸發(fā),因此現(xiàn)有窗口將不能產(chǎn)生任何輸出數(shù)據(jù)。
為了避免這種情況,可以使用定期水印分配器,它們不僅基于元素時間戳進行分配。 示例解決方案可以是在觀察不到新事件一段時間之后切換到使用當前處理時間作為時間基礎(chǔ)的分配器。
可以使用SourceFunction.SourceContext #markAsTemporarilyIdle將源標記為空閑。 有關(guān)詳細信息,請參閱此方法的Javadoc以及StreamStatus。
調(diào)試水印(Debugging Watermarks)
有關(guān)在運行時調(diào)試水印的信息,請參閱Debugging Windows & Event Time。
運算符如何處理水印(How operators are processing watermarks)
作為一般規(guī)則,運算符需要在向下游轉(zhuǎn)發(fā)之前完全處理給定的水印。 例如,WindowOperator將首先評估應該觸發(fā)哪些窗口,并且只有在產(chǎn)生由水印觸發(fā)的所有輸出之后,水印本身才會被發(fā)送到下游。 換句話說,由于出現(xiàn)水印而產(chǎn)生的所有元素將在水印之前發(fā)出。
同樣的規(guī)則適用于TwoInputStreamOperator。 但是,在這種情況下,運算符的當前水印被定義為其兩個輸入的最小值。
此行為的詳細信息由OneInputStreamOperator#processWatermark,TwoInputStreamOperator#processWatermark1和TwoInputStreamOperator#processWatermark2方法的實現(xiàn)定義。
總結(jié)
以上是生活随笔為你收集整理的事件时间/处理时间/进入时间(Event Time / Processing Time / Ingestion Time)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 宝石花是国企么
- 下一篇: Ambari2.7.4+HDP3.1.4