Apache Flink 进阶入门(二):Time 深度解析
前言
Flink 的 API 大體上可以劃分為三個層次:處于最底層的 ProcessFunction、中間一層的 DataStream API 和最上層的 SQL/Table API,這三層中的每一層都非常依賴于時間屬性。時間屬性是流處理中最重要的一個方面,是流處理系統(tǒng)的基石之一,貫穿這三層 API。在 DataStream API 這一層中因為封裝方面的原因,我們能夠接觸到時間的地方不是很多,所以我們將重點放在底層的 ProcessFunction 和最上層的 SQL/Table API。
Flink 時間語義
在不同的應用場景中時間語義是各不相同的,Flink 作為一個先進的分布式流處理引擎,它本身支持不同的時間語義。其核心是 Processing Time 和 Event Time(Row Time),這兩類時間主要的不同點如下表所示:
Processing Time 是來模擬我們真實世界的時間,其實就算是處理數(shù)據(jù)的節(jié)點本地時間也不一定就是完完全全的我們真實世界的時間,所以說它是用來模擬真實世界的時間。而 Event Time 是數(shù)據(jù)世界的時間,就是我們要處理的數(shù)據(jù)流世界里面的時間。關于他們的獲取方式,Process Time 是通過直接去調(diào)用本地機器的時間,而 Event Time 則是根據(jù)每一條處理記錄所攜帶的時間戳來判定。
這兩種時間在 Flink 內(nèi)部的處理以及還是用戶的實際使用方面,難易程度都是不同的。相對而言的 Processing Time 處理起來更加的簡單,而 Event Time 要更麻煩一些。而在使用 Processing Time 的時候,我們得到的處理結(jié)果(或者說流處理應用的內(nèi)部狀態(tài))是不確定的。而因為在 Flink 內(nèi)部對 Event Time 做了各種保障,使用 Event Time 的情況下,無論重放數(shù)據(jù)多少次,都能得到一個相對確定可重現(xiàn)的結(jié)果。
因此在判斷應該使用 Processing Time 還是 Event Time 的時候,可以遵循一個原則:當你的應用遇到某些問題要從上一個 checkpoint 或者 savepoint 進行重放,是不是希望結(jié)果完全相同。如果希望結(jié)果完全相同,就只能用 Event Time;如果接受結(jié)果不同,則可以用 Processing Time。Processing Time 的一個常見的用途是,我們要根據(jù)現(xiàn)實時間來統(tǒng)計整個系統(tǒng)的吞吐,比如要計算現(xiàn)實時間一個小時處理了多少條數(shù)據(jù),這種情況只能使用 Processing Time。
時間的特性
時間的一個重要特性是:時間只能遞增,不會來回穿越。?在使用時間的時候我們要充分利用這個特性。假設我們有這么一些記錄,然后我們來分別看一下 Processing Time 還有 Event Time 對于時間的處理。
- 對于 Processing Time,因為我們是使用的是本地節(jié)點的時間(假設這個節(jié)點的時鐘同步?jīng)]有問題),我們每一次取到的 Processing Time 肯定都是遞增的,遞增就代表著有序,所以說我們相當于拿到的是一個有序的數(shù)據(jù)流。
- 而在用 Event Time 的時候因為時間是綁定在每一條的記錄上的,由于網(wǎng)絡延遲、程序內(nèi)部邏輯、或者其他一些分布式系統(tǒng)的原因,數(shù)據(jù)的時間可能會存在一定程度的亂序,比如上圖的例子。在 Event Time 場景下,我們把每一個記錄所包含的時間稱作 Record Timestamp。如果 Record Timestamp 所得到的時間序列存在亂序,我們就需要去處理這種情況。
如果單條數(shù)據(jù)之間是亂序,我們就考慮對于整個序列進行更大程度的離散化。簡單地講,就是把數(shù)據(jù)按照一定的條數(shù)組成一些小批次,但這里的小批次并不是攢夠多少條就要去處理,而是為了對他們進行時間上的劃分。經(jīng)過這種更高層次的離散化之后,我們會發(fā)現(xiàn)最右邊方框里的時間就是一定會小于中間方框里的時間,中間框里的時間也一定會小于最左邊方框里的時間。
這個時候我們在整個時間序列里插入一些類似于標志位的一些特殊的處理數(shù)據(jù),這些特殊的處理數(shù)據(jù)叫做 watermark。一個 watermark 本質(zhì)上就代表了這個 watermark 所包含的 timestamp 數(shù)值,表示以后到來的數(shù)據(jù)已經(jīng)再也沒有小于或等于這個時間的了。
Timestamp 和 Watermark 行為概覽
接下來我們重點看一下 Event Time 里的 Record Timestamp(簡寫成 timestamp)和 watermark 的一些基本信息。絕大多數(shù)的分布式流計算引擎對于數(shù)據(jù)都是進行了 DAG 圖的抽象,它有自己的數(shù)據(jù)源,有處理算子,還有一些數(shù)據(jù)匯。數(shù)據(jù)在不同的邏輯算子之間進行流動。watermark 和 timestamp 有自己的生命周期,接下來我會從 watermark 和 timestamp 的產(chǎn)生、他們在不同的節(jié)點之間的傳播、以及在每一個節(jié)點上的處理,這三個方面來展開介紹。
Timestamp 分配和 Watermark 生成
Flink 支持兩種 watermark 生成方式。第一種是在 SourceFunction 中產(chǎn)生,相當于把整個的 timestamp 分配和 watermark 生成的邏輯放在流處理應用的源頭。我們可以在 SourceFunction 里面通過這兩個方法產(chǎn)生 watermark:
- 通過 collectWithTimestamp 方法發(fā)送一條數(shù)據(jù),其中第一個參數(shù)就是我們要發(fā)送的數(shù)據(jù),第二個參數(shù)就是這個數(shù)據(jù)所對應的時間戳;也可以調(diào)用 emitWatermark 方法去產(chǎn)生一條 watermark,表示接下來不會再有時間戳小于等于這個數(shù)值記錄。
- 另外,有時候我們不想在 SourceFunction 里生成 timestamp 或者 watermark,或者說使用的 SourceFunction 本身不支持,我們還可以在使用 DataStream API 的時候指定,調(diào)用的 DataStream.assignTimestampsAndWatermarks 這個方法,能夠接收不同的 timestamp 和 watermark 的生成器。
總體上而言生成器可以分為兩類:第一類是定期生成器;第二類是根據(jù)一些在流處理數(shù)據(jù)流中遇到的一些特殊記錄生成的。
兩者的區(qū)別主要有三個方面,首先定期生成是現(xiàn)實時間驅(qū)動的,這里的“定期生成”主要是指 watermark(因為 timestamp 是每一條數(shù)據(jù)都需要有的),即定期會調(diào)用生成邏輯去產(chǎn)生一個 watermark。而根據(jù)特殊記錄生成是數(shù)據(jù)驅(qū)動的,即是否生成 watermark 不是由現(xiàn)實時間來決定,而是當看到一些特殊的記錄就表示接下來可能不會有符合條件的數(shù)據(jù)再發(fā)過來了,這個時候相當于每一次分配 Timestamp 之后都會調(diào)用用戶實現(xiàn)的 watermark 生成方法,用戶需要在生成方法中去實現(xiàn) watermark 的生成邏輯。
大家要注意的是就是我們在分配 timestamp 和生成 watermark 的過程,雖然在 SourceFunction 和 DataStream 中都可以指定,但是還是建議生成的工作越靠近 DataSource 越好。這樣會方便讓程序邏輯里面更多的 operator 去判斷某些數(shù)據(jù)是否亂序。Flink 內(nèi)部提供了很好的機制去保證這些 timestamp 和 watermark 被正確地傳遞到下游的節(jié)點。
Watermark 傳播
具體的傳播策略基本上遵循這三點。
- 首先,watermark 會以廣播的形式在算子之間進行傳播。比如說上游的算子,它連接了三個下游的任務,它會把自己當前的收到的 watermark 以廣播的形式傳到下游。
- 第二,如果在程序里面收到了一個 Long.MAX_VALUE 這個數(shù)值的 watermark,就表示對應的那一條流的一個部分不會再有數(shù)據(jù)發(fā)過來了,它相當于就是一個終止的一個標志。
- 第三,對于單流而言,這個策略比較好理解,而對于有多個輸入的算子,watermark 的計算就有講究了,一個原則是:單輸入取其大,多輸入取小。
舉個例子,假設這邊藍色的塊代表一個算子的一個任務,然后它有三個輸入,分別是 W1、W2、W3,這三個輸入可以理解成任何的輸入,這三個輸入可能是屬于同一個流,也可能是屬于不同的流。然后在計算 watermark 的時候,對于單個輸入而言是取他們的最大值,因為我們都知道 watermark 應該遵循一個單調(diào)遞增的一個原則。對于多輸入,它要統(tǒng)計整個算子任務的 watermark 時,就會取這三個計算出來的 watermark 的最小值。即一個多個輸入的任務,它的 watermark 受制于最慢的那條輸入流。這一點類似于木桶效應,整個木桶中裝的水會就是受制于最矮的那塊板。
watermark 在傳播的時候有一個特點是,它的傳播是冪等的。多次收到相同的 watermark,甚至收到之前的 watermark 都不會對最后的數(shù)值產(chǎn)生影響,因為對于單個輸入永遠是取最大的,而對于整個任務永遠是取一個最小的。
同時我們可以注意到這種設計其實有一個局限,具體體現(xiàn)在它沒有區(qū)分你這個輸入是一條流多個 partition 還是來自于不同的邏輯上的流的 JOIN。對于同一個流的不同 partition,我們對他做這種強制的時鐘同步是沒有問題的,因為一開始就是把一條流拆散成不同的部分,但每一個部分之間共享相同的時鐘。但是如果算子的任務是在做類似于 JOIN 操作,那么要求你兩個輸入的時鐘強制同步其實沒有什么道理的,因為完全有可能是把一條離現(xiàn)在時間很近的數(shù)據(jù)流和一個離當前時間很遠的數(shù)據(jù)流進行 JOIN,這個時候?qū)τ诳斓哪菞l流,因為它要等慢的那條流,所以說它可能就要在狀態(tài)中去緩存非常多的數(shù)據(jù),這對于整個集群來說是一個很大的性能開銷。
ProcessFunction
在正式介紹 watermark 的處理之前,先簡單介紹 ProcessFunction,因為 watermark 在任務里的處理邏輯分為內(nèi)部邏輯和外部邏輯。外部邏輯其實就是通過 ProcessFunction 來體現(xiàn)的,如果你需要使用 Flink 提供的時間相關的 API 的話就只能寫在 ProcessFunction 里。
ProcessFunction 和時間相關的功能主要有三點:
- 第一點就是根據(jù)你當前系統(tǒng)使用的時間語義不同,你可以去獲取當前你正在處理這條記錄的 Record Timestamp,或者當前的 Processing Time。
- 第二點就是它可以獲取當前算子的時間,可以把它理解成當前的 watermark。
- 第三點就是為了在 ProcessFunction 中去實現(xiàn)一些相對復雜的功能,允許注冊一些 timer(定時器)。比如說在 watermark 達到某一個時間點的時候就觸發(fā)定時器,所有的這些回調(diào)邏輯也都是由用戶來提供,涉及到如下三個方法,registerEventTimeTimer、registerProcessingTimeTimer 和 onTimer。在 onTimer 方法中就需要去實現(xiàn)自己的回調(diào)邏輯,當條件滿足時回調(diào)邏輯就會被觸發(fā)。
一個簡單的應用是,我們在做一些時間相關的處理的時候,可能需要緩存一部分數(shù)據(jù),但這些數(shù)據(jù)不能一直去緩存下去,所以需要有一些過期的機制,我們可以通過 timer 去設定這么一個時間,指定某一些數(shù)據(jù)可能在將來的某一個時間點過期,從而把它從狀態(tài)里刪除掉。所有的這些和時間相關的邏輯在 Flink 內(nèi)部都是由自己的 Time Service(時間服務)完成的。
Watermark處理
一個算子的實例在收到 watermark 的時候,首先要更新當前的算子時間,這樣的話在 ProcessFunction 里方法查詢這個算子時間的時候,就能獲取到最新的時間。第二步它會遍歷計時器隊列,這個計時器隊列就是我們剛剛說到的 timer,你可以同時注冊很多 timer,Flink 會把這些 Timer 按照觸發(fā)時間放到一個優(yōu)先隊列中。第三步 Flink 得到一個時間之后就會遍歷計時器的隊列,然后逐一觸發(fā)用戶的回調(diào)邏輯。 通過這種方式,Flink 的某一個任務就會將當前的 watermark 發(fā)送到下游的其他任務實例上,從而完成整個 watermark 的傳播,從而形成一個閉環(huán)。
Table API 中的時間
下面我們來看一看 Table/SQL API 中的時間。為了讓時間參與到 Table/SQL 這一層的運算中,我們需要提前把時間屬性放到表的 schema 中,這樣的話我們才能夠在 SQL 語句或者 Table 的一些邏輯表達式里面去使用這些時間去完成需求。
Table 中指定時間列
其實之前社區(qū)就怎么在 Table/SQL 中去使用時間這個問題做過一定的討論,是把獲取當前 Processing Time 的方法是作為一個特殊的 UDF,還是把這一個列物化到整個的 schema 里面,最終采用了后者。我們這里就分開來講一講 Processing Time 和 Event Time 在使用的時候怎么在 Table 中指定。
對于 Processing Time,我們知道要得到一個 Table 對象(或者注冊一個 Table)有兩種手段:
(1)可以從一個 DataStream 轉(zhuǎn)化成一個 Table;
(2)直接通過 TableSource 去生成這么一個 Table;
對于第一種方法而言,我們只需要在你已有的這些列中(例子中 f1 和 f2 就是兩個已有的列),在最后用“列名.proctime”這種寫法就可以把最后的這一列注冊為一個 Processing Time,以后在寫查詢的時候就可以去直接使用這一列。如果 Table 是通過 TableSource 生成的,就可以通過實現(xiàn)這一個 DefinedRowtimeAttributes 接口,然后就會自動根據(jù)你提供的邏輯去生成對應的 Processing Time。
相對而言,在使用 Event Time 時則有一個限制,因為 Event Time 不像 Processing Time 那樣是隨拿隨用。如果你要從 DataStream 去轉(zhuǎn)化得到一個 Table,必須要提前保證原始的 DataStream 里面已經(jīng)存在了 Record Timestamp 和 watermark。如果你想通過 TableSource 生成的,也一定要保證你要接入的一個數(shù)據(jù)里面存在一個類型為 long 或者 timestamp 的這么一個時間字段。
具體來說,如果你要從 DataStream 去注冊一個表,和 proctime 類似,你只需要加上“列名.rowtime”就可以。需要注意的是,如果你要用 Processing Time,必須保證你要新加的字段是整個 schema 中的最后一個字段,而 Event Time 的時候你其實可以去替換某一個已有的列,然后 Flink 會自動的把這一列轉(zhuǎn)化成需要的 rowtime 這個類型。 如果是通過 TableSource 生成的,只需要實現(xiàn) DefinedRowtimeAttributes 接口就可以了。需要說明的一點是,在 DataStream API 這一側(cè)其實不支持同時存在多個 Event Time(rowtime),但是在 Table 這一層理論上可以同時存在多個 rowtime。因為 DefinedRowtimeAttributes 接口的返回值是一個對于 rowtime 描述的 List,即其實可以同時存在多個 rowtime 列,在將來可能會進行一些其他的改進,或者基于去做一些相應的優(yōu)化。
時間列和Table操作
指定完了時間列之后,當我們要真正去查詢時就會涉及到一些具體的操作。這里我列舉的這些操作都是和時間列緊密相關,或者說必須在這個時間列上才能進行的。比如說“Over 窗口聚合”和“Group by 窗口聚合”這兩種窗口聚合,在寫 SQL 提供參數(shù)的時候只能允許你在這個時間列上進行這種聚合。第三個就是時間窗口聚合,你在寫條件的時候只支持對應的時間列。最后就是排序,我們知道在一個無盡的數(shù)據(jù)流上對數(shù)據(jù)做排序幾乎是不可能的事情,但因為這個數(shù)據(jù)本身到來的順序已經(jīng)是按照時間屬性來進行排序,所以說我們?nèi)绻獙σ粋€ DataStream 轉(zhuǎn)化成 Table 進行排序的話,你只能是按照時間列進行排序,當然同時你也可以指定一些其他的列,但是時間列這個是必須的,并且必須放在第一位。
為什么說這些操作只能在時間列上進行?因為我們有的時候可以把到來的數(shù)據(jù)流就看成是一張按照時間排列好的一張表,而我們?nèi)魏螌τ诒淼牟僮?#xff0c;其實都是必須在對它進行一次順序掃描的前提下完成的。因為大家都知道數(shù)據(jù)流的特性之一就是一過性,某一條數(shù)據(jù)處理過去之后,將來其實不太好去訪問它。當然因為 Flink 中內(nèi)部提供了一些狀態(tài)機制,我們可以在一定程度上去弱化這個特性,但是最終還是不能超越的限制狀態(tài)不能太大。所有這些操作為什么只能在時間列上進行,因為這個時間列能夠保證我們內(nèi)部產(chǎn)生的狀態(tài)不會無限的增長下去,這是一個最終的前提。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Apache Flink 进阶入门(二):Time 深度解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 神仙在双11晚上,都干了些啥?
- 下一篇: 云原生下日志方案的架构设计