Flink面试题
一 基礎篇
Flink的執行圖有哪幾種?分別有什么作用
Flink中的執行圖一般是可以分為四類,按照生成順序分別為:StreamGraph-> JobGraph-> ExecutionGraph->物理執行圖。
1)StreamGraph
顧名思義,這里代表的是我們編寫的流程序圖。通過Stream API生成,這是執行圖的最原始拓撲數據結構。
2)JobGraph
StreamGraph在Client中經過算子chain鏈合并等優化,轉換為JobGraph拓撲圖,隨后被提交到JobManager中。
3)ExecutionGraph
JobManager中將JobGraph進一步轉換為ExecutionGraph,此時ExecutuonGraph根據算子配置的并行度轉變為并行化的Graph拓撲結構。
4)物理執行圖
比較偏物理執行概念,即JobManager進行Job調度,TaskManager最終部署Task的圖結構。
Flink的窗口機制
在流處理應用中,數據是連續不斷的,因此我們不可能等到所有數據都到了才開始處理。當然我們可以每來一個消息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少用戶點擊了我們的網頁。在這種情況下,我們必須定義一個窗口,用來收集最近一分鐘內的數據,并對這個窗口內的數據進行計算。
窗口可以是時間驅動的(Time Window,例如:每30秒鐘),也可以是數據驅動的(Count Window,例如:每一百個元素)。一種經典的窗口分類可以分成:翻滾窗口(Tumbling Window,無重疊),滾動窗口(Sliding Window,有重疊),和會話窗口(Session Window,活動間隙)。
我們舉個具體的場景來形象地理解不同窗口的概念。假設,淘寶網會記錄每個用戶每次購買的商品個數,我們要做的是統計不同窗口中用戶購買商品的總數。下圖給出了幾種經典的窗口切分概述圖:
上圖中,raw data stream 代表用戶的購買行為流,圈中的數字代表該用戶本次購買的商品個數,事件是按時間分布的,所以可以看出事件之間是有time gap的。Flink 提供了上圖中所有的窗口類型,下面我們會逐一進行介紹。
Time Window
就如名字所說的,Time Window 是根據時間對數據流進行分組的。這里我們涉及到了流處理中的時間問題,時間問題和消息亂序問題是緊密關聯的,這是流處理中現存的難題之一,我們將在后續的 EventTime 和消息亂序處理 中對這部分問題進行深入探討。這里我們只需要知道 Flink 提出了三種時間的概念,分別是event time(事件時間:事件發生時的時間),ingestion time(攝取時間:事件進入流處理系統的時間),processing time(處理時間:消息被計算處理的時間)。Flink 中窗口機制和時間類型是完全解耦的,也就是說當需要改變時間類型時不需要更改窗口邏輯相關的代碼。
Tumbling Time Window
如上圖,我們需要統計每一分鐘中用戶購買的商品的總數,需要將用戶的行為事件按每一分鐘進行切分,這種切分被成為翻滾時間窗口(Tumbling Time Window)。翻滾窗口能將數據流切分成不重疊的窗口,每一個事件只能屬于一個窗口。通過使用 DataStream API,我們可以這樣實現:
Sliding Time Window
但是對于某些應用,它們需要的窗口是不間斷的,需要平滑地進行窗口聚合。比如,我們可以每30秒計算一次最近一分鐘用戶購買的商品總數。這種窗口我們稱為滑動時間窗口(Sliding Time Window)。在滑窗中,一個元素可以對應多個窗口。通過使用 DataStream API,我們可以這樣實現:
Count Window
Count Window 是根據元素個數對數據流進行分組的。
Tumbling Count Window
當我們想要每100個用戶購買行為事件統計購買總數,那么每當窗口中填滿100個元素了,就會對窗口進行計算,這種窗口我們稱之為翻滾計數窗口(Tumbling Count Window),上圖所示窗口大小為3個。通過使用 DataStream API,我們可以這樣實現:
Sliding Count Window
當然Count Window 也支持 Sliding Window,雖在上圖中未描述出來,但和Sliding Time Window含義是類似的,例如計算每10個元素計算一次最近100個元素的總和,代碼示例如下。
Session Window
在這種用戶交互事件流中,我們首先想到的是將事件聚合到會話窗口中(一段用戶持續活躍的周期),由非活躍的間隙分隔開。如上圖所示,就是需要計算每個用戶在活躍期間總共購買的商品數量,如果用戶30秒沒有活動則視為會話斷開(假設raw data stream是單個用戶的購買行為流)。Session Window 的示例代碼如下:
// Stream of (userId, buyCnts)val buyCnts: DataStream[(Int, Int)] = ...val sessionCnts: DataStream[(Int, Int)] = vehicleCnts.keyBy(0)// session window based on a 30 seconds session gap interval .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).sum(1)一般而言,window 是在無限的流上定義了一個有限的元素集合。這個集合可以是基于時間的,元素個數的,時間和個數結合的,會話間隙的,或者是自定義的。Flink 的 DataStream API 提供了簡潔的算子來滿足常用的窗口操作,同時提供了通用的窗口機制來允許用戶自己定義窗口分配邏輯。下面我們會對 Flink 窗口相關的 API 進行剖析。
Flink中的時間概念
Flink在流處理程序支持不同的時間概念。分別為Event Time/Processing Time/Ingestion Time,也就是事件時間、處理時間、提取時間。
從時間序列角度來說,發生的先后順序是:
事件時間(Event Time)----> 提取時間(Ingestion Time)----> 處理時間(Processing Time)復制
Event Time 是事件在現實世界中發生的時間,它通常由事件中的時間戳描述。
Ingestion Time 是數據進入Apache Flink流處理系統的時間,也就是Flink讀取數據源時間。
Processing Time 是數據流入到具體某個算子 (消息被計算處理) 時候相應的系統時間。也就是Flink程序處理該事件時當前系統時間。
但是我們講解時,會從后往前講解,把最重要的Event Time放在最后。
處理時間
是數據流入到具體某個算子時候相應的系統時間。
這個系統時間指的是執行相應操作的機器的系統時間。當一個流程序通過處理時間來運行時,所有基于時間的操作(如: 時間窗口)將使用各自操作所在的物理機的系統時間。
ProcessingTime 有最好的性能和最低的延遲。但在分布式計算環境或者異步環境中,ProcessingTime具有不確定性,相同數據流多次運行有可能產生不同的計算結果。因為它容易受到從記錄到達系統的速度(例如從消息隊列)到記錄在系統內的operator之間流動的速度的影響(停電,調度或其他)。
提取時間
IngestionTime是數據進入Apache Flink框架的時間,是在Source Operator中設置的。每個記錄將源的當前時間作為時間戳,并且后續基于時間的操作(如時間窗口)引用該時間戳。
提取時間在概念上位于事件時間和處理時間之間。與處理時間相比,它稍早一些。IngestionTime與ProcessingTime相比可以提供更可預測的結果,因為IngestionTime的時間戳比較穩定(在源處只記錄一次),所以同一數據在流經不同窗口操作時將使用相同的時間戳,而對于ProcessingTime同一數據在流經不同窗口算子會有不同的處理時間戳。
與事件時間相比,提取時間程序無法處理任何無序事件或后期數據,但程序不必指定如何生成水位線。
在內部,提取時間與事件時間非常相似,但具有自動時間戳分配和自動水位線生成功能。
事件時間
事件時間就是事件在真實世界的發生時間,即每個事件在產生它的設備上發生的時間(當地時間)。比如一個點擊事件的時間發生時間,是用戶點擊操作所在的手機或電腦的時間。
在進入Apache Flink框架之前EventTime通常要嵌入到記錄中,并且EventTime也可以從記錄中提取出來。在實際的網上購物訂單等業務場景中,大多會使用EventTime來進行數據計算。
Flink的watermark
Watermark是Apache Flink為了處理EventTime 窗口計算提出的一種機制,本質上也是一種時間戳。watermark是用于處理亂序事件或延遲數據的,這通常用watermark機制結合window來實現(Watermarks用來觸發window窗口計算)。
比如對于late element,我們不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了。這個特別的機制,就是watermark。 可以把Watermark看作是一種告訴Flink一個消息延遲多少的方式。定義了什么時候不再等待更早的數據。
1. 窗口觸發條件
上面談到了對數據亂序問題的處理機制是watermark+window,那么window什么時候該被觸發呢?
基于Event Time的事件處理,Flink默認的事件觸發條件為:
對于out-of-order及正常的數據而言
watermark的時間戳 > = window endTime
在 [window_start_time,window_end_time] 中有數據存在。
對于late element太多的數據而言
Event Time > watermark的時間戳
WaterMark相當于一個EndLine,一旦Watermarks大于了某個window的end_time,就意味著windows_end_time時間和WaterMark時間相同的窗口開始計算執行了。
就是說,我們根據一定規則,計算出Watermarks,并且設置一些延遲,給遲到的數據一些機會,也就是說正常來講,對于遲到的數據,我只等你一段時間,再不來就沒有機會了。
WaterMark時間可以用Flink系統現實時間,也可以用處理數據所攜帶的Event time。
使用Flink系統現實時間,在并行和多線程中需要注意的問題較少,因為都是以現實時間為標準。
如果使用處理數據所攜帶的Event time作為WaterMark時間,需要注意兩點:
因為數據到達并不是循序的,注意保存一個當前最大時間戳作為WaterMark時間
并行同步問題
2. WaterMark設定方法
標點水位線(Punctuated Watermark)
標點水位線(Punctuated Watermark)通過數據流中某些特殊標記事件來觸發新水位線的生成。這種方式下窗口的觸發與時間無關,而是決定于何時收到標記事件。
在實際的生產中Punctuated方式在TPS很高的場景下會產生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。
定期水位線(Periodic Watermark)
周期性的(允許一定時間間隔或者達到一定的記錄條數)產生一個Watermark。水位線提升的時間間隔是由用戶設置的,在兩次水位線提升時隔內會有一部分消息流入,用戶可以根據這部分數據來計算出新的水位線。
在實際的生產中Periodic的方式必須結合時間和積累條數兩個維度繼續周期性產生Watermark,否則在極端情況下會有很大的延時。
舉個例子,最簡單的水位線算法就是取目前為止最大的事件時間,然而這種方式比較暴力,對亂序事件的容忍程度比較低,容易出現大量遲到事件。
3. 遲到事件
雖說水位線表明著早于它的事件不應該再出現,但是上如上文所講,接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預計,導致窗口在它們到達之前已經關閉。
遲到事件出現時窗口已經關閉并產出了計算結果,因此處理的方法有3種:
重新激活已經關閉的窗口并重新計算以修正結果。
將遲到事件收集起來另外處理。
將遲到事件視為錯誤消息并丟棄。
Flink 默認的處理方式是第3種直接丟棄,其他兩種方式分別使用Side Output和Allowed Lateness。
Side Output機制可以將遲到事件單獨放入一個數據流分支,這會作為 window 計算結果的副產品,以便用戶獲取并對其進行特殊處理。
Allowed Lateness機制允許用戶設置一個允許的最大遲到時長。Flink 會在窗口關閉后一直保存窗口的狀態直至超過允許遲到時長,這期間的遲到事件不會被丟棄,而是默認會觸發窗口重新計算。因為保存窗口狀態需要額外內存,并且如果窗口計算使用了 ProcessWindowFunction API 還可能使得每個遲到事件觸發一次窗口的全量計算,代價比較大,所以允許遲到時長不宜設得太長,遲到事件也不宜過多,否則應該考慮降低水位線提高的速度或者調整算法。
這里總結機制為:
窗口window 的作用是為了周期性的獲取數據。
watermark的作用是防止數據出現亂序(經常),事件時間內獲取不到指定的全部數據,而做的一種保險方法。
allowLateNess是將窗口關閉時間再延遲一段時間。
sideOutPut是最后兜底操作,所有過期延遲數據,指定窗口已經徹底關閉了,就會把數據放到側輸出流。
4.例子
假如我們設置10s的時間窗口(window),那么0~10s,10~20s都是一個窗口,以0~10s為例,0為start-time,10為end-time。假如有4個數據的event-time分別是8(A),12.5(B),9(C),13.5(D),我們設置Watermarks為當前所有到達數據event-time的最大值減去延遲值3.5秒
當A到達的時候,Watermarks為max{8}-3.5=8-3.5 = 4.5 < 10,不會觸發計算
當B到達的時候,Watermarks為max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不會觸發計算
當C到達的時候,Watermarks為max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不會觸發計算
當D到達的時候,Watermarks為max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,觸發計算
觸發計算的時候,會將A,C(因為他們都小于10)都計算進去,其中C是遲到的。 max這個很關鍵,就是當前窗口內,所有事件的最大事件。 這里的延遲3.5s是我們假設一個數據到達的時候,比他早3.5s的數據肯定也都到達了,這個是需要根據經驗推算。假設加入D到達以后有到達了一個E,event-time=6,但是由于0~10的時間窗口已經開始計算了,所以E就丟了。 從這里上面E的丟失說明,水位線也不是萬能的,但是如果根據我們自己的生產經驗+側道輸出等方案,可以做到數據不丟失。
Flink分布式快照原理是什么
可靠性是分布式系統實現必須考慮的因素之一。Flink基于Chandy-Lamport分布式快照算法實現了一套可靠的Checkpoint機制,可以保證集群中某些節點出現故障時,能夠將整個作業恢復到故障之前某個狀態。同時,Checkpoint機制也是Flink實現Exactly-Once語義的基礎。
本文將介紹Flink的Checkpoint機制的原理,并從源碼層面了解Checkpoint機制是如何實現的(基于Flink 1.10)。
1. 為什么需要Checkpoint
Flink是有狀態的流計算處理引擎,每個算子Operator可能都需要記錄自己的運行數據,并在接收到新流入的元素后不斷更新自己的狀態數據。當分布式系統引入狀態計算后,為了保證計算結果的正確性(特別是對于流處理系統,不可能每次系統故障后都從頭開始計算),就必然要求系統具有容錯性。對于Flink來說,Flink作業運行在多個節點上,當出現節點宕機、網絡故障等問題,需要一個機制保證節點保存在本地的狀態不丟失。流處理中Exactly-Once語義的實現也要求作業從失敗恢復后的狀態要和失敗前的狀態一致。
那么怎么保證分布式環境下各節點狀態的容錯呢?通常這是通過定期對作業狀態和數據流進行快照實現的,常見的檢查點算法有比如Sync-and-Stop(SNS)算法、Chandy-Lamport(CL)算法。
Flink的Checkpoint機制是基于Chandy-Lamport算法的思想改進而來,引入了Checkpoint Barrier的概念,可以在不停止整個流處理系統的前提下,讓每個節點獨立建立檢查點保存自身快照,并最終達到整個作業全局快照的狀態。有了全局快照,當我們遇到故障或者重啟的時候就可以直接從快照中恢復,這就是Flink容錯的核心。
2. Checkpoint執行流程
Barrier是Flink分布式快照的核心概念之一,稱之為屏障或者數據柵欄(可以理解為快照的分界線)。Barrier是一種特殊的內部消息,在進行Checkpoint的時候Flink會在數據流源頭處周期性地注入Barrier,這些Barrier會作為數據流的一部分,一起流向下游節點并且不影響正常的數據流。Barrier的作用是將無界數據流從時間上切分成多個窗口,每個窗口對應一系列連續的快照中的一個,每個Barrier都帶有一個快照ID,一個Barrier生成之后,在這之前的數據都進入此快照,在這之后的數據則進入下一個快照。
如上圖,Barrier-n跟隨著數據流一起流動,當算子從輸入流接收到Barrier-n后,就會停止接收數據并對當前自身的狀態做一次快照,快照完成后再將Barrier-n以廣播的形式傳給下游節點。一旦作業的Sink算子接收到Barrier n后,會向JobMnager發送一個消息,確認Barrier-n對應的快照完成。當作業中的所有Sink算子都確認后,意味一次全局快照也就完成。
當一個算子有多個上游節點時,會接收到多個Barrier,這時候需要進行Barrier Align對齊操作。
如上圖,一個算子有兩個輸入流,當算子從一個上游數據流接收到一個Barrier-n后,它不會立即向下游廣播,而是先暫停對該數據流的處理,將到達的數據先緩存在Input Buffer中(因為這些數據屬于下一次快照而不是當前快照,緩存數據可以不阻塞該數據流),直到從另外一個數據流中接收到Barrier-n,才會進行快照處理并將Barrier-n向下游發送。從這個流程可以看出,如果開啟Barrier對齊后,算子由于需要等待所有輸入節點的Barrier到來出現暫停,對整體的性能也會有一定的影響。
綜上,Flink Checkpoint機制的核心思想實質上是通過Barrier來標記觸發快照的時間點和對應需要進行快照的數據集,將數據流處理和快照操作解耦開來,從而最大程度降低快照對系統性能的影響。
Flink的一致性和Checkpoint機制有緊密的關系:
當不開啟Checkpoint時,節點發生故障時可能會導致數據丟失,這就是At-Most-Once
當開啟Checkpoint但不進行Barrier對齊時,對于有多個輸入流的節點如果發生故障,會導致有一部分數據可能會被處理多次,這就是At-Least-Once
當開啟Checkpoint并進行Barrier對齊時,可以保證每條數據在故障恢復時只會被重放一次,這就是Exactly-Once
3. Checkpoint相關配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);默認情況下,Checkpoint機制是關閉的,需要通過enableCheckpointing(interval)來開啟,并指定每interval毫秒進行一次Checkpoint。
Checkpoint模式支持Exactly-Once和At-Least-Once,可以通過setCheckpointingMode來設置。
如果兩次Checkpoint的時間很短,會導致整個系統大部分資源都用于執行Checkpoint,影響正常作業的執行??梢酝ㄟ^setMinPauseBetweenCheckpoints來設置兩次Checkpoint之間的最小間隔。
setCheckpointTimeout可以給Checkpoint設置一個超時時間,當一次Checkpoint超過一定時間沒有完成,直接終止掉。
默認情況下,當一個Checkpoint還在執行時,不會觸發另一個Checkpoint,通過setMaxConcurrentCheckpoints可以設置最大并發Checkpoint數量。
enableExternalizedCheckpoints可以設置當用戶取消了作業后,是否保留遠程存儲上的Checkpoint數據,一般設置為RETAIN_ON_CANCELLATION。
保存多個Checkpoint
默認情況下,如果設置了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程序失敗時,可以從最近的這個Checkpoint來進行恢復。但是,如果我們希望保留多個Checkpoint,并能夠根據實際需要選擇其中一個進行恢復,這樣會更加靈活,比如,我們發現最近4個小時數據記錄處理有問題,希望將整個狀態還原到4小時之前。
Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數:
state.checkpoints.num-retained: 20
保留了最近的20個Checkpoint。如果希望會退到某個Checkpoint點,只需要指定對應的某個Checkpoint路徑即可實現。
從Checkpoint進行恢復
從指定的checkpoint處啟動,最近的一個/flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584啟動,通常需要先停掉當前運行的flink-session,然后通過命令啟動:
../bin/flink run -p 10 -s /flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584/_metadata -c com.code2144.helper_wink-1.0-SNAPSHOT.jar可以把命令放到腳本里面,每次直接執行checkpoint恢復腳本即可:
保存點機制 (Savepoints)
保存點機制 (Savepoints)是檢查點機制的一種特殊的實現,它允許通過手工的方式來觸發 Checkpoint,并將結果持久化存儲到指定路徑中,主要用于避免 Flink 集群在重啟或升級時導致狀態丟失。示例如下:
# 觸發指定id的作業的Savepoint,并將結果存儲到指定目錄下
bin/flink savepoint :jobId [:targetDirectory]手動savepoint
/app/local/flink-1.6.2/bin/flink savepoint 0409251eaff826ef2dd775b6a2d5e219 [hdfs://bigdata/path]成功觸發savepoint通常會提示:Savepoint completed. Path: hdfs://path...:
手動取消任務
與checkpoint異常停止或者手動Kill掉不一樣,對于savepoint通常是我們想要手動停止任務,然后更新代碼,可以使用flink cancel ...命令:
/app/local/flink-1.6.2/bin/flink cancel 0409251eaff826ef2dd775b6a2d5e219從指定savepoint啟動job
bin/flink run -p 8 -s hdfs:///flink/savepoints/savepoint-567452-9e3587e55980 -c com.code2144.helper_workflow.HelperWorkFlowStreaming jars/BSS-ONSS-Flink-1.0-SNAPSHOT.jarFlink的內存管理是如何做的
在介紹內存管理之前,先介紹一下JVM中的堆內存和堆外內存。
通常來說。JVM堆空間概念,簡單描述就是在程序中,關于對象實例|數組的創建、使用和釋放的內存,都會在JVM中的一塊被稱作為"JVM堆"內存區域內進行管理分配。
Flink程序在創建對象后,JVM會在堆內內存中分配一定大小的空間,創建Class對象并返回對象引用,Flink保存對象引用,同時記錄占用的內存信息。而堆外內存如果你有過Java相關編程經歷的話,相信對堆外內存的使用并不陌生。其底層調用基于C的JDK Unsafe類方法,通過指針直接進行內存的操作,包括內存空間的申請、使用、刪除釋放等。
介紹完了堆內內存和堆外內存的概念,下面我們來看下Flink的內存管理。
1)JobManager內存管理
JobManager進程總內存包括JVM堆內內存、JVM堆外內存以及JVM MetaData內存,其中涉及的內存配置參數為:
# JobManager總進程內存 jobmanager.memory.process.size:# 作業管理器的 JVM 堆內存大小 jobmanager.memory.heap.size:#作業管理器的堆外內存大小。此選項涵蓋所有堆外內存使用。 jobmanager.memory.off-heap.size: 復制代碼2)TaskManager內存管理
TaskManager內存同樣包含JVM堆內內存、JVM堆外內存以及JVM MetaData內存三大塊。其中JVM堆內內存又包含Framework Heap和Task Heap,即框架堆內存和任務Task堆內存。
JVM堆外內存包含Memory memory托管內存,主要用于保存排序、結果緩存、狀態后端數據等。另一塊為Direct Memory直接內存,包含如下:
Framework Off-Heap Memory:Flink框架的堆外內存,即Flink中TaskManager的自身內存,和slot無關。
Task Off-Heap:Task的堆外內存
Network Memory:網絡內存
其中涉及的內存配置參數為:
// tm的框架堆內內存 taskmanager.memory.framework.heap.size=// tm的任務堆內內存 taskmanager.memory.task.heap.size// Flink管理的原生托管內存 taskmanager.memory.managed.size= taskmanager.memory.managed.fraction=// Flink 框架堆外內存 taskmanager.memory.framework.off-heap.size=// Task 堆外內存 taskmanager.memory.task.off-heap.size=// 網絡數據交換所使用的堆外內存大小 taskmanager.memory.network.min: 64mb taskmanager.memory.network.max: 1gb taskmanager.memory.network.fraction: 0.1復制代碼Flink/Spark/Hive SQL的執行原理
這里我把三個組件SQL執行原理放到了一起,通過對比加深一下印象。
1)Hive SQL的執行原理
Hive SQL是Hive提供的SQL查詢引擎,底層由MapReduce實現。Hive根據輸入的SQL語句執行詞法分析、語法樹構建、編譯、邏輯計劃、優化邏輯計劃以及物理計劃等過程,轉化為Map Task和Reduce Task最終交由Mapreduce引擎執行。
執行引擎。具有mapreduce的一切特性,適合大批量數據離線處理,相較于Spark而言,速度較慢且IO操作頻繁
有完整的hql語法,支持基本sql語法、函數和udf
對表數據存儲格式有要求,不同存儲、壓縮格式性能不同
2)Spark SQL的執行原理
Spark SQL底層基于Spark引擎,使用Antlr解析語法,編譯生成邏輯計劃和物理計劃,過程和Hive SQL執行過程類似,只不過Spark SQL產生的物理計劃為Spark程序。
輸入編寫的Spark SQL
SqlParser分析器。進行語法檢查、詞義分析,生成未綁定的Logical Plan邏輯計劃(未綁定查詢數據的元數據信息,比如查詢什么文件,查詢那些列等)
Analyzer解析器。查詢元數據信息并綁定,生成完整的邏輯計劃。此時可以知道具體的數據位置和對象,Logical Plan 形如from table -> filter column -> select 形式的樹結構
Optimizer優化器。選擇最好的一個Logical Plan,并優化其中的不合理的地方。常見的例如謂詞下推、剪枝、合并等優化操作
Planner使用Planing Strategies將邏輯計劃轉化為物理計劃,并根據最佳策略選擇出的物理計劃作為最終的執行計劃
調用Spark Plan Execution執行引擎執行Spark RDD任務
3)Flink SQL的執行原理
一條SQL從提交到Calcite解析,優化,到最后的Flink執行,一般分以下過程:
1. Sql Parser: 將sql語句通過java cc解析成AST(語法樹),在calcite中用SqlNode表示AST;
2. Sql Validator: 結合數字字典(catalog)去驗證sql語法;
3. 生成Logical Plan: 將sqlNode表示的AST轉換成LogicalPlan, 用relNode表示;
4. 生成 optimized LogicalPlan: 先基于calcite rules 去優化logical Plan,基于flink定制的一些優化rules去優化logical Plan;
5. 生成Flink PhysicalPlan: 這里也是基于flink里頭的rules將,將optimized LogicalPlan轉成成Flink的物理執行計劃;
6. 將物理執行計劃轉成Flink ExecutionPlan: 就是調用相應的tanslateToPlan方法轉換和利用CodeGen元編程成Flink的各種算子。
Table API 來提交任務的話,基本流程和運行SQL類似,稍微不同的是:table api parser: flink會把table api表達的計算邏輯也表示成一顆樹,用treeNode去表式; 在這棵樹上的每個節點的計算邏輯用Expression來表示。
簡單說一下SQL優化:RBO(基于規則)
RBO主要是開發人員在使用SQL的過程中,有些發現有些通用的規則,可以顯著提高SQL執行的效率,比如最經典的filter下推:
將Filter下推到Join之前執行,這樣做的好處是減少了Join的數量,同時降低了CPU,內存,網絡等方面的開銷,提高效率。
SQL優化的發展,則可以分為兩個階段,即RBO(基于規則),和CBO(基于代價)
RBO和CBO的區別大概在于: RBO只為應用提供的rule,而CBO會根據給出的Cost信息,智能應用rule,求出一個Cost最低的執行計劃。需要糾正很多人誤區的一點是,CBO其實也是基于rule的,接觸到RBO和CBO這兩個概念的時候,很容易將他們對立起來。但實際上CBO,可以理解為就是加上Cost的RBO。
Flink SQL 引擎的工作流總結如圖所示。
從圖中可以看出,一段查詢 SQL / 使用TableAPI 編寫的程序(以下簡稱 TableAPI 代碼)從輸入到編譯為可執行的 JobGraph 主要經歷如下幾個階段
將 SQL文本 / TableAPI 代碼轉化為邏輯執行計劃(Logical Plan)
Logical Plan 通過優化器優化為物理執行計劃(Physical Plan)
通過代碼生成技術生成 Transformations 后進一步編譯為可執行的 JobGraph 提交運行
Flink的背壓,怎么解決
Flink背壓是生產應用中常見的情況,當程序存在數據傾斜、內存不足狀況經常會發生背壓,我將從如下幾個方面去分析。
1)Flink背壓表現
1)運行開始時正常,后面出現大量Task任務等待
2)少量Task任務開始報checkpoint超時問題
3)大量Kafka數據堆積,無法消費
4)Flink UI的BackPressure頁面出現紅色High標識
2) 反壓一般有哪些情況
一般可以細分兩種情況:
當前Task任務處理速度慢,比如task任務中調用算法處理等復雜邏輯,導致上游申請不到足夠內存。
下游Task任務處理速度慢,比如多次collect()輸出到下游,導致當前節點無法申請足夠的內存。
3) 頻繁反壓的影響是什么
頻繁反壓會導致流處理作業數據延遲增加,同時還會影響到Checkpoint。
Checkpoint時需要進行Barrier對齊,此時若某個Task出現反壓,Barrier流動速度會下降,導致Checkpoint變慢甚至超時,任務整體也變慢。
長期或頻繁出現反壓才需要處理,如果由于網絡波動或者GC出現的偶爾反壓可以不必處理。4)Flink的反壓機制
背壓時一般下游速度慢于上游速度,數據久積成疾,需要做限流。但是無法提前預估下游實際速度,且存在網絡波動情況。
需要保持上下游動態反饋,如果下游速度慢,則上游限速;否則上游提速。實現動態自動反壓的效果。
下面看下Flink內部是怎么實現反壓機制的。
1)每個TaskManager維護共享Network BufferPool(Task共享內存池),初始化時向Off-heap Memory中申請內存。
2)每個Task創建自身的Local BufferPool(Task本地內存池),并和Network BufferPool交換內存。
3)上游Record Writer向 Local BufferPool申請buffer(內存)寫數據。如果Local BufferPool沒有足夠內存則向Network BufferPool申請,使用完之后將申請的內存返回Pool。
4)Netty Buffer拷貝buffer并經過Socket Buffer發送到網絡,后續下游端按照相似機制處理。
5)當下游申請buffer失敗時,表示當前節點內存不夠,則逐層發送反壓信號給上游,上游慢慢停止數據發送,直到下游再次恢復。
5)反壓如何處理
查看Flink UI界面,定位哪些Task出現反壓問題
查看代碼和數據,檢查是否出現數據傾斜
如果發生數據傾斜,進行預聚合key或拆分數據
加大執行內存,調整并發度和分區數
其他方式。。。
由于篇幅有限,更多Flink反壓內容請查看我的相關文章:萬字趣解Flink背壓
Flink的exactly-once怎么保障
精準一次消費需要整個系統各環節均保持強一致性,包括可靠的數據源端(數據可重復讀取、不丟失) 、可靠的消費端(Flink)、可靠的輸出端(冪等性、事務)。
Flink保持精準一次消費主要依靠checkpoint一致性快照和二階段提交機制。
1)數據源端
Flink內置FlinkKafkaConsumer類,不依賴于 kafka 內置的消費組offset管理,在內部自行記錄并維護 kafka consumer 的offset。
(1)管理offset(手動提交)并保存到checkpoint中(2)FlinkKafkaConsumer API內部集成Flink的Checkpoint機制,自動實現精確一次的處理語義。
從源碼中看到stateBackend中把offset state恢復到restoredState,然后從fetcher拉取最新的offset數據,隨后將offset存入到stateBackend中;最后更新xcheckpoint。
2)Flink消費端
Flink內部采用一致性快照機制來保障Exactly-Once的一致性語義。
通過間隔時間自動執行一致性檢查點(Checkpoints)程序,b并異步插入barrier檢查點分界線。整個流程所有的operator均會進行barrier對齊->數據完成確認->checkpoints狀態保存,從而保證數據被精確一次處理。
3)輸出端
Flink內置二階段事務提交機制和目標源支持冪等寫入。
冪等寫入就是多次寫入會產生相同的結果,結果具有不可變性。在Flink中saveAsTextFile算子就是一種比較典型的冪等寫入。
二階段提交則對于每個checkpoint創建事務,先預提交數據到sink中,然后等所有的checkpoint全部完成后再真正提交請求到sink, 并把狀態改為已確認,從而保證數據僅被處理一次。
為checkpoint創建事務,等到所有的checkpoint全部真正的完成后,才把計算結果寫入到sink中。Flink怎么處理遲到數據
Flink內置watermark機制,可在一定程度上允許數據延遲
程序可在watermark的基礎上再配置最大延遲時間
開啟側輸出流,將延遲的數據輸出到側輸出流
程序內部控制,延遲過高的數據單獨進行后續處理
Flink的雙流JOIN
Flink雙流JOIN主要分為兩大類。一類是基于原生State的Connect算子操作,另一類是基于窗口的JOIN操作。其中基于窗口的JOIN可細分為window join和interval join兩種。
實現原理:底層原理依賴Flink的State狀態存儲,通過將數據存儲到State中進行關聯join, 最終輸出結果。1)基于Window Join的雙流JOIN實現機制
通俗理解,將兩條實時流中元素分配到同一個時間窗口中完成Join。兩條實時流數據緩存在Window State中,當窗口觸發計算時執行join操作。
join算子操作
兩條流數據按照關聯主鍵在(滾動、滑動、會話)窗口內進行inner join, 底層基于State存儲,并支持處理時間和事件時間兩種時間特征,看下源碼:
windows窗口、state存儲和雙層for循環執行join()實現雙流JOIN操作,但是此時僅支持inner join類型。
coGroup算子操作
coGroup算子也是基于window窗口機制,不過coGroup算子比Join算子更加靈活,可以按照用戶指定的邏輯匹配左流或右流數據并輸出,達到left join和right join的目的。
orderDetailStream.coGroup(orderStream).where(r -> r.getOrderId()).equalTo(r -> r.getOrderId()).window(TumblingProcessingTimeWindows.of(Time.seconds(60))).apply(new CoGroupFunction<OrderDetail, Order, Tuple2<String, Long>>() {@Overridepublic void coGroup(Iterable<OrderDetail> orderDetailRecords, Iterable<Order> orderRecords, Collector<Tuple2<String, Long>> collector) {for (OrderDetail orderDetaill : orderDetailRecords) {boolean flag = false;for (Order orderRecord : orderRecords) {// 右流中有對應的記錄collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price()));flag = true;}if (!flag) {// 右流中沒有對應的記錄collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), null));}}}}).print(); 復制代碼2)基于Interval Join的雙流JOIN實現機制
Interval Join根據右流相對左流偏移的時間區間(interval)作為關聯窗口,在偏移區間窗口中完成join操作。
滿足數據流stream2在數據流stream1的 interval(low, high)偏移區間內關聯join。interval越大,關聯上的數據就越多,超出interval的數據不再關聯。
實現原理:interval join也是利用Flink的state存儲數據,不過此時存在state失效機制ttl,觸發數據清理操作。 val env = ... // kafka 訂單流val orderStream = ... // kafka 訂單明細流val orderDetailStream = ...orderStream.keyBy(_.1)// 調用intervalJoin關聯.intervalJoin(orderDetailStream._2)// 設定時間上限和下限.between(Time.milliseconds(-30), Time.milliseconds(30)) .process(new ProcessWindowFunction())class ProcessWindowFunction extends ProcessJoinFunction...{override def processElement(...) {collector.collect((r1, r2) => r1 + " : " + r2)} } 復制代碼訂單流在流入程序后,等候(low,high)時間間隔內的訂單明細流數據進行join, 否則繼續處理下一個流。interval join目前也僅支持inner join。
3)基于Connect的雙流JOIN實現機制
對兩個DataStream執行connect操作,將其轉化為ConnectedStreams, 生成的Streams可以調用不同方法在兩個實時流上執行,且雙流之間可以共享狀態。
兩個數據流被connect之后,只是被放在了同一個流中,內部依然保持各自的數據和形式,兩個流相互獨立。
[DataStream1, DataStream2] -> ConnectedStreams[1,2]我們可以在Connect算子底層的ConnectedStreams中編寫代碼,自行實現雙流JOIN的邏輯處理。
1)調用connect算子,根據orderid進行分組,并使用process算子分別對兩條流進行處理。
2)process方法內部進行狀態編程, 初始化訂單、訂單明細和定時器的ValueState狀態。
3)為每個進入的數據流保存state狀態并創建定時器。在時間窗口內另一個流達到時進行join并輸出,完成后刪除定時器。
4)未及時達到的數據流觸發定時器輸出到側輸出流,左流先到而右流未到,則輸出左流,反之輸出右連流。
4)Flink雙流JOIN問題處理總結
1)為什么我的雙流join時間到了卻不觸發,一直沒有輸出
2)state數據保存多久,會內存爆炸嗎
3)我的雙流join傾斜怎么辦
4)想實現多流join怎么辦
5)join過程延遲、沒關聯上的數據會丟失嗎
Flink數據傾斜怎么處理
數據傾斜一般都是數據Key分配不均,比如某一類型key數量過多,導致shuffle過程分到某節點數據量過大,內存無法支撐。
1)數據傾斜可能的情況
那我們怎么發現數據傾斜了呢?一般是監控某任務Job執行情況,可以去Yarn UI或者Flink UI觀察,一般會出現如下狀況:
發現某subTask執行時間過慢
傳輸數據量和其他task相差過大
BackPressure頁面出現反壓問題(紅色High標識)
結合以上的排查定位到具體的task中執行的算子,一般常見于Keyed類型算子:比如groupBy()、rebance()等產生shuffle過程的操作。
2)數據傾斜的處理方法
數據拆分。如果能定位的數據傾斜的key,總結其規律特征。比如發現包含某字符,則可以在代碼中把該部分數據key拆分出來,單獨處理后拼接。
key二次聚合。兩次聚合,第一次將key加前綴聚合,分散單點壓力;隨后去除前綴后再次聚合,得到最終結果。
調整參數。加大TaskManager內存、keyby均衡等參數,一般效果不是很好。
自定義分區或聚合邏輯。繼承分區劃分、聚合計算接口,根據數據特征和自定義邏輯,調整數據分區并均勻打散數據key。
Flink數據重復怎么辦
一般來說Flink可以開啟exactly-once機制,可保證精準一次消費。但是如果存在數據處理過程異常導致數據重復,可以借助一些工具或者程序來處理。
建議數據量不大的話可以使用flink自身的state或者借助bitmap結構;稍微大點可以用布隆過濾器或hyperlog工具;其次使用外部介質(redis或hbase)設計好key就行自動去重,只不過會增加處理過程。
總結一下Flink的去重方式:
內存去重。采用Hashset等數據結構,讀取數據中類似主鍵等唯一性標識字段,在內存中存儲并進行去重判斷。
使用Redis Key去重。借助Redis的Hset等特殊數據類型,自動完成Key去重。
DataFrame/SQL場景,使用group by、over()、window開窗等SQL函數去重
利用groupByKey等聚合算子去重
Flink實時數倉架構,為什么這么設計
實時數倉數據規整為層級存儲,每層獨立加工。整體遵循由下向上建設思想,最大化數據賦能。
1)數倉分層設計
數據源: 分為日志數據和業務數據兩大類,包括結構化和非結構化數據。
數倉類型:根據及時性分為離線數倉和實時數倉
技術棧:
采集(Sqoop、Flume、CDC)
存儲(Hive、Hbase、Mysql、Kafka、數據湖)
加工(Hive、Spark、Flink)
OLAP查詢(Kylin、Clickhous、ES、Dorisdb)等。
2)數倉架構設計
整體采用Lambda架構。保留實時、離線兩條處理流程,即最終會同時構建實時數倉和離線數倉。
1. 技術實現
使用Flink和Kafka、Hive為主要技術棧
實時技術流程。通過實時采集程序同步數據到Kafka消息隊列
Flink實時讀取Kafka數據,回寫到kafka ods貼源層topic
Flink實時讀取Kafka的ods層數據,進行實時清洗和加工,結果寫入到kafka dwd明細層topic
同樣的步驟,Flink讀取dwd層數據寫入到kafka dws匯總層topic
離線技術流程和前面章節一致
實時olap引擎查詢分析、報表展示
2. 優缺點
兩套技術流程,全面保障實時性和歷史數據完整性
同時維護兩套技術架構,維護成本高,技術難度大
相同數據源處理兩次且存儲兩次,產生大量數據冗余和操作重復
容易產生數據不一致問題
3)數據流程設計
整體從上而下,數據經過采集 -> 數倉明細加工、匯總 -> 應用步驟,提供實時數倉服務。
總結
- 上一篇: Git上修改分支名称
- 下一篇: 支付宝 二维码/账号/转账码/生成方式/