flink 三种时间机制_Flink的时间与watermarks详解
時間語義
基本概念
時間是Flink等流處理中最重要的概念之一,在 Flink 中 Time 可以分為三種:Event-Time,Processing-Time 以及 Ingestion-Time,如下圖所示:
Event Time
事件時間,事件(Event)本身的時間,即數(shù)據(jù)流中事件實際發(fā)生的時間,通常使用事件發(fā)生時的時間戳來描述,這些事件的時間戳通常在進入流處理應用之前就已經(jīng)存在了,事件時間反映了事件真實的發(fā)生時間。所以,基于事件時間的計算操作,其結(jié)果是具有確定性的,無論數(shù)據(jù)流的處理速度如何、事件到達算子的順序是否會亂,最終生成的結(jié)果都是一樣的。
Ingestion Time
攝入時間,事件進入Flink的時間,即將每一個事件在數(shù)據(jù)源算子的處理時間作為事件時間的時間戳,并自動生成水位線(watermarks,關于watermarks下文會詳細分析)。
Ingestion Time從概念上講介于Event Time和Processing Time之間。與Processing Time相比 ,它的性能消耗更多一些,但結(jié)果卻更可預測。由于 Ingestion Time使用穩(wěn)定的時間戳(在數(shù)據(jù)源處分配了一次),因此對記錄的不同窗口操作將引用相同的時間戳,而在Processing Time中每個窗口算子都可以將記錄分配給不同的窗口。
與Event Time相比,Ingestion Time無法處理任何亂序事件或遲到的數(shù)據(jù),即無法提供確定的結(jié)果,但是程序不必指定如何生成水位線。在內(nèi)部,Ingestion Time與Event Time非常相似,但是可以實現(xiàn)自動分配時間戳和自動生成水位線的功能。
Processing Time
處理時間,根據(jù)處理機器的系統(tǒng)時鐘決定數(shù)據(jù)流當前的時間,即事件被處理時當前系統(tǒng)的時間。還以窗口算子為例(關于window,下文會詳細分析),基于處理時間的窗口操作是以機器時間來進行觸發(fā)的,由于數(shù)據(jù)到達窗口的速率不同,所以窗口算子中使用處理時間會導致不確定的結(jié)果。在使用處理時間時,無需等待水位線的到來后進行觸發(fā)窗口,所以可以提供較低的延遲。
對比
經(jīng)過上面的分析,應該對Flink的時間語義有了大致的了解。不知道你會不會有這樣一個疑問:既然事件時間已經(jīng)能夠解決所有的問題了,那為何還要用處理時間呢?其實處理時間有其特定的使用場景,處理時間由于不用考慮事件的延遲與亂序,所以其處理數(shù)據(jù)的延遲較低。因此如果一些應用比較重視處理速度而非準確性,那么就可以使用處理時間,比如要實時監(jiān)控儀表盤。總之,雖然處理時間的延遲較低,但是其結(jié)果具有不確定性,事件時間雖然有延遲,但是能夠保證處理的結(jié)果具有準確性,并且可以處理延遲甚至無序的數(shù)據(jù)。
使用
上一小結(jié)講述了三種時間語義的基本概念,接下來將從代碼層面講解在程序中該如何配置這三種時間語義。首先來看一段代碼:
/**?The?time?characteristic?that?is?used?if?none?other?is?set.?*/????private?static?final?TimeCharacteristic?DEFAULT_TIME_CHARACTERISTIC?=?TimeCharacteristic.ProcessingTime;
//省略的代碼
/**?The?time?characteristic?used?by?the?data?streams.?*/
????private?TimeCharacteristic?timeCharacteristic?=?DEFAULT_TIME_CHARACTERISTIC;
上述兩行代碼摘自StreamExecutionEnvironment類,可以看出,Flink在流處理程序中默認的時間語義是Processing Time,那么該如何修改默認的時間語義呢?很簡單,再來看一段代碼,下面的代碼片段同樣來自于StreamExecutionEnvironment類:
????/**?????*?如果使用Processing?Time或者Event?Time,默認的水位線間隔時間是200毫秒
?????*?可以通過ExecutionConfig#setAutoWatermarkInterval(long)設置
?????*?@param?characteristic?The?time?characteristic.
?????*/
????@PublicEvolving
????public?void?setStreamTimeCharacteristic(TimeCharacteristic?characteristic)?{
????????this.timeCharacteristic?=?Preconditions.checkNotNull(characteristic);
????????if?(characteristic?==?TimeCharacteristic.ProcessingTime)?{
????????????getConfig().setAutoWatermarkInterval(0);
????????}?else?{
????????????getConfig().setAutoWatermarkInterval(200);
????????}
????}
上述的方法可以配置不同的時間語義,參數(shù)TimeCharacteristic是一個枚舉類,包括ProcessingTime,IngestionTime,EventTime三個元素。具體使用方式如下:
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
watermarks
在解釋watermarks(水位線)之前,先看一個我們身邊發(fā)生的真實案例。高考,是大家非常熟悉的場景。如果把高考的考試安排簡單地看作是一個流處理應用,那么,每一個考試科目的開始時間到結(jié)束時間就是一個窗口,每個考生可以理解成一條記錄,考生到達考場的時間可以理解成記錄的時間戳,而考試可以理解成某種算子操作。大家都知道,高考考試在開考后15分鐘是不允許進場的,這個規(guī)定可以理解成一個水位線,比如,上午第一場語文考試,開考時間是9:30,允許在9:45之前進入考場,那么9:45這個時間可以理解成一個水位線。在開考之前,有的同學喜歡提前到考場,有的同學喜歡卡點到考場。假設有個同學叫考必勝,ta是卡著時間點到的考場,但是早上由于吃了不干凈的東西,突然感覺肚子不適,無奈之下在廁所里耽誤了16分鐘,那么按照規(guī)定,此時考必勝是不能夠進入考場的,因為此時已經(jīng)默認所有考生都已經(jīng)在考場了,此時考試也已經(jīng)觸發(fā),那么考必勝就可以理解為遲到的事件。以上就是對窗口、事件時間以及水位線的簡單理解,下面開始詳細解釋什么水位線。
基本概念
在上一節(jié)中,詳細講解了Flink提供的三種時間語義,在講解這三種時間語義的時候,提到了一個名詞---水位線,那么究竟什么是水位線呢?先來看一個例子,假如要每5分鐘統(tǒng)計一次過去1個小時內(nèi)的熱門商品的topN,這是一個典型的滑動窗口操作,那么基于事件時間的窗口該在什么時候出發(fā)計算呢?換句話說,我們要等多久才能夠確定已經(jīng)接收到了特定時間點之前的所有事件,另一方面,由于網(wǎng)絡延遲等原因,會產(chǎn)生亂序的數(shù)據(jù),在進行窗口操作時,不能夠無限期的等待下去,需要一個機制來告訴窗口在某個特定時間來觸發(fā)window計算,即認為小于等于該時間點的數(shù)據(jù)都已經(jīng)到來了。這個機制就是watermark(水位線),可以用來處理亂序事件。
水位線是一個全局的進度指標,表示可以確定不會再有延遲的事件到來的某個時間點。從本質(zhì)上講,水位線提供了一個邏輯時鐘,用來通知系統(tǒng)當前的事件時間。比如,當一個算子接收到了W(T)時刻的水位線,就可以大膽的認為不會再接收到任何時間戳小于或等于W(T)的事件了。水位線對于基于事件時間的窗口和處理亂序數(shù)據(jù)是非常關鍵的,算子一旦接收到了某個水位線,就相當于接到一支穿云箭的信號:所有特定時間區(qū)間的數(shù)據(jù)都已集結(jié)完畢,可以進行窗口觸發(fā)計算。
既然已經(jīng)說了,事件是會存在亂序的,那這個亂序的程度究竟有多大呢,這個就不太好確定了,總之總會有些遲到的事件慢慢悠悠的到來。所以,水位線其實是一種在準確性與延遲之間的權(quán)衡,如果水位線設置的非常苛刻,即不允許有掉隊的數(shù)據(jù)出現(xiàn),雖然準確性提高了,但這在無形之中增加了數(shù)據(jù)處理的延遲。反之,如果水位線設置的非常激進,即允許有遲到的數(shù)據(jù)發(fā)生,那么雖然降低了數(shù)據(jù)處理的延遲,但數(shù)據(jù)的準確性會較低。
所以,水位線是中庸之道,過猶不及。在很多現(xiàn)實應用中,系統(tǒng)無法獲取足夠多的信息來確定完美的水位線,那么該怎么辦呢?Flink提供了某些機制來處理那些可能晚于水位線的遲到時間,用戶可以根據(jù)應用的需求不同,可以將這些漏網(wǎng)之魚(遲到的數(shù)據(jù))舍棄掉,或者寫入日志,或者利用他們修正之前的結(jié)果。
上面說到?jīng)]有完美的水位線,可能還是很抽象。接下來,我們再看一幅圖,從圖中可以很直觀地觀察真實的水位線與理想中的完美水位線之間的關系,如下圖:
上圖的淺灰色直虛線表示理想的水位線,深灰色的彎曲虛線表示現(xiàn)實中的水位線,黑色直線表示兩者之間的偏差。在理想狀態(tài)下,這種偏差為0,因為總是在時間發(fā)生時就會立即處理,即事件的真實時間與處理事件的時間是一致的。比如,12:01產(chǎn)生的事件剛好在12:01時被處理,12:02產(chǎn)生的事件剛好在12:02時被處理。但是現(xiàn)實總會有遲到的數(shù)據(jù)產(chǎn)生,比如網(wǎng)絡延遲的原因,所以真實的情況會像深灰色的彎曲虛線表示的那樣,即12:01產(chǎn)生的數(shù)據(jù)可能會在12:01之后被處理,12:02產(chǎn)生的數(shù)據(jù)在12:02時被處理,12:03時產(chǎn)生的數(shù)據(jù)會被在12:03之后處理。這種動態(tài)的偏差在分布式處理系統(tǒng)中是非常常見的。
水位線圖解
在上一小節(jié),通過語言描述對水位線的概念進行了詳細解讀,在本小節(jié)會通過圖解的方式解析水位線的含義,這樣更能加深對水位線的理解。如下圖所示:
如上圖,矩形表示一條記錄,三角表示該條記錄的時間戳(真實發(fā)生時間),圓圈表示水位線。可以看到上面的數(shù)據(jù)是亂序的,比如當算子接收到為2的水位線時,就可以認為時間戳小于等于2的數(shù)據(jù)都已經(jīng)到來了,此時可以觸發(fā)計算。同理,接收到為5的水位線時,就可以認為時間戳小于或等于5的數(shù)據(jù)都已經(jīng)到來了,此時可以觸發(fā)計算。
可以看出水位線是單調(diào)遞增的,并且和記錄的時間戳存在聯(lián)系,一個時間戳為T的水位線表示接下來所有記錄的時間戳一定都會大于T。
水位線的傳播
現(xiàn)在,或許你已經(jīng)對水位線是什么有了一個初步的認識,接下來將會介紹水位線是怎么在Flink內(nèi)部傳播的。關于水位線的傳播策略可以歸納為3點:
首先,水位線是以廣播的形式在算子之間進行傳播
Long.MAX_VALUE表示事件時間的結(jié)束,即未來不會有數(shù)據(jù)到來了
單個分區(qū)的輸入取最大值,多個分區(qū)的輸入取最小值
關于Long.MAX_VALUE的解釋,先看一段代碼,如下:
?/**??*?當一個source關閉時,會輸出一個Long.MAX_VALUE的水位線,當一個算子接收到該水位線時,
?*?相當于接收到一個信號:未來不會再有數(shù)據(jù)輸入了
?*/
@PublicEvolving
public?final?class?Watermark?extends?StreamElement?{
????//表示事件時間的結(jié)束
????public?static?final?Watermark?MAX_WATERMARK?=?new?Watermark(Long.MAX_VALUE);
????//省略的代碼
}
關于另外兩條策略的解釋,可以從下圖中得到:
如上圖,一個任務會為它的每個分區(qū)都維護一個分區(qū)水位線(partition watermark),當收到每個分區(qū)傳來的水位線時,任務首先會讓當前分區(qū)水位線的值與接收的水位線值相比較,如果新接收的水位線值大于當前分區(qū)水位線值,則會將對應的分區(qū)水位線值更新為較大的水位線值(如上圖中的2步驟),接著,任務會把事件時鐘調(diào)整為當前分區(qū)水位線值的最小值,如上圖步驟2 ,由于當前分區(qū)水位線的最小值為3,所以將事件時間時鐘更新為3,然后將值為3的水位線廣播到下游任務。步驟3與步驟4的處理邏輯同上。
同時我們可以注意到這種設計其實有一個局限,具體體現(xiàn)在沒有對分區(qū)(partition)是否來自于不同的流進行區(qū)分,比如對于兩條流或多條流的Union或Connect操作,同樣是按照全部分區(qū)水位線中最小值來更新事件時間時鐘,這就導致所有的輸入記錄都會按照基于同一個事件時間時鐘來處理,這種一刀切的做法對于同一個流的不同分區(qū)而言是無可厚非的,但是對于多條流而言,強制使用一個時鐘進行同步會對整個集群帶來較大的性能開銷,比如當兩個流的水位線相差很大是,其中的一個流要等待最慢的那條流,而較快的流的記錄會在狀態(tài)中緩存,直到事件時間時鐘到達允許處理它們的那個時間點。
水位線的生成方式
通常情況下,在接收到數(shù)據(jù)源之后應該馬上為其生成水位線,即越靠近數(shù)據(jù)源越好。Flink提供兩種方式生成水位線,其中一種方式為在數(shù)據(jù)源完成的,即利用SourceFunction在應用讀入數(shù)據(jù)流的時候分配時間戳與水位線。另一種方式是通過實現(xiàn)接口的自定義函數(shù),該方式又包括兩種實現(xiàn)方式:一種為周期性生成水位線,即實現(xiàn)AssignerWithPeriodicWatermarks接口,另一種為定點生成水位線,即實AssignerWithPunctuatedWatermarks接口。具體如下圖所示:
數(shù)據(jù)源方式
該方式主要是實現(xiàn)自定義數(shù)據(jù)源,數(shù)據(jù)源分配時間戳和水位線主要是通過內(nèi)部的SourceContext對象實現(xiàn)的,先看一下SourceFunction的源碼,如下:
public?interface?SourceFunction<T>?extends?Function,?Serializable?{????void?cancel();
????interface?SourceContext<T>?{
????????void?collect(T?element);
????????/**
????????*?用于輸出記錄并附屬一個與之關聯(lián)的時間戳
????????*/
????????@PublicEvolving
????????void?collectWithTimestamp(T?element,?long?timestamp);
????????/**
????????*?用于輸出傳入的水位線
????????*/
????????@PublicEvolving
????????void?emitWatermark(Watermark?mark);
????????/**
????????*?將自身標記為空閑狀態(tài)
????????*?某個某個分區(qū)不在產(chǎn)生數(shù)據(jù),會阻礙全局水位線前進,
????????*?因為收不到新的記錄,意味著不會發(fā)出新的水位線,
????????*?根據(jù)水位線的傳播策略,會導致整個應用都停止工作
????????*?Flink提供一種機制,將數(shù)據(jù)源函數(shù)暫時標記為空閑,
????????*?在空閑狀態(tài)下,Flink水位線的傳播機制會忽略掉空閑的數(shù)據(jù)流分區(qū)
????????*/
????????@PublicEvolving
????????void?markAsTemporarilyIdle();
????????Object?getCheckpointLock();
????????void?close();
????}
}
從上面對的代碼可以看出,通過SourceContext對象的方法可以實現(xiàn)時間戳與水位線的分配。
自定義函數(shù)的方式
使用自定義函數(shù)的方式分配時間戳,只需要調(diào)用assignTimestampsAndWatermarks()方法,傳入一個實現(xiàn)AssignerWithPeriodicWatermarks或者AssignerWithPunctuatedWatermarks接口的分配器即可,如下代碼所示:
StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment()????????env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
????????SingleOutputStreamOperator?userBehavior?=?env
????????????????.addSource(new?MysqlSource())
????????????????.assignTimestampsAndWatermarks(new?MyTimestampsAndWatermarks());
周期分配器(AssignerWithPeriodicWatermarks)
該分配器是實現(xiàn)了一個AssignerWithPeriodicWatermarks的用戶自定義函數(shù),通過重寫extractTimestamp()方法來提取時間戳,提取出來的時間戳會附加在各自的記錄上,查詢得到的水位線會注入到數(shù)據(jù)流中。
周期性的生成水位線是指以固定的時間間隔來發(fā)出水位線并推進事件時間的前進,關于默認的時間間隔在上文中也有提到,根據(jù)選擇的時間語義確定默認的時間間隔,如果使用Processing Time或者Event Time,默認的水位線間隔時間是200毫秒,當然用戶也可以自己設定時間間隔,關于如何設定,先看一段代碼,代碼來自于ExecutionConfig類:
????/**????*?設置生成水位線的時間間隔
????*?注:自動生成watermarks的時間間隔不能是負數(shù)
?????*/
????@PublicEvolving
????public?ExecutionConfig?setAutoWatermarkInterval(long?interval)?{
????????Preconditions.checkArgument(interval?>=?0,?"Auto?watermark?interval?must?not?be?negative.");
????????this.autoWatermarkInterval?=?interval;
????????return?this;
????}
所以,如果要調(diào)整默認的200毫秒的間隔,可以調(diào)用setAutoWatermarkInterval()方法,具體使用如下:
??//每3秒生成一次水位線env.getConfig().setAutoWatermarkInterval(3000);
上面指定了每隔3秒生成一次水位線,即每隔3秒會自動向流里注入一個水位線,在代碼層面,Flink會每隔3秒鐘調(diào)用一次AssignerWithPeriodicWatermarks的getCurrentWatermark()方法,每次調(diào)用該方法時,如果得到的值不為空并且大于上一個水位線的時間戳,那么就會向流中注入一個新的水位線。這項檢查可以有效地保證了事件時間的遞增的特性,一旦檢查失敗也就不會生成水位線。下面給出一個實現(xiàn)周期分配水位線的例子:
public?class?MyTimestampsAndWatermarks?implements?AssignerWithPeriodicWatermarks<UserBehavior>?{????//?定義1分鐘的容忍間隔時間,即允許數(shù)據(jù)的最大亂序時間
????private?long?maxOutofOrderness?=?60?*?1000;
????//?觀察到的最大時間戳
????private?long?currentMaxTs?=?Long.MIN_VALUE;??????
????@Nullable
????@Override
????public?Watermark?getCurrentWatermark()?{
????????//?生成具有1分鐘容忍度的水位線
????????return?new?Watermark(currentMaxTs?-?maxOutofOrderness);
????}
????@Override
????public?long?extractTimestamp(UserBehavior?element,?long?previousElementTimestamp)?{
????????//獲取當前記錄的時間戳
????????long?currentTs?=?element.timestamp;
????????//?更新最大的時間戳
????????currentMaxTs?=?Math.max(currentMaxTs,?currentTs);
????????//?返回記錄的時間戳
????????return?currentTs;
????}
}
通過查看TimestampAssignerd 繼承關系可以發(fā)現(xiàn)(繼承關系如下圖),除此之外,Flink還提供了兩種內(nèi)置的水位線分配器,分別為:AscendingTimestampExtractor和BoundedOutOfOrdernessTimestampExtractor兩個抽象類。
關于AscendingTimestampExtractor,一般是在數(shù)據(jù)集的時間戳是單調(diào)遞增的且沒有亂序時使用,該方法使用當前的時間戳生成水位線,使用方式如下:
SingleOutputStreamOperator?userBehavior?=?env????????????????.addSource(new?MysqlSource())
????????????????.assignTimestampsAndWatermarks(new?AscendingTimestampExtractor()?{@Overridepublic?long?extractAscendingTimestamp(UserBehavior?element)?{return?element.timestamp*1000;
????????????????????}
????????????????});
關于BoundedOutOfOrdernessTimestampExtractor,是在數(shù)據(jù)集中存在亂序數(shù)據(jù)的情況下使用,即數(shù)據(jù)有延遲(任意新到來的元素與已經(jīng)到來的時間戳最大的元素之間的時間差),這種方式可以接收一個表示最大預期延遲參數(shù),具體如下:
SingleOutputStreamOperator?userBehavior?=?env????????????????.addSource(new?MysqlSource())
????????????????.assignTimestampsAndWatermarks(new?BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10))?{@Overridepublic?long?extractTimestamp(UserBehavior?element)?{return?element.timestamp*1000;
????????????????????}
????????????????}?);
上述的代碼接收了一個10秒鐘延遲的參數(shù),這10秒鐘意味著如果當前元素的事件時間與到達的元素的最大時間戳的差值在10秒之內(nèi),那么該元素會被處理,如果差值超過10秒,表示其本應該參與的計算,已經(jīng)完成了,Flink稱之為遲到的數(shù)據(jù),Flink提供了不同的策略來處理這些遲到的數(shù)據(jù)。
定點水位線分配器(AssignerWithPunctuatedWatermarks)
該方式是基于某些事件(指示系統(tǒng)進度的特殊元祖或標記)觸發(fā)水位線的生成與發(fā)送,基于特定的事件向流中注入一個水位線,流中的每一個元素都有機會判斷是否生成一個水位線,如果得到的水位線不為空并且大于之前的水位線,就生成水位線并注入流中。
實現(xiàn)AssignerWithPunctuatedWatermarks接口,重寫checkAndGetNextWatermark()方法,該方法會在針對每個事件的extractTimestamp()方法后立即調(diào)用,以此來決定是否生成一個新的水位線,如果該方法返回一個非空并且大于之前值的水位線,就會將這個新的水位線發(fā)出。
下面將會實現(xiàn)一個簡單的定點水位線分配器
public?class?MyPunctuatedAssigner?implements?AssignerWithPunctuatedWatermarks<UserBehavior>?{????//?定義1分鐘的容忍間隔時間,即允許數(shù)據(jù)的最大亂序時間
????private?long?maxOutofOrderness?=?60?*?1000;??????
????@Nullable
????@Override
????public?Watermark?checkAndGetNextWatermark(UserBehavior?element,?long?extractedTimestamp)?{
????????//?如果讀取數(shù)據(jù)的用戶行為是購買,就生成水位線
????????if(element.action.equals("buy")){
???????????return?new?Watermark(extractedTimestamp?-?maxOutofOrderness);
????????}else{
????????????//?不發(fā)出水位線
????????????return?null;??
????????}??
????}
????@Override
????public?long?extractTimestamp(UserBehavior?element,?long?previousElementTimestamp)?{
????????return?element.timestamp;
????}
}
遲到的數(shù)據(jù)
上文已經(jīng)說過,現(xiàn)實中很難生成一個完美的水位線,水位線就是在延遲與準確性之前做的一種權(quán)衡。那么,如果生成的水位線過于緊迫,即水位線可能會大于后來數(shù)據(jù)的時間戳,這就意味著數(shù)據(jù)有延遲,關于延遲數(shù)據(jù)的處理,Flink提供了一些機制,具體如下:
直接將遲到的數(shù)據(jù)丟棄
將遲到的數(shù)據(jù)輸出到單獨的數(shù)據(jù)流中,即使用sideOutputLateData(new OutputTag<>())實現(xiàn)側(cè)輸出
根據(jù)遲到的事件更新并發(fā)出結(jié)果
由于篇幅限制,關于遲到數(shù)據(jù)的具體處理在本文先不做太多的討論,在后續(xù)的文章中會對其詳細進行說明。
總結(jié)
本文從Flink的時間語義開始說起,詳細介紹了三種時間語義的概念、特點及使用方式,接著對Flink處理亂序數(shù)據(jù)的一種機制---水位線進行詳細說明,主要描述了水位線的基本概念,傳播方式、生成方式,并對其中的細節(jié)部分進行了圖解,可以加深對水位線的理解。最后,簡單說明了一下Flink對于遲到數(shù)據(jù)的處理方式。
總結(jié)
以上是生活随笔為你收集整理的flink 三种时间机制_Flink的时间与watermarks详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 脉冲宽度调制pdm_NHWYM脉冲硬质氧
- 下一篇: 腐蚀rust研究台抽奖_超级石化推荐:中