2022年最强大数据面试宝典(全文50000字)
此套面試題來自于各大廠的真實面試題及常問的知識點,如果能理解吃透這些問題,你的大數據能力將會大大提升,進入大廠指日可待
復習大數據面試題,看這一套就夠了!
本文目錄:
一、Hadoop
二、Hive
三、Spark
四、Kafka
五、HBase
六、Flink
七、數倉業務方面
八、算法
本文PDF版文檔預覽如下
點擊獲取本文PDF版:2022年最強大數據面試寶典PDF版
Hadoop
Hadoop中常問的就三塊,第一:分布式存儲(HDFS);第二:分布式計算框架(MapReduce);第三:資源調度框架(YARN)。
1. 請說下HDFS讀寫流程
這個問題雖然見過無數次,面試官問過無數次,還是有不少面試者不能完整的說出來,所以請務必記住。并且很多問題都是從HDFS讀寫流程中引申出來的。
HDFS寫流程:
Client客戶端發送上傳請求,通過RPC與NameNode建立通信,NameNode檢查該用戶是否有上傳權限,以及上傳的文件是否在HDFS對應的目錄下重名,如果這兩者有任意一個不滿足,則直接報錯,如果兩者都滿足,則返回給客戶端一個可以上傳的信息;
Client根據文件的大小進行切分,默認128M一塊,切分完成之后給NameNode發送請求第一個block塊上傳到哪些服務器上;
NameNode收到請求之后,根據網絡拓撲和機架感知以及副本機制進行文件分配,返回可用的DataNode的地址;
注:Hadoop在設計時考慮到數據的安全與高效, 數據文件默認在HDFS上存放三份, 存儲策略為本地一份,同機架內其它某一節點上一份, 不同機架的某一節點上一份。
客戶端收到地址之后與服務器地址列表中的一個節點如A進行通信,本質上就是RPC調用,建立pipeline,A收到請求后會繼續調用B,B在調用C,將整個pipeline建立完成,逐級返回Client;
Client開始向A上發送第一個block(先從磁盤讀取數據然后放到本地內存緩存),以packet(數據包,64kb)為單位,A收到一個packet就會發送給B,然后B發送給C,A每傳完一個packet就會放入一個應答隊列等待應答;
數據被分割成一個個的packet數據包在pipeline上依次傳輸,在pipeline反向傳輸中,逐個發送ack(命令正確應答),最終由pipeline中第一個DataNode節點A將pipelineack發送給Client;
當一個block傳輸完成之后, Client再次請求NameNode上傳第二個block,NameNode重新選擇三臺DataNode給Client。
HDFS讀流程:
Client向NameNode發送RPC請求。請求文件block的位置;
NameNode收到請求之后會檢查用戶權限以及是否有這個文件,如果都符合,則會視情況返回部分或全部的block列表,對于每個block,NameNode都會返回含有該block副本的DataNode地址;這些返回的DataNode地址,會按照集群拓撲結構得出DataNode與客戶端的距離,然后進行排序,排序兩個規則:網絡拓撲結構中距離 Client 近的排靠前;心跳機制中超時匯報的DataNode狀態為STALE,這樣的排靠后;
Client選取排序靠前的DataNode來讀取block,如果客戶端本身就是DataNode,那么將從本地直接獲取數據(短路讀取特性);
底層上本質是建立Socket Stream(FSDataInputStream),重復的調用父類DataInputStream的read方法,直到這個塊上的數據讀取完畢;
當讀完列表的block后,若文件讀取還沒有結束,客戶端會繼續向NameNode 獲取下一批的block列表;
讀取完一個block都會進行checksum驗證,如果讀取DataNode時出現錯誤,客戶端會通知NameNode,然后再從下一個擁有該block副本的DataNode 繼續讀;
read方法是并行的讀取block信息,不是一塊一塊的讀取;NameNode只是返回Client請求包含塊的DataNode地址,并不是返回請求塊的數據;
最終讀取來所有的block會合并成一個完整的最終文件;
2. HDFS在讀取文件的時候,如果其中一個塊突然損壞了怎么辦
客戶端讀取完DataNode上的塊之后會進行checksum驗證,也就是把客戶端讀取到本地的塊與HDFS上的原始塊進行校驗,如果發現校驗結果不一致,客戶端會通知NameNode,然后再從下一個擁有該block副本的DataNode繼續讀。
3. HDFS在上傳文件的時候,如果其中一個DataNode突然掛掉了怎么辦
客戶端上傳文件時與DataNode建立pipeline管道,管道的正方向是客戶端向DataNode發送的數據包,管道反向是DataNode向客戶端發送ack確認,也就是正確接收到數據包之后發送一個已確認接收到的應答。
當DataNode突然掛掉了,客戶端接收不到這個DataNode發送的ack確認,客戶端會通知NameNode,NameNode檢查該塊的副本與規定的不符,NameNode會通知DataNode去復制副本,并將掛掉的DataNode作下線處理,不再讓它參與文件上傳與下載。
4. NameNode在啟動的時候會做哪些操作
NameNode數據存儲在內存和本地磁盤,本地磁盤數據存儲在fsimage鏡像文件和edits編輯日志文件。
首次啟動NameNode:
格式化文件系統,為了生成fsimage鏡像文件;
啟動NameNode:
-
讀取fsimage文件,將文件內容加載進內存
-
等待DataNade注冊與發送block report
啟動DataNode:
-
向NameNode注冊
-
發送block report
-
檢查fsimage中記錄的塊的數量和block report中的塊的總數是否相同
對文件系統進行操作(創建目錄,上傳文件,刪除文件等):
-
此時內存中已經有文件系統改變的信息,但是磁盤中沒有文件系統改變的信息,此時會將這些改變信息寫入edits文件中,edits文件中存儲的是文件系統元數據改變的信息。
第二次啟動NameNode:
讀取fsimage和edits文件;
將fsimage和edits文件合并成新的fsimage文件;
創建新的edits文件,內容開始為空;
啟動DataNode。
5. Secondary NameNode了解嗎,它的工作機制是怎樣的
Secondary NameNode是合并NameNode的edit logs到fsimage文件中;
它的具體工作機制:
Secondary NameNode詢問NameNode是否需要checkpoint。直接帶回NameNode是否檢查結果;
Secondary NameNode請求執行checkpoint;
NameNode滾動正在寫的edits日志;
將滾動前的編輯日志和鏡像文件拷貝到Secondary NameNode;
Secondary NameNode加載編輯日志和鏡像文件到內存,并合并;
生成新的鏡像文件fsimage.chkpoint;
拷貝fsimage.chkpoint到NameNode;
NameNode將fsimage.chkpoint重新命名成fsimage;
所以如果NameNode中的元數據丟失,是可以從Secondary NameNode恢復一部分元數據信息的,但不是全部,因為NameNode正在寫的edits日志還沒有拷貝到Secondary NameNode,這部分恢復不了。
6. Secondary NameNode不能恢復NameNode的全部數據,那如何保證NameNode數據存儲安全
這個問題就要說NameNode的高可用了,即 NameNode HA。
一個NameNode有單點故障的問題,那就配置雙NameNode,配置有兩個關鍵點,一是必須要保證這兩個NameNode的元數據信息必須要同步的,二是一個NameNode掛掉之后另一個要立馬補上。
元數據信息同步在 HA 方案中采用的是“共享存儲”。每次寫文件時,需要將日志同步寫入共享存儲,這個步驟成功才能認定寫文件成功。然后備份節點定期從共享存儲同步日志,以便進行主備切換。
監控NameNode狀態采用zookeeper,兩個NameNode節點的狀態存放在zookeeper中,另外兩個NameNode節點分別有一個進程監控程序,實施讀取zookeeper中有NameNode的狀態,來判斷當前的NameNode是不是已經down機。如果Standby的NameNode節點的ZKFC發現主節點已經掛掉,那么就會強制給原本的Active NameNode節點發送強制關閉請求,之后將備用的NameNode設置為Active。
如果面試官再問HA中的 共享存儲 是怎么實現的知道嗎?
可以進行解釋下:NameNode 共享存儲方案有很多,比如Linux HA, VMware FT, QJM等,目前社區已經把由Clouderea公司實現的基于QJM(Quorum Journal Manager)的方案合并到HDFS的trunk之中并且作為默認的共享存儲實現。
基于QJM的共享存儲系統主要用于保存EditLog,并不保存FSImage文件。FSImage文件還是在NameNode的本地磁盤上。
QJM共享存儲的基本思想來自于Paxos算法,采用多個稱為JournalNode的節點組成的JournalNode集群來存儲EditLog。每個JournalNode保存同樣的EditLog副本。每次NameNode寫EditLog的時候,除了向本地磁盤寫入 EditLog 之外,也會并行地向JournalNode集群之中的每一個JournalNode發送寫請求,只要大多數的JournalNode節點返回成功就認為向JournalNode集群寫入EditLog成功。如果有2N+1臺JournalNode,那么根據大多數的原則,最多可以容忍有N臺JournalNode節點掛掉。
7. 在NameNode HA中,會出現腦裂問題嗎?怎么解決腦裂
假設 NameNode1 當前為 Active 狀態,NameNode2 當前為 Standby 狀態。如果某一時刻 NameNode1 對應的 ZKFailoverController 進程發生了“假死”現象,那么 Zookeeper 服務端會認為 NameNode1 掛掉了,根據前面的主備切換邏輯,NameNode2 會替代 NameNode1 進入 Active 狀態。但是此時 NameNode1 可能仍然處于 Active 狀態正常運行,這樣 NameNode1 和 NameNode2 都處于 Active 狀態,都可以對外提供服務。這種情況稱為腦裂。
腦裂對于NameNode這類對數據一致性要求非常高的系統來說是災難性的,數據會發生錯亂且無法恢復。zookeeper社區對這種問題的解決方法叫做 fencing,中文翻譯為隔離,也就是想辦法把舊的 Active NameNode 隔離起來,使它不能正常對外提供服務。
在進行 fencing 的時候,會執行以下的操作:
首先嘗試調用這個舊 Active NameNode 的 HAServiceProtocol RPC 接口的 transitionToStandby 方法,看能不能把它轉換為 Standby 狀態。
如果 transitionToStandby 方法調用失敗,那么就執行 Hadoop 配置文件之中預定義的隔離措施,Hadoop 目前主要提供兩種隔離措施,通常會選擇 sshfence:
-
sshfence:通過 SSH 登錄到目標機器上,執行命令 fuser 將對應的進程殺死;
-
shellfence:執行一個用戶自定義的 shell 腳本來將對應的進程隔離。
8. 小文件過多會有什么危害,如何避免
Hadoop上大量HDFS元數據信息存儲在NameNode內存中,因此過多的小文件必定會壓垮NameNode的內存。
每個元數據對象約占150byte,所以如果有1千萬個小文件,每個文件占用一個block,則NameNode大約需要2G空間。如果存儲1億個文件,則NameNode需要20G空間。
顯而易見的解決這個問題的方法就是合并小文件,可以選擇在客戶端上傳時執行一定的策略先合并,或者是使用Hadoop的CombineFileInputFormat\<K,V\>實現小文件的合并。
9. 請說下HDFS的組織架構
Client:客戶端
-
切分文件。文件上傳HDFS的時候,Client將文件切分成一個一個的Block,然后進行存儲
-
與NameNode交互,獲取文件的位置信息
-
與DataNode交互,讀取或者寫入數據
-
Client提供一些命令來管理HDFS,比如啟動關閉HDFS、訪問HDFS目錄及內容等
NameNode:名稱節點,也稱主節點,存儲數據的元數據信息,不存儲具體的數據
-
管理HDFS的名稱空間
-
管理數據塊(Block)映射信息
-
配置副本策略
-
處理客戶端讀寫請求
DataNode:數據節點,也稱從節點。NameNode下達命令,DataNode執行實際的操作
-
存儲實際的數據塊
-
執行數據塊的讀/寫操作
Secondary NameNode:并非NameNode的熱備。當NameNode掛掉的時候,它并不能馬上替換NameNode并提供服務
-
輔助NameNode,分擔其工作量
-
定期合并Fsimage和Edits,并推送給NameNode
-
在緊急情況下,可輔助恢復NameNode
10. 請說下MR中Map Task的工作機制
簡單概述:
inputFile通過split被切割為多個split文件,通過Record按行讀取內容給map(自己寫的處理邏輯的方法) ,數據被map處理完之后交給OutputCollect收集器,對其結果key進行分區(默認使用的hashPartitioner),然后寫入buffer,每個map task 都有一個內存緩沖區(環形緩沖區),存放著map的輸出結果,當緩沖區快滿的時候需要將緩沖區的數據以一個臨時文件的方式溢寫到磁盤,當整個map task 結束后再對磁盤中這個maptask產生的所有臨時文件做合并,生成最終的正式輸出文件,然后等待reduce task的拉取。
詳細步驟:
讀取數據組件 InputFormat (默認 TextInputFormat) 會通過 getSplits 方法對輸入目錄中的文件進行邏輯切片規劃得到 block,有多少個 block就對應啟動多少個 MapTask。
將輸入文件切分為 block 之后,由 RecordReader 對象 (默認是LineRecordReader) 進行讀取,以 \n 作為分隔符, 讀取一行數據, 返回 <key,value>, Key 表示每行首字符偏移值,Value 表示這一行文本內容。
讀取 block 返回 <key,value>, 進入用戶自己繼承的 Mapper 類中,執行用戶重寫的 map 函數,RecordReader 讀取一行這里調用一次。
Mapper 邏輯結束之后,將 Mapper 的每條結果通過 context.write 進行collect數據收集。在 collect 中,會先對其進行分區處理,默認使用 HashPartitioner。
接下來,會將數據寫入內存,內存中這片區域叫做環形緩沖區(默認100M),緩沖區的作用是 批量收集 Mapper 結果,減少磁盤 IO 的影響。我們的 Key/Value 對以及 Partition 的結果都會被寫入緩沖區。當然,寫入之前,Key 與 Value 值都會被序列化成字節數組。
當環形緩沖區的數據達到溢寫比列(默認0.8),也就是80M時,溢寫線程啟動,需要對這 80MB 空間內的 Key 做排序 (Sort)。排序是 MapReduce 模型默認的行為,這里的排序也是對序列化的字節做的排序。
合并溢寫文件,每次溢寫會在磁盤上生成一個臨時文件 (寫之前判斷是否有 Combiner),如果 Mapper 的輸出結果真的很大,有多次這樣的溢寫發生,磁盤上相應的就會有多個臨時文件存在。當整個數據處理結束之后開始對磁盤中的臨時文件進行 Merge 合并,因為最終的文件只有一個寫入磁盤,并且為這個文件提供了一個索引文件,以記錄每個reduce對應數據的偏移量。
11. 請說下MR中Reduce Task的工作機制
簡單描述:
Reduce 大致分為 copy、sort、reduce 三個階段,重點在前兩個階段。
copy 階段包含一個 eventFetcher 來獲取已完成的 map 列表,由 Fetcher 線程去 copy 數據,在此過程中會啟動兩個 merge 線程,分別為 inMemoryMerger 和 onDiskMerger,分別將內存中的數據 merge 到磁盤和將磁盤中的數據進行 merge。待數據 copy 完成之后,copy 階段就完成了。
開始進行 sort 階段,sort 階段主要是執行 finalMerge 操作,純粹的 sort 階段,完成之后就是 reduce 階段,調用用戶定義的 reduce 函數進行處理。
詳細步驟:
Copy階段:簡單地拉取數據。Reduce進程啟動一些數據copy線程(Fetcher),通過HTTP方式請求maptask獲取屬于自己的文件(map task 的分區會標識每個map task屬于哪個reduce task ,默認reduce task的標識從0開始)。
Merge階段:在遠程拷貝數據的同時,ReduceTask啟動了兩個后臺線程對內存和磁盤上的文件進行合并,以防止內存使用過多或磁盤上文件過多。
merge有三種形式:內存到內存;內存到磁盤;磁盤到磁盤。默認情況下第一種形式不啟用。當內存中的數據量到達一定閾值,就直接啟動內存到磁盤的merge。與map端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner,也是會啟用的,然后在磁盤中生成了眾多的溢寫文件。內存到磁盤的merge方式一直在運行,直到沒有map端的數據時才結束,然后啟動第三種磁盤到磁盤的merge方式生成最終的文件。
合并排序:把分散的數據合并成一個大的數據后,還會再對合并后的數據排序。
對排序后的鍵值對調用reduce方法:鍵相等的鍵值對調用一次reduce方法,每次調用會產生零個或者多個鍵值對,最后把這些輸出的鍵值對寫入到HDFS文件中。
12. 請說下MR中Shuffle階段
shuffle階段分為四個步驟:依次為:分區,排序,規約,分組,其中前三個步驟在map階段完成,最后一個步驟在reduce階段完成。
shuffle 是 Mapreduce 的核心,它分布在 Mapreduce 的 map 階段和 reduce 階段。一般把從 Map 產生輸出開始到 Reduce 取得數據作為輸入之前的過程稱作 shuffle。
Collect階段:將 MapTask 的結果輸出到默認大小為 100M 的環形緩沖區,保存的是 key/value,Partition 分區信息等。
Spill階段:當內存中的數據量達到一定的閥值的時候,就會將數據寫入本地磁盤,在將數據寫入磁盤之前需要對數據進行一次排序的操作,如果配置了 combiner,還會將有相同分區號和 key 的數據進行排序。
MapTask階段的Merge:把所有溢出的臨時文件進行一次合并操作,以確保一個 MapTask 最終只產生一個中間數據文件。
Copy階段:ReduceTask 啟動 Fetcher 線程到已經完成 MapTask 的節點上復制一份屬于自己的數據,這些數據默認會保存在內存的緩沖區中,當內存的緩沖區達到一定的閥值的時候,就會將數據寫到磁盤之上。
ReduceTask階段的Merge:在 ReduceTask 遠程復制數據的同時,會在后臺開啟兩個線程對內存到本地的數據文件進行合并操作。
Sort階段:在對數據進行合并的同時,會進行排序操作,由于 MapTask 階段已經對數據進行了局部的排序,ReduceTask 只需保證 Copy 的數據的最終整體有效性即可。
Shuffle 中的緩沖區大小會影響到 mapreduce 程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快。
緩沖區的大小可以通過參數調整, 參數:mapreduce.task.io.sort.mb 默認100M
13. Shuffle階段的數據壓縮機制了解嗎
在shuffle階段,可以看到數據通過大量的拷貝,從map階段輸出的數據,都要通過網絡拷貝,發送到reduce階段,這一過程中,涉及到大量的網絡IO,如果數據能夠進行壓縮,那么數據的發送量就會少得多。
hadoop當中支持的壓縮算法:
gzip、bzip2、LZO、LZ4、Snappy,這幾種壓縮算法綜合壓縮和解壓縮的速率,谷歌的Snappy是最優的,一般都選擇Snappy壓縮。谷歌出品,必屬精品。
14. 在寫MR時,什么情況下可以使用規約
規約(combiner)是不能夠影響任務的運行結果的局部匯總,適用于求和類,不適用于求平均值,如果reduce的輸入參數類型和輸出參數的類型是一樣的,則規約的類可以使用reduce類,只需要在驅動類中指明規約的類即可。
15. YARN集群的架構和工作原理知道多少
YARN的基本設計思想是將MapReduce V1中的JobTracker拆分為兩個獨立的服務:ResourceManager和ApplicationMaster。
ResourceManager負責整個系統的資源管理和分配,ApplicationMaster負責單個應用程序的的管理。
ResourceManager: RM是一個全局的資源管理器,負責整個系統的資源管理和分配,它主要由兩個部分組成:調度器(Scheduler)和應用程序管理器(Application Manager)。
調度器根據容量、隊列等限制條件,將系統中的資源分配給正在運行的應用程序,在保證容量、公平性和服務等級的前提下,優化集群資源利用率,讓所有的資源都被充分利用應用程序管理器負責管理整個系統中的所有的應用程序,包括應用程序的提交、與調度器協商資源以啟動ApplicationMaster、監控ApplicationMaster運行狀態并在失敗時重啟它。
ApplicationMaster: 用戶提交的一個應用程序會對應于一個ApplicationMaster,它的主要功能有:
-
與RM調度器協商以獲得資源,資源以Container表示。
-
將得到的任務進一步分配給內部的任務。
-
與NM通信以啟動/停止任務。
-
監控所有的內部任務狀態,并在任務運行失敗的時候重新為任務申請資源以重啟任務。
NodeManager: NodeManager是每個節點上的資源和任務管理器,一方面,它會定期地向RM匯報本節點上的資源使用情況和各個Container的運行狀態;另一方面,他接收并處理來自AM的Container啟動和停止請求。
Container: Container是YARN中的資源抽象,封裝了各種資源。一個應用程序會分配一個Container,這個應用程序只能使用這個Container中描述的資源。不同于MapReduceV1中槽位slot的資源封裝,Container是一個動態資源的劃分單位,更能充分利用資源。
16. YARN的任務提交流程是怎樣的
當jobclient向YARN提交一個應用程序后,YARN將分兩個階段運行這個應用程序:一是啟動ApplicationMaster;第二個階段是由ApplicationMaster創建應用程序,為它申請資源,監控運行直到結束。 具體步驟如下:
用戶向YARN提交一個應用程序,并指定ApplicationMaster程序、啟動ApplicationMaster的命令、用戶程序。
RM為這個應用程序分配第一個Container,并與之對應的NM通訊,要求它在這個Container中啟動應用程序ApplicationMaster。
ApplicationMaster向RM注冊,然后拆分為內部各個子任務,為各個內部任務申請資源,并監控這些任務的運行,直到結束。
AM采用輪詢的方式向RM申請和領取資源。
RM為AM分配資源,以Container形式返回。
AM申請到資源后,便與之對應的NM通訊,要求NM啟動任務。
NodeManager為任務設置好運行環境,將任務啟動命令寫到一個腳本中,并通過運行這個腳本啟動任務。
各個任務向AM匯報自己的狀態和進度,以便當任務失敗時可以重啟任務。
應用程序完成后,ApplicationMaster向ResourceManager注銷并關閉自己。
17. YARN的資源調度三種模型了解嗎
在Yarn中有三種調度器可以選擇:FIFO Scheduler ,Capacity Scheduler,Fair Scheduler。
Apache版本的hadoop默認使用的是Capacity Scheduler調度方式。CDH版本的默認使用的是Fair Scheduler調度方式
FIFO Scheduler(先來先服務):
FIFO Scheduler把應用按提交的順序排成一個隊列,這是一個先進先出隊列,在進行資源分配的時候,先給隊列中最頭上的應用進行分配資源,待最頭上的應用需求滿足后再給下一個分配,以此類推。
FIFO Scheduler是最簡單也是最容易理解的調度器,也不需要任何配置,但它并不適用于共享集群。大的應用可能會占用所有集群資源,這就導致其它應用被阻塞,比如有個大任務在執行,占用了全部的資源,再提交一個小任務,則此小任務會一直被阻塞。
Capacity Scheduler(能力調度器):
對于Capacity調度器,有一個專門的隊列用來運行小任務,但是為小任務專門設置一個隊列會預先占用一定的集群資源,這就導致大任務的執行時間會落后于使用FIFO調度器時的時間。
Fair Scheduler(公平調度器):
在Fair調度器中,我們不需要預先占用一定的系統資源,Fair調度器會為所有運行的job動態的調整系統資源。
比如:當第一個大job提交時,只有這一個job在運行,此時它獲得了所有集群資源;當第二個小任務提交后,Fair調度器會分配一半資源給這個小任務,讓這兩個任務公平的共享集群資源。
需要注意的是,在Fair調度器中,從第二個任務提交到獲得資源會有一定的延遲,因為它需要等待第一個任務釋放占用的Container。小任務執行完成之后也會釋放自己占用的資源,大任務又獲得了全部的系統資源。最終的效果就是Fair調度器即得到了高的資源利用率又能保證小任務及時完成。
Hive
1. Hive內部表和外部表的區別
未被external修飾的是內部表,被external修飾的為外部表。
區別:
內部表數據由Hive自身管理,外部表數據由HDFS管理;
內部表數據存儲的位置是hive.metastore.warehouse.dir(默認:/user/hive/warehouse),外部表數據的存儲位置由自己制定(如果沒有LOCATION,Hive將在HDFS上的/user/hive/warehouse文件夾下以外部表的表名創建一個文件夾,并將屬于這個表的數據存放在這里);
刪除內部表會直接刪除元數據(metadata)及存儲數據;刪除外部表僅僅會刪除元數據,HDFS上的文件并不會被刪除。
2. Hive有索引嗎
Hive支持索引(3.0版本之前),但是Hive的索引與關系型數據庫中的索引并不相同,比如,Hive不支持主鍵或者外鍵。并且Hive索引提供的功能很有限,效率也并不高,因此Hive索引很少使用。
-
索引適用的場景:
適用于不更新的靜態字段。以免總是重建索引數據。每次建立、更新數據后,都要重建索引以構建索引表。
-
Hive索引的機制如下:
hive在指定列上建立索引,會產生一張索引表(Hive的一張物理表),里面的字段包括:索引列的值、該值對應的HDFS文件路徑、該值在文件中的偏移量。
Hive 0.8版本后引入bitmap索引處理器,這個處理器適用于去重后,值較少的列(例如,某字段的取值只可能是幾個枚舉值) 因為索引是用空間換時間,索引列的取值過多會導致建立bitmap索引表過大。
注意:Hive中每次有數據時需要及時更新索引,相當于重建一個新表,否則會影響數據查詢的效率和準確性,Hive官方文檔已經明確表示Hive的索引不推薦被使用,在新版本的Hive中已經被廢棄了。
擴展:Hive是在0.7版本之后支持索引的,在0.8版本后引入bitmap索引處理器,在3.0版本開始移除索引的功能,取而代之的是2.3版本開始的物化視圖,自動重寫的物化視圖替代了索引的功能。
3. 運維如何對Hive進行調度
將hive的sql定義在腳本當中;
使用azkaban或者oozie進行任務的調度;
監控任務調度頁面。
4. ORC、Parquet等列式存儲的優點
ORC和Parquet都是高性能的存儲方式,這兩種存儲格式總會帶來存儲和性能上的提升。
Parquet:
Parquet支持嵌套的數據模型,類似于Protocol Buffers,每一個數據模型的schema包含多個字段,每一個字段有三個屬性:重復次數、數據類型和字段名。
重復次數可以是以下三種:required(只出現1次),repeated(出現0次或多次),optional(出現0次或1次)。每一個字段的數據類型可以分成兩種: group(復雜類型)和primitive(基本類型)。
Parquet中沒有Map、Array這樣的復雜數據結構,但是可以通過repeated和group組合來實現的。
由于Parquet支持的數據模型比較松散,可能一條記錄中存在比較深的嵌套關系,如果為每一條記錄都維護一個類似的樹狀結可能會占用較大的存儲空間,因此Dremel論文中提出了一種高效的對于嵌套數據格式的壓縮算法:Striping/Assembly算法。通過Striping/Assembly算法,parquet可以使用較少的存儲空間表示復雜的嵌套格式,并且通常Repetition level和Definition level都是較小的整數值,可以通過RLE算法對其進行壓縮,進一步降低存儲空間。
Parquet文件是以二進制方式存儲的,是不可以直接讀取和修改的,Parquet文件是自解析的,文件中包括該文件的數據和元數據。
ORC:
ORC文件是自描述的,它的元數據使用Protocol Buffers序列化,并且文件中的數據盡可能的壓縮以降低存儲空間的消耗。
和Parquet類似,ORC文件也是以二進制方式存儲的,所以是不可以直接讀取,ORC文件也是自解析的,它包含許多的元數據,這些元數據都是同構ProtoBuffer進行序列化的。
ORC會盡可能合并多個離散的區間盡可能的減少I/O次數。
ORC中使用了更加精確的索引信息,使得在讀取數據時可以指定從任意一行開始讀取,更細粒度的統計信息使得讀取ORC文件跳過整個row group,ORC默認會對任何一塊數據和索引信息使用ZLIB壓縮,因此ORC文件占用的存儲空間也更小。
在新版本的ORC中也加入了對Bloom Filter的支持,它可以進一 步提升謂詞下推的效率,在Hive 1.2.0版本以后也加入了對此的支 持。
5. 數據建模用的哪些模型?
1. 星型模型
星形模式
星形模式(Star Schema)是最常用的維度建模方式。星型模式是以事實表為中心,所有的維度表直接連接在事實表上,像星星一樣。 星形模式的維度建模由一個事實表和一組維表成,且具有以下特點:
a. 維表只和事實表關聯,維表之間沒有關聯;
b. 每個維表主鍵為單列,且該主鍵放置在事實表中,作為兩邊連接的外鍵;
c. 以事實表為核心,維表圍繞核心呈星形分布。
2. 雪花模型
雪花模式
雪花模式(Snowflake Schema)是對星形模式的擴展。雪花模式的維度表可以擁有其他維度表的,雖然這種模型相比星型更規范一些,但是由于這種模型不太容易理解,維護成本比較高,而且性能方面需要關聯多層維表,性能比星型模型要低。
3. 星座模型
星座模型
星座模式是星型模式延伸而來,星型模式是基于一張事實表的,而星座模式是基于多張事實表的,而且共享維度信息。前面介紹的兩種維度建模方法都是多維表對應單事實表,但在很多時候維度空間內的事實表不止一個,而一個維表也可能被多個事實表用到。在業務發展后期,絕大部分維度建模都采用的是星座模式。
數倉建模詳細介紹可查看:通俗易懂數倉建模
6. 為什么要對數據倉庫分層?
-
用空間換時間,通過大量的預處理來提升應用系統的用戶體驗(效率),因此數據倉庫會存在大量冗余的數據。
-
如果不分層的話,如果源業務系統的業務規則發生變化將會影響整個數據清洗過程,工作量巨大。
-
通過數據分層管理可以簡化數據清洗的過程,因為把原來一步的工作分到了多個步驟去完成,相當于把一個復雜的工作拆成了多個簡單的工作,把一個大的黑盒變成了一個白盒,每一層的處理邏輯都相對簡單和容易理解,這樣我們比較容易保證每一個步驟的正確性,當數據發生錯誤的時候,往往我們只需要局部調整某個步驟即可。
數據倉庫詳細介紹可查看:萬字詳解整個數據倉庫建設體系
7. 使用過Hive解析JSON串嗎
Hive處理json數據總體來說有兩個方向的路走:
將json以字符串的方式整個入Hive表,然后通過使用UDF函數解析已經導入到hive中的數據,比如使用LATERAL VIEW json_tuple的方法,獲取所需要的列名。
在導入之前將json拆成各個字段,導入Hive表的數據是已經解析過的。這將需要使用第三方的 SerDe。
詳細介紹可查看:Hive解析Json數組超全講解
8. sort by 和 order by 的區別
order by 會對輸入做全局排序,因此只有一個reducer(多個reducer無法保證全局有序)只有一個reducer,會導致當輸入規模較大時,需要較長的計算時間。
sort by不是全局排序,其在數據進入reducer前完成排序. 因此,如果用sort by進行排序,并且設置mapred.reduce.tasks>1, 則sort by只保證每個reducer的輸出有序,不保證全局有序。
9. 數據傾斜怎么解決
數據傾斜問題主要有以下幾種:
空值引發的數據傾斜
不同數據類型引發的數據傾斜
不可拆分大文件引發的數據傾斜
數據膨脹引發的數據傾斜
表連接時引發的數據傾斜
確實無法減少數據量引發的數據傾斜
以上傾斜問題的具體解決方案可查看:Hive千億級數據傾斜解決方案
注意:對于 left join 或者 right join 來說,不會對關聯的字段自動去除null值,對于 inner join 來說,會對關聯的字段自動去除null值。
小伙伴們在閱讀時注意下,在上面的文章(Hive千億級數據傾斜解決方案)中,有一處sql出現了上述問題(舉例的時候原本是想使用left join的,結果手誤寫成了join)。此問題由公眾號讀者發現,感謝這位讀者指正。
10. Hive 小文件過多怎么解決
1. 使用 hive 自帶的 concatenate 命令,自動合并小文件
使用方法:
#對于非分區表 alter?table?A?concatenate;#對于分區表 alter?table?B?partition(day=20201224)?concatenate;注意:
1、concatenate 命令只支持 RCFILE 和 ORC 文件類型。
2、使用concatenate命令合并小文件時不能指定合并后的文件數量,但可以多次執行該命令。
3、當多次使用concatenate后文件數量不在變化,這個跟參數 mapreduce.input.fileinputformat.split.minsize=256mb 的設置有關,可設定每個文件的最小size。
2. 調整參數減少Map數量
設置map輸入合并小文件的相關參數(執行Map前進行小文件合并):
在mapper中將多個文件合成一個split作為輸入(CombineHiveInputFormat底層是Hadoop的CombineFileInputFormat方法):
set?hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;?--?默認每個Map最大輸入大小(這個值決定了合并后文件的數量):
set?mapred.max.split.size=256000000;???--?256M一個節點上split的至少大小(這個值決定了多個DataNode上的文件是否需要合并):
set?mapred.min.split.size.per.node=100000000;??--?100M一個交換機下split的至少大小(這個值決定了多個交換機上的文件是否需要合并):
set?mapred.min.split.size.per.rack=100000000;??--?100M3. 減少Reduce的數量
reduce 的個數決定了輸出的文件的個數,所以可以調整reduce的個數控制hive表的文件數量。
hive中的分區函數 distribute by 正好是控制MR中partition分區的,可以通過設置reduce的數量,結合分區函數讓數據均衡的進入每個reduce即可:
#設置reduce的數量有兩種方式,第一種是直接設置reduce個數 set?mapreduce.job.reduces=10;#第二種是設置每個reduce的大小,Hive會根據數據總大小猜測確定一個reduce個數 set?hive.exec.reducers.bytes.per.reducer=5120000000;?--?默認是1G,設置為5G#執行以下語句,將數據均衡的分配到reduce中 set?mapreduce.job.reduces=10; insert?overwrite?table?A?partition(dt) select?*?from?B distribute?by?rand();對于上述語句解釋:如設置reduce數量為10,使用 rand(), 隨機生成一個數 x % 10 , 這樣數據就會隨機進入 reduce 中,防止出現有的文件過大或過小。
4. 使用hadoop的archive將小文件歸檔
Hadoop Archive簡稱HAR,是一個高效地將小文件放入HDFS塊中的文件存檔工具,它能夠將多個小文件打包成一個HAR文件,這樣在減少namenode內存使用的同時,仍然允許對文件進行透明的訪問。
#用來控制歸檔是否可用 set?hive.archive.enabled=true; #通知Hive在創建歸檔時是否可以設置父目錄 set?hive.archive.har.parentdir.settable=true; #控制需要歸檔文件的大小 set?har.partfile.size=1099511627776;使用以下命令進行歸檔: ALTER?TABLE?A?ARCHIVE?PARTITION(dt='2021-05-07',?hr='12');對已歸檔的分區恢復為原文件: ALTER?TABLE?A?UNARCHIVE?PARTITION(dt='2021-05-07',?hr='12');注意:
歸檔的分區可以查看不能 insert overwrite,必須先 unarchive
Hive 小文件問題具體可查看:解決hive小文件過多問題
11. Hive優化有哪些
1. 數據存儲及壓縮:
針對hive中表的存儲格式通常有orc和parquet,壓縮格式一般使用snappy。相比與textfile格式表,orc占有更少的存儲。因為hive底層使用MR計算架構,數據流是hdfs到磁盤再到hdfs,而且會有很多次,所以使用orc數據格式和snappy壓縮策略可以降低IO讀寫,還能降低網絡傳輸量,這樣在一定程度上可以節省存儲,還能提升hql任務執行效率;
2. 通過調參優化:
并行執行,調節parallel參數;
調節jvm參數,重用jvm;
設置map、reduce的參數;開啟strict mode模式;
關閉推測執行設置。
3. 有效地減小數據集將大表拆分成子表;結合使用外部表和分區表。
4. SQL優化
-
大表對大表:盡量減少數據集,可以通過分區表,避免掃描全表或者全字段;
-
大表對小表:設置自動識別小表,將小表放入內存中去執行。
Hive優化詳細剖析可查看:Hive企業級性能優化
Spark
1. Spark 的運行流程?
Spark運行流程
具體運行流程如下:
SparkContext 向資源管理器注冊并向資源管理器申請運行 Executor
資源管理器分配 Executor,然后資源管理器啟動 Executor
Executor 發送心跳至資源管理器
SparkContext 構建 DAG 有向無環圖
將 DAG 分解成 Stage(TaskSet)
把 Stage 發送給 TaskScheduler
Executor 向 SparkContext 申請 Task
TaskScheduler 將 Task 發送給 Executor 運行
同時 SparkContext 將應用程序代碼發放給 Executor
Task 在 Executor 上運行,運行完畢釋放所有資源
2. Spark 有哪些組件?
master:管理集群和節點,不參與計算。
worker:計算節點,進程本身不參與計算,和 master 匯報。
Driver:運行程序的 main 方法,創建 spark context 對象。
spark context:控制整個 application 的生命周期,包括 dagsheduler 和 task scheduler 等組件。
client:用戶提交程序的入口。
3. Spark 中的 RDD 機制理解嗎?
rdd 分布式彈性數據集,簡單的理解成一種數據結構,是 spark 框架上的通用貨幣。所有算子都是基于 rdd 來執行的,不同的場景會有不同的 rdd 實現類,但是都可以進行互相轉換。rdd 執行過程中會形成 dag 圖,然后形成 lineage 保證容錯性等。從物理的角度來看 rdd 存儲的是 block 和 node 之間的映射。
RDD 是 spark 提供的核心抽象,全稱為彈性分布式數據集。
RDD 在邏輯上是一個 hdfs 文件,在抽象上是一種元素集合,包含了數據。它是被分區的,分為多個分區,每個分區分布在集群中的不同結點上,從而讓 RDD 中的數據可以被并行操作(分布式數據集)
比如有個 RDD 有 90W 數據,3 個 partition,則每個分區上有 30W 數據。RDD 通常通過 Hadoop 上的文件,即 HDFS 或者 HIVE 表來創建,還可以通過應用程序中的集合來創建;RDD 最重要的特性就是容錯性,可以自動從節點失敗中恢復過來。即如果某個結點上的 RDD partition 因為節點故障,導致數據丟失,那么 RDD 可以通過自己的數據來源重新計算該 partition。這一切對使用者都是透明的。
RDD 的數據默認存放在內存中,但是當內存資源不足時,spark 會自動將 RDD 數據寫入磁盤。比如某結點內存只能處理 20W 數據,那么這 20W 數據就會放入內存中計算,剩下 10W 放到磁盤中。RDD 的彈性體現在于 RDD 上自動進行內存和磁盤之間權衡和切換的機制。
4. RDD 中 reduceBykey 與 groupByKey 哪個性能好,為什么?
reduceByKey:reduceByKey 會在結果發送至 reducer 之前會對每個 mapper 在本地進行 merge,有點類似于在 MapReduce 中的 combiner。這樣做的好處在于,在 map 端進行一次 reduce 之后,數據量會大幅度減小,從而減小傳輸,保證 reduce 端能夠更快的進行結果計算。
groupByKey:groupByKey 會對每一個 RDD 中的 value 值進行聚合形成一個序列(Iterator),此操作發生在 reduce 端,所以勢必會將所有的數據通過網絡進行傳輸,造成不必要的浪費。同時如果數據量十分大,可能還會造成 OutOfMemoryError。
所以在進行大量數據的 reduce 操作時候建議使用 reduceByKey。不僅可以提高速度,還可以防止使用 groupByKey 造成的內存溢出問題。
5. 介紹一下 cogroup rdd 實現原理,你在什么場景下用過這個 rdd?
cogroup:對多個(2~4)RDD 中的 KV 元素,每個 RDD 中相同 key 中的元素分別聚合成一個集合。
與 reduceByKey 不同的是:reduceByKey 針對一個 RDD中相同的 key 進行合并。而 cogroup 針對多個 RDD中相同的 key 的元素進行合并。
cogroup 的函數實現:這個實現根據要進行合并的兩個 RDD 操作,生成一個 CoGroupedRDD 的實例,這個 RDD 的返回結果是把相同的 key 中兩個 RDD 分別進行合并操作,最后返回的 RDD 的 value 是一個 Pair 的實例,這個實例包含兩個 Iterable 的值,第一個值表示的是 RDD1 中相同 KEY 的值,第二個值表示的是 RDD2 中相同 key 的值。
由于做 cogroup 的操作,需要通過 partitioner 進行重新分區的操作,因此,執行這個流程時,需要執行一次 shuffle 的操作(如果要進行合并的兩個 RDD 的都已經是 shuffle 后的 rdd,同時他們對應的 partitioner 相同時,就不需要執行 shuffle)。
場景:表關聯查詢或者處理重復的 key。
6. 如何區分 RDD 的寬窄依賴?
窄依賴:父 RDD 的一個分區只會被子 RDD 的一個分區依賴;
寬依賴:父 RDD 的一個分區會被子 RDD 的多個分區依賴(涉及到 shuffle)。
7. 為什么要設計寬窄依賴?
對于窄依賴:
窄依賴的多個分區可以并行計算;
窄依賴的一個分區的數據如果丟失只需要重新計算對應的分區的數據就可以了。
對于寬依賴:
劃分 Stage(階段)的依據:對于寬依賴,必須等到上一階段計算完成才能計算下一階段。
8. DAG 是什么?
DAG(Directed Acyclic Graph 有向無環圖)指的是數據轉換執行的過程,有方向,無閉環(其實就是 RDD 執行的流程);
原始的 RDD 通過一系列的轉換操作就形成了 DAG 有向無環圖,任務執行時,可以按照 DAG 的描述,執行真正的計算(數據被操作的一個過程)。
9. DAG 中為什么要劃分 Stage?
并行計算。
一個復雜的業務邏輯如果有 shuffle,那么就意味著前面階段產生結果后,才能執行下一個階段,即下一個階段的計算要依賴上一個階段的數據。那么我們按照 shuffle 進行劃分(也就是按照寬依賴就行劃分),就可以將一個 DAG 劃分成多個 Stage/階段,在同一個 Stage 中,會有多個算子操作,可以形成一個 pipeline 流水線,流水線內的多個平行的分區可以并行執行。
10. 如何劃分 DAG 的 stage?
對于窄依賴,partition 的轉換處理在 stage 中完成計算,不劃分(將窄依賴盡量放在在同一個 stage 中,可以實現流水線計算)。
對于寬依賴,由于有 shuffle 的存在,只能在父 RDD 處理完成后,才能開始接下來的計算,也就是說需要要劃分 stage。
11. DAG 劃分為 Stage 的算法了解嗎?
核心算法:回溯算法
從后往前回溯/反向解析,遇到窄依賴加入本 Stage,遇見寬依賴進行 Stage 切分。
Spark 內核會從觸發 Action 操作的那個 RDD 開始從后往前推,首先會為最后一個 RDD 創建一個 Stage,然后繼續倒推,如果發現對某個 RDD 是寬依賴,那么就會將寬依賴的那個 RDD 創建一個新的 Stage,那個 RDD 就是新的 Stage 的最后一個 RDD。 然后依次類推,繼續倒推,根據窄依賴或者寬依賴進行 Stage 的劃分,直到所有的 RDD 全部遍歷完成為止。
具體劃分算法請參考:AMP 實驗室發表的論文
《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se
12. 對于 Spark 中的數據傾斜問題你有什么好的方案?
前提是定位數據傾斜,是 OOM 了,還是任務執行緩慢,看日志,看 WebUI
解決方法,有多個方面:
-
避免不必要的 shuffle,如使用廣播小表的方式,將 reduce-side-join 提升為 map-side-join
-
分拆發生數據傾斜的記錄,分成幾個部分進行,然后合并 join 后的結果
-
改變并行度,可能并行度太少了,導致個別 task 數據壓力大
-
兩階段聚合,先局部聚合,再全局聚合
-
自定義 paritioner,分散 key 的分布,使其更加均勻
13. Spark 中的 OOM 問題?
map 類型的算子執行中內存溢出如 flatMap,mapPatitions
-
原因:map 端過程產生大量對象導致內存溢出:這種溢出的原因是在單個 map 中產生了大量的對象導致的針對這種問題。
解決方案:
-
增加堆內內存。
-
在不增加內存的情況下,可以減少每個 Task 處理數據量,使每個 Task 產生大量的對象時,Executor 的內存也能夠裝得下。具體做法可以在會產生大量對象的 map 操作之前調用 repartition 方法,分區成更小的塊傳入 map。
shuffle 后內存溢出如 join,reduceByKey,repartition。
-
shuffle 內存溢出的情況可以說都是 shuffle 后,單個文件過大導致的。在 shuffle 的使用,需要傳入一個 partitioner,大部分 Spark 中的 shuffle 操作,默認的 partitioner 都是 HashPatitioner,默認值是父 RDD 中最大的分區數.這個參數 spark.default.parallelism 只對 HashPartitioner 有效.如果是別的 partitioner 導致的 shuffle 內存溢出就需要重寫 partitioner 代碼了.
driver 內存溢出
-
用戶在 Dirver 端口生成大對象,比如創建了一個大的集合數據結構。解決方案:將大對象轉換成 Executor 端加載,比如調用 sc.textfile 或者評估大對象占用的內存,增加 dirver 端的內存
-
從 Executor 端收集數據(collect)回 Dirver 端,建議將 driver 端對 collect 回來的數據所作的操作,轉換成 executor 端 rdd 操作。
14. Spark 中數據的位置是被誰管理的?
每個數據分片都對應具體物理位置,數據的位置是被blockManager管理,無論數據是在磁盤,內存還是 tacyan,都是由 blockManager 管理。
15. Spaek 程序執行,有時候默認為什么會產生很多 task,怎么修改默認 task 執行個數?
輸入數據有很多 task,尤其是有很多小文件的時候,有多少個輸入 block 就會有多少個 task 啟動;
spark 中有 partition 的概念,每個 partition 都會對應一個 task,task 越多,在處理大規模數據的時候,就會越有效率。不過 task 并不是越多越好,如果平時測試,或者數據量沒有那么大,則沒有必要 task 數量太多。
參數可以通過 spark_home/conf/spark-default.conf 配置文件設置:
針對 spark sql 的 task 數量:spark.sql.shuffle.partitions=50
非 spark sql 程序設置生效:spark.default.parallelism=10
16. 介紹一下 join 操作優化經驗?
這道題常考,這里只是給大家一個思路,簡單說下!面試之前還需做更多準備。
join 其實常見的就分為兩類: map-side join 和 reduce-side join。
當大表和小表 join 時,用 map-side join 能顯著提高效率。
將多份數據進行關聯是數據處理過程中非常普遍的用法,不過在分布式計算系統中,這個問題往往會變的非常麻煩,因為框架提供的 join 操作一般會將所有數據根據 key 發送到所有的 reduce 分區中去,也就是 shuffle 的過程。造成大量的網絡以及磁盤 IO 消耗,運行效率極其低下,這個過程一般被稱為 reduce-side-join。
如果其中有張表較小的話,我們則可以自己實現在 map 端實現數據關聯,跳過大量數據進行 shuffle 的過程,運行時間得到大量縮短,根據不同數據可能會有幾倍到數十倍的性能提升。
在大數據量的情況下,join 是一中非常昂貴的操作,需要在 join 之前應盡可能的先縮小數據量。
對于縮小數據量,有以下幾條建議:
若兩個 RDD 都有重復的 key,join 操作會使得數據量會急劇的擴大。所有,最好先使用 distinct 或者 combineByKey 操作來減少 key 空間或者用 cogroup 來處理重復的 key,而不是產生所有的交叉結果。在 combine 時,進行機智的分區,可以避免第二次 shuffle。
如果只在一個 RDD 出現,那你將在無意中丟失你的數據。所以使用外連接會更加安全,這樣你就能確保左邊的 RDD 或者右邊的 RDD 的數據完整性,在 join 之后再過濾數據。
如果我們容易得到 RDD 的可以的有用的子集合,那么我們可以先用 filter 或者 reduce,如何在再用 join。
17. Spark 與 MapReduce 的 Shuffle 的區別?
相同點:都是將 mapper(Spark 里是 ShuffleMapTask)的輸出進行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一個 stage 里的 ShuffleMapTask,也可能是 ResultTask)
不同點:
-
MapReduce 默認是排序的,spark 默認不排序,除非使用 sortByKey 算子。
-
MapReduce 可以劃分成 split,map()、spill、merge、shuffle、sort、reduce()等階段,spark 沒有明顯的階段劃分,只有不同的 stage 和算子操作。
-
MR 落盤,Spark 不落盤,spark 可以解決 mr 落盤導致效率低下的問題。
18. Spark SQL 執行的流程?
這個問題如果深挖還挺復雜的,這里簡單介紹下總體流程:
parser:基于 antlr 框架對 sql 解析,生成抽象語法樹。
變量替換:通過正則表達式找出符合規則的字符串,替換成系統緩存環境的變量
SQLConf 中的spark.sql.variable.substitute,默認是可用的;參考SparkSqlParser
parser:將 antlr 的 tree 轉成 spark catalyst 的 LogicPlan,也就是 未解析的邏輯計劃;詳細參考AstBuild, ParseDriver
analyzer:通過分析器,結合 catalog,把 logical plan 和實際的數據綁定起來,將 未解析的邏輯計劃 生成 邏輯計劃;詳細參考QureyExecution
緩存替換:通過 CacheManager,替換有相同結果的 logical plan(邏輯計劃)
logical plan 優化,基于規則的優化;優化規則參考 Optimizer,優化執行器 RuleExecutor
生成 spark plan,也就是物理計劃;參考QueryPlanner和SparkStrategies
spark plan 準備階段
構造 RDD 執行,涉及 spark 的 wholeStageCodegenExec 機制,基于 janino 框架生成 java 代碼并編譯
19. Spark SQL 是如何將數據寫到 Hive 表的?
-
方式一:是利用 Spark RDD 的 API 將數據寫入 hdfs 形成 hdfs 文件,之后再將 hdfs 文件和 hive 表做加載映射。
-
方式二:利用 Spark SQL 將獲取的數據 RDD 轉換成 DataFrame,再將 DataFrame 寫成緩存表,最后利用 Spark SQL 直接插入 hive 表中。而對于利用 Spark SQL 寫 hive 表官方有兩種常見的 API,第一種是利用 JavaBean 做映射,第二種是利用 StructType 創建 Schema 做映射。
20. 通常來說,Spark 與 MapReduce 相比,Spark 運行效率更高。請說明效率更高來源于 Spark 內置的哪些機制?
基于內存計算,減少低效的磁盤交互;
高效的調度算法,基于 DAG;
容錯機制 Linage。
重點部分就是 DAG 和 Lingae
21. Hadoop 和 Spark 的相同點和不同點?
Hadoop 底層使用 MapReduce 計算架構,只有 map 和 reduce 兩種操作,表達能力比較欠缺,而且在 MR 過程中會重復的讀寫 hdfs,造成大量的磁盤 io 讀寫操作,所以適合高時延環境下批處理計算的應用;
Spark 是基于內存的分布式計算架構,提供更加豐富的數據集操作類型,主要分成轉化操作和行動操作,包括 map、reduce、filter、flatmap、groupbykey、reducebykey、union 和 join 等,數據分析更加快速,所以適合低時延環境下計算的應用;
spark 與 hadoop 最大的區別在于迭代式計算模型?;?mapreduce 框架的 Hadoop 主要分為 map 和 reduce 兩個階段,兩個階段完了就結束了,所以在一個 job 里面能做的處理很有限;spark 計算模型是基于內存的迭代式計算模型,可以分為 n 個階段,根據用戶編寫的 RDD 算子和程序,在處理完一個階段后可以繼續往下處理很多個階段,而不只是兩個階段。所以 spark 相較于 mapreduce,計算模型更加靈活,可以提供更強大的功能。
但是 spark 也有劣勢,由于 spark 基于內存進行計算,雖然開發容易,但是真正面對大數據的時候,在沒有進行調優的情況下,可能會出現各種各樣的問題,比如 OOM 內存溢出等情況,導致 spark 程序可能無法運行起來,而 mapreduce 雖然運行緩慢,但是至少可以慢慢運行完。
22. Hadoop 和 Spark 使用場景?
Hadoop/MapReduce 和 Spark 最適合的都是做離線型的數據分析,但 Hadoop 特別適合是單次分析的數據量“很大”的情景,而 Spark 則適用于數據量不是很大的情景。
一般情況下,對于中小互聯網和企業級的大數據應用而言,單次分析的數量都不會“很大”,因此可以優先考慮使用 Spark。
業務通常認為 Spark 更適用于機器學習之類的“迭代式”應用,80GB 的壓縮數據(解壓后超過 200GB),10 個節點的集群規模,跑類似“sum+group-by”的應用,MapReduce 花了 5 分鐘,而 spark 只需要 2 分鐘。
23. Spark 如何保證宕機迅速恢復?
適當增加 spark standby master
編寫 shell 腳本,定期檢測 master 狀態,出現宕機后對 master 進行重啟操作
24. RDD 持久化原理?
spark 非常重要的一個功能特性就是可以將 RDD 持久化在內存中。
調用 cache()和 persist()方法即可。cache()和 persist()的區別在于,cache()是 persist()的一種簡化方式,cache()的底層就是調用 persist()的無參版本 persist(MEMORY_ONLY),將數據持久化到內存中。
如果需要從內存中清除緩存,可以使用 unpersist()方法。RDD 持久化是可以手動選擇不同的策略的。在調用 persist()時傳入對應的 StorageLevel 即可。
25. Checkpoint 檢查點機制?
應用場景:當 spark 應用程序特別復雜,從初始的 RDD 開始到最后整個應用程序完成有很多的步驟,而且整個應用運行時間特別長,這種情況下就比較適合使用 checkpoint 功能。
原因:對于特別復雜的 Spark 應用,會出現某個反復使用的 RDD,即使之前持久化過但由于節點的故障導致數據丟失了,沒有容錯機制,所以需要重新計算一次數據。
Checkpoint 首先會調用 SparkContext 的 setCheckPointDIR()方法,設置一個容錯的文件系統的目錄,比如說 HDFS;然后對 RDD 調用 checkpoint()方法。之后在 RDD 所處的 job 運行結束之后,會啟動一個單獨的 job,來將 checkpoint 過的 RDD 數據寫入之前設置的文件系統,進行高可用、容錯的類持久化操作。
檢查點機制是我們在 spark streaming 中用來保障容錯性的主要機制,它可以使 spark streaming 階段性的把應用數據存儲到諸如 HDFS 等可靠存儲系統中,以供恢復時使用。具體來說基于以下兩個目的服務:
控制發生失敗時需要重算的狀態數。Spark streaming 可以通過轉化圖的譜系圖來重算狀態,檢查點機制則可以控制需要在轉化圖中回溯多遠。
提供驅動器程序容錯。如果流計算應用中的驅動器程序崩潰了,你可以重啟驅動器程序并讓驅動器程序從檢查點恢復,這樣 spark streaming 就可以讀取之前運行的程序處理數據的進度,并從那里繼續。
26. Checkpoint 和持久化機制的區別?
最主要的區別在于持久化只是將數據保存在 BlockManager 中,但是 RDD 的 lineage(血緣關系,依賴關系)是不變的。但是 checkpoint 執行完之后,rdd 已經沒有之前所謂的依賴 rdd 了,而只有一個強行為其設置的 checkpointRDD,checkpoint 之后 rdd 的 lineage 就改變了。
持久化的數據丟失的可能性更大,因為節點的故障會導致磁盤、內存的數據丟失。但是 checkpoint 的數據通常是保存在高可用的文件系統中,比如 HDFS 中,所以數據丟失可能性比較低
27. Spark Streaming 以及基本工作原理?
Spark streaming 是 spark core API 的一種擴展,可以用于進行大規模、高吞吐量、容錯的實時數據流的處理。
它支持從多種數據源讀取數據,比如 Kafka、Flume、Twitter 和 TCP Socket,并且能夠使用算子比如 map、reduce、join 和 window 等來處理數據,處理后的數據可以保存到文件系統、數據庫等存儲中。
Spark streaming 內部的基本工作原理是:接受實時輸入數據流,然后將數據拆分成 batch,比如每收集一秒的數據封裝成一個 batch,然后將每個 batch 交給 spark 的計算引擎進行處理,最后會生產處一個結果數據流,其中的數據也是一個一個的 batch 組成的。
28. DStream 以及基本工作原理?
DStream 是 spark streaming 提供的一種高級抽象,代表了一個持續不斷的數據流。
DStream 可以通過輸入數據源來創建,比如 Kafka、flume 等,也可以通過其他 DStream 的高階函數來創建,比如 map、reduce、join 和 window 等。
DStream 內部其實不斷產生 RDD,每個 RDD 包含了一個時間段的數據。
Spark streaming 一定是有一個輸入的 DStream 接收數據,按照時間劃分成一個一個的 batch,并轉化為一個 RDD,RDD 的數據是分散在各個子節點的 partition 中。
29. Spark Streaming 整合 Kafka 的兩種模式?
receiver 方式:將數據拉取到 executor 中做操作,若數據量大,內存存儲不下,可以通過 WAL,設置了本地存儲,保證數據不丟失,然后使用 Kafka 高級 API 通過 zk 來維護偏移量,保證消費數據。receiver 消費的數據偏移量是在 zk 獲取的,此方式效率低,容易出現數據丟失。
-
receiver 方式的容錯性:在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用 Spark Streaming 的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的 Kafka 數據寫入分布式文件系統(比如 HDFS)上的預寫日志中。所以,即使底層節點出現了失敗,也可以使用預寫日志中的數據進行恢復。
-
Kafka 中的 topic 的 partition,與 Spark 中的 RDD 的 partition 是沒有關系的。在 1、KafkaUtils.createStream()中,提高 partition 的數量,只會增加 Receiver 方式中讀取 partition 的線程的數量。不會增加 Spark 處理數據的并行度。 可以創建多個 Kafka 輸入 DStream,使用不同的 consumer group 和 topic,來通過多個 receiver 并行接收數據。
基于 Direct 方式:使用 Kafka 底層 Api,其消費者直接連接 kafka 的分區上,因為 createDirectStream 創建的 DirectKafkaInputDStream 每個 batch 所對應的 RDD 的分區與 kafka 分區一一對應,但是需要自己維護偏移量,即用即取,不會給內存造成太大的壓力,效率高。
-
優點:簡化并行讀取:如果要讀取多個 partition,不需要創建多個輸入 DStream 然后對它們進行 union 操作。Spark 會創建跟 Kafka partition 一樣多的 RDD partition,并且會并行從 Kafka 中讀取數據。所以在 Kafka partition 和 RDD partition 之間,有一個一對一的映射關系。
-
高性能:如果要保證零數據丟失,在基于 receiver 的方式中,需要開啟 WAL 機制。這種方式其實效率低下,因為數據實際上被復制了兩份,Kafka 自己本身就有高可靠的機制,會對數據復制一份,而這里又會復制一份到 WAL 中。而基于 direct 的方式,不依賴 Receiver,不需要開啟 WAL 機制,只要 Kafka 中作了數據的復制,那么就可以通過 Kafka 的副本進行恢復。
receiver 與和 direct 的比較:
-
基于 receiver 的方式,是使用 Kafka 的高階 API 來在 ZooKeeper 中保存消費過的 offset 的。這是消費 Kafka 數據的傳統方式。這種方式配合著 WAL 機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為 Spark 和 ZooKeeper 之間可能是不同步的。
-
基于 direct 的方式,使用 Kafka 的低階 API,Spark Streaming 自己就負責追蹤消費的 offset,并保存在 checkpoint 中。Spark 自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。
-
Receiver 方式是通過 zookeeper 來連接 kafka 隊列,Direct 方式是直接連接到 kafka 的節點上獲取數據。
30. Spark 主備切換機制原理知道嗎?
Master 實際上可以配置兩個,Spark 原生的 standalone 模式是支持 Master 主備切換的。當 Active Master 節點掛掉以后,我們可以將 Standby Master 切換為 Active Master。
Spark Master 主備切換可以基于兩種機制,一種是基于文件系統的,一種是基于 ZooKeeper 的。
基于文件系統的主備切換機制,需要在 Active Master 掛掉之后手動切換到 Standby Master 上;
而基于 Zookeeper 的主備切換機制,可以實現自動切換 Master。
31. Spark 解決了 Hadoop 的哪些問題?
MR:抽象層次低,需要使用手工代碼來完成程序編寫,使用上難以上手;
Spark:Spark 采用 RDD 計算模型,簡單容易上手。
MR:只提供 map 和 reduce 兩個操作,表達能力欠缺;
Spark:Spark 采用更加豐富的算子模型,包括 map、flatmap、groupbykey、reducebykey 等;
MR:一個 job 只能包含 map 和 reduce 兩個階段,復雜的任務需要包含很多個 job,這些 job 之間的管理以來需要開發者自己進行管理;
Spark:Spark 中一個 job 可以包含多個轉換操作,在調度時可以生成多個 stage,而且如果多個 map 操作的分區不變,是可以放在同一個 task 里面去執行;
MR:中間結果存放在 hdfs 中;
Spark:Spark 的中間結果一般存在內存中,只有當內存不夠了,才會存入本地磁盤,而不是 hdfs;
MR:只有等到所有的 map task 執行完畢后才能執行 reduce task;
Spark:Spark 中分區相同的轉換構成流水線在一個 task 中執行,分區不同的需要進行 shuffle 操作,被劃分成不同的 stage 需要等待前面的 stage 執行完才能執行。
MR:只適合 batch 批處理,時延高,對于交互式處理和實時處理支持不夠;
Spark:Spark streaming 可以將流拆成時間間隔的 batch 進行處理,實時計算。
32. 數據傾斜的產生和解決辦法?
數據傾斜以為著某一個或者某幾個 partition 的數據特別大,導致這幾個 partition 上的計算需要耗費相當長的時間。
在 spark 中同一個應用程序劃分成多個 stage,這些 stage 之間是串行執行的,而一個 stage 里面的多個 task 是可以并行執行,task 數目由 partition 數目決定,如果一個 partition 的數目特別大,那么導致這個 task 執行時間很長,導致接下來的 stage 無法執行,從而導致整個 job 執行變慢。
避免數據傾斜,一般是要選用合適的 key,或者自己定義相關的 partitioner,通過加鹽或者哈希值來拆分這些 key,從而將這些數據分散到不同的 partition 去執行。
如下算子會導致 shuffle 操作,是導致數據傾斜可能發生的關鍵點所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;
33. 你用 Spark Sql 處理的時候, 處理過程中用的 DataFrame 還是直接寫的 Sql?為什么?
這個問題的宗旨是問你 spark sql 中 dataframe 和 sql 的區別,從執行原理、操作方便程度和自定義程度來分析 這個問題。
34. Spark Master HA 主從切換過程不會影響到集群已有作業的運行,為什么?
不會的。
因為程序在運行之前,已經申請過資源了,driver 和 Executors 通訊,不需要和 master 進行通訊的。
35. Spark Master 使用 Zookeeper 進行 HA,有哪些源數據保存到 Zookeeper 里面?
spark 通過這個參數 spark.deploy.zookeeper.dir 指定 master 元數據在 zookeeper 中保存的位置,包括 Worker,Driver 和 Application 以及 Executors。standby 節點要從 zk 中,獲得元數據信息,恢復集群運行狀態,才能對外繼續提供服務,作業提交資源申請等,在恢復前是不能接受請求的。
注:Master 切換需要注意 2 點:
1、在 Master 切換的過程中,所有的已經在運行的程序皆正常運行! 因為 Spark Application 在運行前就已經通過 Cluster Manager 獲得了計算資源,所以在運行時 Job 本身的 調度和處理和 Master 是沒有任何關系。
2、在 Master 的切換過程中唯一的影響是不能提交新的 Job:一方面不能夠提交新的應用程序給集群, 因為只有 Active Master 才能接受新的程序的提交請求;另外一方面,已經運行的程序中也不能夠因 Action 操作觸發新的 Job 的提交請求。
36. 如何實現Spark Streaming讀取Flume中的數據?
可以這樣說:
-
前期經過技術調研,查看官網相關資料,發現sparkStreaming整合flume有2種模式,一種是拉模式,一種是推模式,然后在簡單的聊聊這2種模式的特點,以及如何部署實現,需要做哪些事情,最后對比兩種模式的特點,選擇那種模式更好。
-
推模式:Flume將數據Push推給Spark Streaming
-
拉模式:Spark Streaming從flume 中Poll拉取數據
37. 在實際開發的時候是如何保證數據不丟失的?
可以這樣說:
-
flume那邊采用的channel是將數據落地到磁盤中,保證數據源端安全性(可以在補充一下,flume在這里的channel可以設置為memory內存中,提高數據接收處理的效率,但是由于數據在內存中,安全機制保證不了,故選擇channel為磁盤存儲。整個流程運行有一點的延遲性)
-
sparkStreaming通過拉模式整合的時候,使用了FlumeUtils這樣一個類,該類是需要依賴一個額外的jar包(spark-streaming-flume_2.10)
-
要想保證數據不丟失,數據的準確性,可以在構建StreamingConext的時候,利用StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)來創建一個StreamingContext,使用StreamingContext.getOrCreate來創建StreamingContext對象,傳入的第一個參數是checkpoint的存放目錄,第二參數是生成StreamingContext對象的用戶自定義函數。如果checkpoint的存放目錄存在,則從這個目錄中生成StreamingContext對象;如果不存在,才會調用第二個函數來生成新的StreamingContext對象。在creatingFunc函數中,除了生成一個新的StreamingContext操作,還需要完成各種操作,然后調用ssc.checkpoint(checkpointDirectory)來初始化checkpoint功能,最后再返回StreamingContext對象。
這樣,在StreamingContext.getOrCreate之后,就可以直接調用start()函數來啟動(或者是從中斷點繼續運行)流式應用了。如果有其他在啟動或繼續運行都要做的工作,可以在start()調用前執行。
38. RDD有哪些缺陷?
不支持細粒度的寫和更新操作,Spark寫數據是粗粒度的,所謂粗粒度,就是批量寫入數據,目的是為了提高效率。但是Spark讀數據是細粒度的,也就是說可以一條條的讀。
不支持增量迭代計算,如果對Flink熟悉,可以說下Flink支持增量迭代計算。
Kafka
1. 為什么要使用 kafka?
緩沖和削峰:上游數據時有突發流量,下游可能扛不住,或者下游沒有足夠多的機器來保證冗余,kafka在中間可以起到一個緩沖的作用,把消息暫存在kafka中,下游服務就可以按照自己的節奏進行慢慢處理。
解耦和擴展性:項目開始的時候,并不能確定具體需求。消息隊列可以作為一個接口層,解耦重要的業務流程。只需要遵守約定,針對數據編程即可獲取擴展能力。
冗余:可以采用一對多的方式,一個生產者發布消息,可以被多個訂閱topic的服務消費到,供多個毫無關聯的業務使用。
健壯性:消息隊列可以堆積請求,所以消費端業務即使短時間死掉,也不會影響主要業務的正常進行。
異步通信:很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
2. Kafka消費過的消息如何再消費?
kafka消費消息的offset是定義在zookeeper中的, 如果想重復消費kafka的消息,可以在redis中自己記錄offset的checkpoint點(n個),當想重復消費消息時,通過讀取redis中的checkpoint點進行zookeeper的offset重設,這樣就可以達到重復消費消息的目的了
3. kafka的數據是放在磁盤上還是內存上,為什么速度會快?
kafka使用的是磁盤存儲。
速度快是因為:
順序寫入:因為硬盤是機械結構,每次讀寫都會尋址->寫入,其中尋址是一個“機械動作”,它是耗時的。所以硬盤 “討厭”隨機I/O, 喜歡順序I/O。為了提高讀寫硬盤的速度,Kafka就是使用順序I/O。
Memory Mapped Files(內存映射文件):64位操作系統中一般可以表示20G的數據文件,它的工作原理是直接利用操作系統的Page來實現文件到物理內存的直接映射。完成映射之后你對物理內存的操作會被同步到硬盤上。
Kafka高效文件存儲設計: Kafka把topic中一個parition大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經消費完文件,減少磁盤占用。通過索引信息可以快速定位 message和確定response的 大 小。通過index元數據全部映射到memory(內存映射文件), 可以避免segment file的IO磁盤操作。通過索引文件稀疏存儲,可以大幅降低index文件元數據占用空間大小。
注:
Kafka解決查詢效率的手段之一是將數據文件分段,比如有100條Message,它們的offset是從0到99。假設將數據文件分成5段,第一段為0-19,第二段為20-39,以此類推,每段放在一個單獨的數據文件里面,數據文件以該段中 小的offset命名。這樣在查找指定offset的 Message的時候,用二分查找就可以定位到該Message在哪個段中。
為數據文件建 索引數據文件分段 使得可以在一個較小的數據文件中查找對應offset的Message 了,但是這依然需要順序掃描才能找到對應offset的Message。 為了進一步提高查找的效率,Kafka為每個分段后的數據文件建立了索引文件,文件名與數據文件的名字是一樣的,只是文件擴展名為.index。
4. Kafka數據怎么保障不丟失?
分三個點說,一個是生產者端,一個消費者端,一個broker端。
生產者數據的不丟失
kafka的ack機制:在kafka發送數據的時候,每次發送消息都會有一個確認反饋機制,確保消息正常的能夠被收到,其中狀態有0,1,-1。
如果是同步模式:
ack設置為0,風險很大,一般不建議設置為0。即使設置為1,也會隨著leader宕機丟失數據。所以如果要嚴格保證生產端數據不丟失,可設置為-1。
如果是異步模式:
也會考慮ack的狀態,除此之外,異步模式下的有個buffer,通過buffer來進行控制數據的發送,有兩個值來進行控制,時間閾值與消息的數量閾值,如果buffer滿了數據還沒有發送出去,有個選項是配置是否立即清空buffer??梢栽O置為-1,永久阻塞,也就數據不再生產。異步模式下,即使設置為-1。也可能因為程序員的不科學操作,操作數據丟失,比如kill -9,但這是特別的例外情況。
注:
ack=0:producer不等待broker同步完成的確認,繼續發送下一條(批)信息。
ack=1(默認):producer要等待leader成功收到數據并得到確認,才發送下一條message。
ack=-1:producer得到follwer確認,才發送下一條數據。
消費者數據的不丟失
通過offset commit 來保證數據的不丟失,kafka自己記錄了每次消費的offset數值,下次繼續消費的時候,會接著上次的offset進行消費。
而offset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消費者在運行過程中掛掉了,再次啟動的時候會找到offset的值,找到之前消費消息的位置,接著消費,由于 offset 的信息寫入的時候并不是每條消息消費完成后都寫入的,所以這種情況有可能會造成重復消費,但是不會丟失消息。
唯一例外的情況是,我們在程序中給原本做不同功能的兩個consumer組設置 KafkaSpoutConfig.bulider.setGroupid的時候設置成了一樣的groupid,這種情況會導致這兩個組共享同一份數據,就會產生組A消費partition1,partition2中的消息,組B消費partition3的消息,這樣每個組消費的消息都會丟失,都是不完整的。 為了保證每個組都獨享一份消息數據,groupid一定不要重復才行。
kafka集群中的broker的數據不丟失
每個broker中的partition我們一般都會設置有replication(副本)的個數,生產者寫入的時候首先根據分發策略(有partition按partition,有key按key,都沒有輪詢)寫入到leader中,follower(副本)再跟leader同步數據,這樣有了備份,也可以保證消息數據的不丟失。
5. 采集數據為什么選擇kafka?
采集層 主要可以使用Flume, Kafka等技術。
Flume:Flume 是管道流方式,提供了很多的默認實現,讓用戶通過參數部署,及擴展API.
Kafka:Kafka是一個可持久化的分布式的消息隊列。 Kafka 是一個非常通用的系統。你可以有許多生產者和很多的消費者共享多個主題Topics。
相比之下,Flume是一個專用工具被設計為旨在往HDFS,HBase發送數據。它對HDFS有特殊的優化,并且集成了Hadoop的安全特性。
所以,Cloudera 建議如果數據被多個系統消費的話,使用kafka;如果數據被設計給Hadoop使用,使用Flume。
6. kafka 重啟是否會導致數據丟失?
kafka是將數據寫到磁盤的,一般數據不會丟失。
但是在重啟kafka過程中,如果有消費者消費消息,那么kafka如果來不及提交offset,可能會造成數據的不準確(丟失或者重復消費)。
7. kafka 宕機了如何解決?
先考慮業務是否受到影響
kafka 宕機了,首先我們考慮的問題應該是所提供的服務是否因為宕機的機器而受到影響,如果服務提供沒問題,如果實現做好了集群的容災機制,那么這塊就不用擔心了。
節點排錯與恢復
想要恢復集群的節點,主要的步驟就是通過日志分析來查看節點宕機的原因,從而解決,重新恢復節點。
8. 為什么Kafka不支持讀寫分離?
在 Kafka 中,生產者寫入消息、消費者讀取消息的操作都是與 leader 副本進行交互的,從 而實現的是一種主寫主讀的生產消費模型。 Kafka 并不支持主寫從讀,因為主寫從讀有 2 個很明顯的缺點:
數據一致性問題:數據從主節點轉到從節點必然會有一個延時的時間窗口,這個時間 窗口會導致主從節點之間的數據不一致。某一時刻,在主節點和從節點中 A 數據的值都為 X, 之后將主節點中 A 的值修改為 Y,那么在這個變更通知到從節點之前,應用讀取從節點中的 A 數據的值并不為最新的 Y,由此便產生了數據不一致的問題。
延時問題:類似 Redis 這種組件,數據從寫入主節點到同步至從節點中的過程需要經歷 網絡→主節點內存→網絡→從節點內存 這幾個階段,整個過程會耗費一定的時間。而在 Kafka 中,主從同步會比 Redis 更加耗時,它需要經歷 網絡→主節點內存→主節點磁盤→網絡→從節 點內存→從節點磁盤 這幾個階段。對延時敏感的應用而言,主寫從讀的功能并不太適用。
而kafka的主寫主讀的優點就很多了:
可以簡化代碼的實現邏輯,減少出錯的可能;
將負載粒度細化均攤,與主寫從讀相比,不僅負載效能更好,而且對用戶可控;
沒有延時的影響;
在副本穩定的情況下,不會出現數據不一致的情況。
9. kafka數據分區和消費者的關系?
每個分區只能由同一個消費組內的一個消費者(consumer)來消費,可以由不同的消費組的消費者來消費,同組的消費者則起到并發的效果。
10. kafka的數據offset讀取流程
連接ZK集群,從ZK中拿到對應topic的partition信息和partition的Leader的相關信息
連接到對應Leader對應的broker
consumer將?自?己保存的offset發送給Leader
Leader根據offset等信息定位到segment(索引?文件和?日志?文件)
根據索引?文件中的內容,定位到?日志?文件中該偏移量量對應的開始位置讀取相應?長度的數據并返回給consumer
11. kafka內部如何保證順序,結合外部組件如何保證消費者的順序?
kafka只能保證partition內是有序的,但是partition間的有序是沒辦法的。愛奇藝的搜索架構,是從業務上把需要有序的打到同?個partition。
12. Kafka消息數據積壓,Kafka消費能力不足怎么處理?
如果是Kafka消費能力不足,則可以考慮增加Topic的分區數,并且同時提升消費組的消費者數量,消費者數=分區數。(兩者缺一不可)
如果是下游的數據處理不及時:提高每批次拉取的數量。批次拉取數據過少(拉取數據/處理時間<生產速度),使處理的數據小于生產的數據,也會造成數據積壓。
13. Kafka單條日志傳輸大小
kafka對于消息體的大小默認為單條最大值是1M但是在我們應用場景中, 常常會出現一條消息大于1M,如果不對kafka進行配置。則會出現生產者無法將消息推送到kafka或消費者無法去消費kafka里面的數據, 這時我們就要對kafka進行以下配置:server.properties
replica.fetch.max.bytes:?1048576??broker可復制的消息的最大字節數,?默認為1M message.max.bytes:?1000012???kafka?會接收單個消息size的最大限制,?默認為1M左右注意:message.max.bytes必須小于等于replica.fetch.max.bytes,否則就會導致replica之間數據同步失敗。
Hbase
1. Hbase是怎么寫數據的?
Client寫入 -> 存入MemStore,一直到MemStore滿 -> Flush成一個StoreFile,直至增長到一定閾值 -> 觸發Compact合并操作 -> 多個StoreFile合并成一個StoreFile,同時進行版本合并和數據刪除 -> 當StoreFiles Compact后,逐步形成越來越大的StoreFile -> 單個StoreFile大小超過一定閾值后(默認10G),觸發Split操作,把當前Region Split成2個Region,Region會下線,新Split出的2個孩子Region會被HMaster分配到相應的HRegionServer 上,使得原先1個Region的壓力得以分流到2個Region上
由此過程可知,HBase只是增加數據,沒有更新和刪除操作,用戶的更新和刪除都是邏輯層面的,在物理層面,更新只是追加操作,刪除只是標記操作。
用戶寫操作只需要進入到內存即可立即返回,從而保證I/O高性能。
2. HDFS和HBase各自使用場景
首先一點需要明白:Hbase是基于HDFS來存儲的。
HDFS:
一次性寫入,多次讀取。
保證數據的一致性。
主要是可以部署在許多廉價機器中,通過多副本提高可靠性,提供了容錯和恢復機制。
HBase:
瞬間寫入量很大,數據庫不好支撐或需要很高成本支撐的場景。
數據需要長久保存,且量會持久增長到比較大的場景。
HBase不適用與有 join,多級索引,表關系復雜的數據模型。
大數據量(100s TB級數據)且有快速隨機訪問的需求。如:淘寶的交易歷史記錄。數據量巨大無容置疑,面向普通用戶的請求必然要即時響應。
業務場景簡單,不需要關系數據庫中很多特性(例如交叉列、交叉表,事務,連接等等)。
3. Hbase的存儲結構
Hbase 中的每張表都通過行鍵(rowkey)按照一定的范圍被分割成多個子表(HRegion),默認一個HRegion 超過256M 就要被分割成兩個,由HRegionServer管理,管理哪些 HRegion 由 Hmaster 分配。 HRegion 存取一個子表時,會創建一個 HRegion 對象,然后對表的每個列族(Column Family)創建一個 store 實例, 每個 store 都會有 0 個或多個 StoreFile 與之對應,每個 StoreFile 都會對應一個HFile,HFile 就是實際的存儲文件,一個 HRegion 還擁有一個 MemStore實例。
4. 熱點現象(數據傾斜)怎么產生的,以及解決方法有哪些
熱點現象:
某個小的時段內,對HBase的讀寫請求集中到極少數的Region上,導致這些region所在的RegionServer處理請求量驟增,負載量明顯偏大,而其他的RgionServer明顯空閑。
熱點現象出現的原因:
HBase中的行是按照rowkey的字典順序排序的,這種設計優化了scan操作,可以將相關的行以及會被一起讀取的行存取在臨近位置,便于scan。然而糟糕的rowkey設計是熱點的源頭。
熱點發生在大量的client直接訪問集群的一個或極少數個節點(訪問可能是讀,寫或者其他操作)。大量訪問會使熱點region所在的單個機器超出自身承受能力,引起性能下降甚至region不可用,這也會影響同一個RegionServer上的其他region,由于主機無法服務其他region的請求。
熱點現象解決辦法:
為了避免寫熱點,設計rowkey使得不同行在同一個region,但是在更多數據情況下,數據應該被寫入集群的多個region,而不是一個。常見的方法有以下這些:
加鹽:在rowkey的前面增加隨機數,使得它和之前的rowkey的開頭不同。分配的前綴種類數量應該和你想使用數據分散到不同的region的數量一致。加鹽之后的rowkey就會根據隨機生成的前綴分散到各個region上,以避免熱點。
哈希:哈??梢允关撦d分散到整個集群,但是讀卻是可以預測的。使用確定的哈希可以讓客戶端重構完整的rowkey,可以使用get操作準確獲取某一個行數據
反轉:第三種防止熱點的方法時反轉固定長度或者數字格式的rowkey。這樣可以使得rowkey中經常改變的部分(最沒有意義的部分)放在前面。這樣可以有效的隨機rowkey,但是犧牲了rowkey的有序性。反轉rowkey的例子以手機號為rowkey,可以將手機號反轉后的字符串作為rowkey,這樣的就避免了以手機號那樣比較固定開頭導致熱點問題
時間戳反轉:一個常見的數據處理問題是快速獲取數據的最近版本,使用反轉的時間戳作為rowkey的一部分對這個問題十分有用,可以用 Long.Max_Value - timestamp 追加到key的末尾,例如[key][reverse_timestamp],[key]的最新值可以通過scan [key]獲得[key]的第一條記錄,因為HBase中rowkey是有序的,第一條記錄是最后錄入的數據。
-
比如需要保存一個用戶的操作記錄,按照操作時間倒序排序,在設計rowkey的時候,可以這樣設計[userId反轉] [Long.Max_Value - timestamp],在查詢用戶的所有操作記錄數據的時候,直接指定反轉后的userId,startRow是[userId反轉][000000000000],stopRow是[userId反轉][Long.Max_Value - timestamp]
-
如果需要查詢某段時間的操作記錄,startRow是[user反轉][Long.Max_Value - 起始時間],stopRow是[userId反轉][Long.Max_Value - 結束時間]
HBase建表預分區:創建HBase表時,就預先根據可能的RowKey劃分出多個region而不是默認的一個,從而可以將后續的讀寫操作負載均衡到不同的region上,避免熱點現象。
5. HBase的 rowkey 設計原則
長度原則:100字節以內,8的倍數最好,可能的情況下越短越好。因為HFile是按照 keyvalue 存儲的,過長的rowkey會影響存儲效率;其次,過長的rowkey在memstore中較大,影響緩沖效果,降低檢索效率。最后,操作系統大多為64位,8的倍數,充分利用操作系統的最佳性能。
散列原則:高位散列,低位時間字段。避免熱點問題。
唯一原則:分利用這個排序的特點,將經常讀取的數據存儲到一塊,將最近可能會被訪問 的數據放到一塊。
6. HBase的列簇設計
原則:在合理范圍內能盡量少的減少列簇就盡量減少列簇,因為列簇是共享region的,每個列簇數據相差太大導致查詢效率低下。
最優:將所有相關性很強的 key-value 都放在同一個列簇下,這樣既能做到查詢效率最高,也能保持盡可能少的訪問不同的磁盤文件。以用戶信息為例,可以將必須的基本信息存放在一個列族,而一些附加的額外信息可以放在另一列族。
7. HBase 中 compact 用途是什么,什么時候觸發,分為哪兩種,有什么區別
在 hbase 中每當有 memstore 數據 flush 到磁盤之后,就形成一個 storefile,當 storeFile的數量達到一定程度后,就需要將 storefile 文件來進行 compaction 操作。
Compact 的作用:
合并文件
清除過期,多余版本的數據
提高讀寫數據的效率 4 HBase 中實現了兩種 compaction 的方式:minor and major. 這兩種 compaction 方式的 區別是:
Minor 操作只用來做部分文件的合并操作以及包括 minVersion=0 并且設置 ttl 的過 期版本清理,不做任何刪除數據、多版本數據的清理工作。
Major 操作是對 Region 下的 HStore 下的所有 StoreFile 執行合并操作,最終的結果 是整理合并出一個文件。
Flink
1. 簡單介紹一下Flink
Flink是一個面向流處理和批處理的分布式數據計算引擎,能夠基于同一個Flink運行,可以提供流處理和批處理兩種類型的功能。 在 Flink 的世界觀中,一切都是由流組成的,離線數據是有界的流;實時數據是一個沒有界限的流:這就是所謂的有界流和無界流。
2. Flink的運行必須依賴Hadoop組件嗎
Flink可以完全獨立于Hadoop,在不依賴Hadoop組件下運行。但是做為大數據的基礎設施,Hadoop體系是任何大數據框架都繞不過去的。Flink可以集成眾多Hadooop 組件,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做資源調度,也可以讀寫HDFS,或者利用HDFS做檢查點。
3. Flink集群運行時角色
Flink 運行時由兩種類型的進程組成:一個 JobManager 和一個或者多個 TaskManager。
Client 不是運行時和程序執行的一部分,而是用于準備數據流并將其發送給 JobManager。之后,客戶端可以斷開連接(分離模式),或保持連接來接收進程報告(附加模式)??蛻舳丝梢宰鳛橛|發執行 Java/Scala 程序的一部分運行,也可以在命令行進程 ./bin/flink run ... 中運行。
可以通過多種方式啟動 JobManager 和 TaskManager:直接在機器上作為 standalone 集群啟動、在容器中啟動、或者通過YARN等資源框架管理并啟動。TaskManager 連接到 JobManagers,宣布自己可用,并被分配工作。
JobManager:
JobManager 具有許多與協調 Flink 應用程序的分布式執行有關的職責:它決定何時調度下一個 task(或一組 task)、對完成的 task 或執行失敗做出反應、協調 checkpoint、并且協調從失敗中恢復等等。這個進程由三個不同的組件組成:
-
ResourceManager
ResourceManager 負責 Flink 集群中的資源提供、回收、分配,管理 task slots。
-
Dispatcher
Dispatcher 提供了一個 REST 接口,用來提交 Flink 應用程序執行,并為每個提交的作業啟動一個新的 JobMaster。它還運行 Flink WebUI 用來提供作業執行信息。
-
JobMaster
JobMaster 負責管理單個JobGraph的執行。Flink 集群中可以同時運行多個作業,每個作業都有自己的 JobMaster。
TaskManagers:
TaskManager(也稱為 worker)執行作業流的 task,并且緩存和交換數據流。
必須始終至少有一個 TaskManager。在 TaskManager 中資源調度的最小單位是 task slot。TaskManager 中 task slot 的數量表示并發處理 task 的數量。請注意一個 task slot 中可以執行多個算子。
4. Flink相比Spark Streaming有什么區別
1. 架構模型
Spark Streaming 在運行時的主要角色包括:Master、Worker、Driver、Executor,Flink 在運行時主要包含:Jobmanager、Taskmanager 和 Slot。
2. 任務調度
Spark Streaming 連續不斷的生成微小的數據批次,構建有向無環圖 DAG,Spark Streaming 會依次創建 DStreamGraph、JobGenerator、JobScheduler。
Flink 根據用戶提交的代碼生成 StreamGraph,經過優化生成 JobGraph,然后提交給 JobManager 進行處理,JobManager 會根據 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 調度最核心的數據結構,JobManager 根據 ExecutionGraph 對 Job 進行調度。
3. 時間機制
Spark Streaming 支持的時間機制有限,只支持處理時間。Flink 支持了流處理程序在時間上的三個定義:處理時間、事件時間、注入時間。同時也支持 watermark 機制來處理滯后數據。
4. 容錯機制
對于 Spark Streaming 任務,我們可以設置 checkpoint,然后假如發生故障并重啟,我們可以從上次 checkpoint 之處恢復,但是這個行為只能使得數據不丟失,可能會重復處理,不能做到恰一次處理語義。
Flink 則使用兩階段提交協議來解決這個問題。
5. 介紹下Flink的容錯機制(checkpoint)
Checkpoint機制是Flink可靠性的基石,可以保證Flink集群在某個算子因為某些原因(如 異常退出)出現故障時,能夠將整個應用流圖的狀態恢復到故障之前的某一狀態,保證應用流圖狀態的一致性。Flink的Checkpoint機制原理來自“Chandy-Lamport algorithm”算法。
每個需要Checkpoint的應用在啟動時,Flink的JobManager為其創建一個 CheckpointCoordinator(檢查點協調器),CheckpointCoordinator全權負責本應用的快照制作。
CheckpointCoordinator(檢查點協調器),CheckpointCoordinator全權負責本應用的快照制作。
CheckpointCoordinator(檢查點協調器) 周期性的向該流應用的所有source算子發送 barrier(屏障)。
當某個source算子收到一個barrier時,便暫停數據處理過程,然后將自己的當前狀態制作成快照,并保存到指定的持久化存儲中,最后向CheckpointCoordinator報告自己快照制作情況,同時向自身所有下游算子廣播該barrier,恢復數據處理
下游算子收到barrier之后,會暫停自己的數據處理過程,然后將自身的相關狀態制作成快照,并保存到指定的持久化存儲中,最后向CheckpointCoordinator報告自身快照情況,同時向自身所有下游算子廣播該barrier,恢復數據處理。
每個算子按照步驟3不斷制作快照并向下游廣播,直到最后barrier傳遞到sink算子,快照制作完成。
當CheckpointCoordinator收到所有算子的報告之后,認為該周期的快照制作成功; 否則,如果在規定的時間內沒有收到所有算子的報告,則認為本周期快照制作失敗。
文章推薦:
Flink可靠性的基石-checkpoint機制詳細解析
6. Flink checkpoint與Spark Streaming的有什么區別或優勢嗎
spark streaming 的 checkpoint 僅僅是針對 driver 的故障恢復做了數據和元數據的 checkpoint。而 flink 的 checkpoint 機制 要復雜了很多,它采用的是輕量級的分布式快照,實現了每個算子的快照,及流動中的數據的快照。
7. Flink是如何保證Exactly-once語義的
Flink通過實現兩階段提交和狀態保存來實現端到端的一致性語義。分為以下幾個步驟:
開始事務(beginTransaction)創建一個臨時文件夾,來寫把數據寫入到這個文件夾里面
預提交(preCommit)將內存中緩存的數據寫入文件并關閉
正式提交(commit)將之前寫完的臨時文件放入目標目錄下。這代表著最終的數據會有一些延遲
丟棄(abort)丟棄臨時文件
若失敗發生在預提交成功后,正式提交前??梢愿鶕顟B來提交預提交的數據,也可刪除預提交的數據。
兩階段提交協議詳解:八張圖搞懂Flink的Exactly-once
8. 如果下級存儲不支持事務,Flink怎么保證exactly-once
端到端的exactly-once對sink要求比較高,具體實現主要有冪等寫入和事務性寫入兩種方式。
冪等寫入的場景依賴于業務邏輯,更常見的是用事務性寫入。而事務性寫入又有預寫日志(WAL)和兩階段提交(2PC)兩種方式。
如果外部系統不支持事務,那么可以用預寫日志的方式,把結果數據先當成狀態保存,然后在收到 checkpoint 完成的通知時,一次性寫入 sink 系統。
9. Flink常用的算子有哪些
分兩部分:
數據讀取,這是Flink流計算應用的起點,常用算子有:
-
從內存讀:fromElements
-
從文件讀:readTextFile
-
Socket 接入 :socketTextStream
-
自定義讀取:createInput
處理數據的算子,常用的算子包括:Map(單輸入單輸出)、FlatMap(單輸入、多輸出)、Filter(過濾)、KeyBy(分組)、Reduce(聚合)、Window(窗口)、Connect(連接)、Split(分割)等。
推薦閱讀:一文學完Flink流計算常用算子(Flink算子大全)
10. Flink任務延時高,如何入手
在 Flink 的后臺任務管理中,我們可以看到 Flink 的哪個算子和 task 出現了反壓。最主要的手段是資源調優和算子調優。資源調優即是對作業中的 Operator 的并發數(parallelism)、CPU(core)、堆內存(heap_memory)等參數進行調優。作業參數調優包括:并行度的設置,State 的設置,checkpoint 的設置。
11. Flink是如何處理反壓的
Flink 內部是基于 producer-consumer 模型來進行消息傳遞的,Flink的反壓設計也是基于這個模型。Flink 使用了高效有界的分布式阻塞隊列,就像 Java 通用的阻塞隊列(BlockingQueue)一樣。下游消費者消費變慢,上游就會受到阻塞。
12. 如何排查生產環境中的反壓問題
1. 反壓出現的場景
反壓經常出現在促銷、熱門活動等場景。短時間內流量陡增造成數據的堆積或者消費速度變慢。
它們有一個共同的特點:數據的消費速度小于數據的生產速度。
2. 反壓監控方法
通過Flink Web UI發現反壓問題。
Flink 的 TaskManager 會每隔 50 ms 觸發一次反壓狀態監測,共監測 100 次,并將計算結果反饋給 JobManager,最后由 JobManager 進行計算反壓的比例,然后進行展示。
這個比例展示邏輯如下:
OK: 0 <= Ratio <= 0.10,表示狀態良好正;
LOW: 0.10 < Ratio <= 0.5,表示有待觀察;
HIGH: 0.5 < Ratio <= 1,表示要處理了(增加并行度/subTask/檢查是否有數據傾斜/增加內存)。
0.01,代表100次中有一次阻塞在內部調用。
3. flink反壓的實現方式
Flink任務的組成由基本的“流”和“算子”構成,“流”中的數據在“算子”間進行計算和轉換時,會被放入分布式的阻塞隊列中。當消費者的阻塞隊列滿時,則會降低生產者的數據生產速度
4. 反壓問題定位和處理
Flink會因為數據堆積和處理速度變慢導致checkpoint超時,而checkpoint是Flink保證數據一致性的關鍵所在,最終會導致數據的不一致發生。
數據傾斜:可以在 Flink 的后臺管理頁面看到每個 Task 處理數據的大小。當數據傾斜出現時,通常是簡單地使用類似 KeyBy 等分組聚合函數導致的,需要用戶將熱點 Key 進行預處理,降低或者消除熱點 Key 的影。
GC:不合理的設置 TaskManager 的垃圾回收參數會導致嚴重的 GC 問題,我們可以通過 -XX:+PrintGCDetails 參數查看 GC 的日志。
代碼本身:開發者錯誤地使用 Flink 算子,沒有深入了解算子的實現機制導致性能問題。我們可以通過查看運行機器節點的 CPU 和內存情況定位問題。
13. Flink中的狀態存儲
Flink在做計算的過程中經常需要存儲中間狀態,來避免數據丟失和狀態恢復。選擇的狀態存儲策略不同,會影響狀態持久化如何和 checkpoint 交互。Flink提供了三種狀態存儲方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。
14. Operator Chains(算子鏈)這個概念你了解嗎
為了更高效地分布式執行,Flink 會盡可能地將 operator 的 subtask 鏈接(chain)在一起形成 task。每個 task 在一個線程中執行。將 operators 鏈接成 task 是非常有效的優化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數據在緩沖區的交換,減少了延遲的同時提高整體的吞吐量。這就是我們所說的算子鏈。
15. Flink的內存管理是如何做的
Flink 并不是將大量對象存在堆上,而是將對象都序列化到一個預分配的內存塊上。此外,Flink大量的使用了堆外內存。如果需要處理的數據超出了內存限制,則會將部分數據存儲到硬盤上。Flink 為了直接操作二進制數據實現了自己的序列化框架。
16. 如何處理生產環境中的數據傾斜問題
1. flink數據傾斜的表現:
任務節點頻繁出現反壓,增加并行度也不能解決問題;
部分節點出現OOM異常,是因為大量的數據集中在某個節點上,導致該節點內存被爆,任務失敗重啟。
2. 數據傾斜產生的原因:
業務上有嚴重的數據熱點,比如滴滴打車的訂單數據中北京、上海等幾個城市的訂單量遠遠超過其他地區;
技術上大量使用了 KeyBy、GroupBy 等操作,錯誤的使用了分組 Key,人為產生數據熱點。
3. 解決問題的思路:
業務上要盡量避免熱點 key 的設計,例如我們可以把北京、上海等熱點城市分成不同的區域,并進行單獨處理;
技術上出現熱點時,要調整方案打散原來的 key,避免直接聚合;此外 Flink 還提供了大量的功能可以避免數據傾斜。
17. Flink中的Time有哪幾種
Flink中的時間有三種類型,如下圖所示:
-
Event Time:是事件創建的時間。它通常由事件中的時間戳描述,例如采集的日志數據中,每一條日志都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳。
-
Ingestion Time:是數據進入Flink的時間。
-
Processing Time:是每一個執行基于時間操作的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。
例如,一條日志進入Flink的時間為2021-01-22 10:00:00.123,到達Window的系統時間為2021-01-22 10:00:01.234,日志的內容如下:
2021-01-06 18:37:15.624 INFO Fail over to rm2
對于業務來說,要統計1min內的故障日志個數,哪個時間是最有意義的?—— eventTime,因為我們要根據日志的生成時間進行統計。
18. Flink對于遲到數據是怎么處理的
Flink中 WaterMark 和 Window 機制解決了流式數據的亂序問題,對于因為延遲而順序有誤的數據,可以根據eventTime進行業務處理,對于延遲的數據Flink也有自己的解決辦法,主要的辦法是給定一個允許延遲的時間,在該時間范圍內仍可以接受處理延遲數據
設置允許延遲的時間是通過allowedLateness(lateness: Time)設置
保存延遲數據則是通過sideOutputLateData(outputTag: OutputTag[T])保存
獲取延遲數據是通過DataStream.getSideOutput(tag: OutputTag[X])獲取
文章推薦:
Flink 中極其重要的 Time 與 Window 詳細解析
19. Flink中window出現數據傾斜怎么解決
window 產生數據傾斜指的是數據在不同的窗口內堆積的數據量相差過多。本質上產生這種情況的原因是數據源頭發送的數據量速度不同導致的。出現這種情況一般通過兩種方式來解決:
-
在數據進入窗口前做預聚合
-
重新設計窗口聚合的 key
20. Flink CEP編程中當狀態沒有到達的時候會將數據保存在哪里
在流式處理中,CEP 當然是要支持 EventTime 的,那么相對應的也要支持數據的遲到現象,也就是watermark的處理邏輯。CEP對未匹配成功的事件序列的處理,和遲到數據是類似的。在 Flink CEP的處理邏輯中,狀態沒有滿足的和遲到的數據,都會存儲在一個Map數據結構中,也就是說,如果我們限定判斷事件序列的時長為5分鐘,那么內存中就會存儲5分鐘的數據,這在我看來,也是對內存的極大損傷之一。
推薦閱讀:一文學會Flink CEP
21. Flink設置并行度的方式
們在實際生產環境中可以從四個不同層面設置并行度:
操作算子層面(Operator Level)
執行環境層面(Execution Environment Level)
客戶端層面(Client Level)
系統層面(System Level)
全局配置在flink-conf.yaml文件中,parallelism.default,默認是1:可以設置默認值大一點
需要注意的優先級:算子層面>環境層面>客戶端層面>系統層面。
22. Flink中Task如何做到數據交換
在一個 Flink Job 中,數據需要在不同的 task 中進行交換,整個數據交換是有 TaskManager 負責的,TaskManager 的網絡組件首先從緩沖 buffer 中收集 records,然后再發送。Records 并不是一個一個被發送的,是積累一個批次再發送,batch 技術可以更加高效的利用網絡資源。
23. Flink的內存管理是如何做的
Flink 并不是將大量對象存在堆上,而是將對象都序列化到一個預分配的內存塊上。此外,Flink大量的使用了堆外內存。如果需要處理的數據超出了內存限制,則會將部分數據存儲到硬盤上。Flink 為了直接操作二進制數據實現了自己的序列化框架。
24. 介紹下Flink的序列化
Flink 摒棄了 Java 原生的序列化方法,以獨特的方式處理數據類型和序列化,包含自己的類型描述符,泛型類型提取和類型序列化框架。
TypeInformation 是所有類型描述符的基類。它揭示了該類型的一些基本屬性,并且可以生成序列化器。
TypeInformation 支持以下幾種類型:
-
BasicTypeInfo: 任意 Java 基本類型或 String 類型
-
BasicArrayTypeInfo: 任意 Java 基本類型數組或 String 數組
-
WritableTypeInfo: 任意 Hadoop Writable 接口的實現類
-
TupleTypeInfo: 任意的 Flink Tuple 類型(支持 Tuple1 to Tuple25)。Flink tuples 是固定長度固定類型的 Java Tuple 實現
-
CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)
-
PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java 對象的所有成員變量,要么是 public 修飾符定義,要么有 getter/setter 方法
-
GenericTypeInfo: 任意無法匹配之前幾種類型的類
25. Flink海量數據高效去重
基于狀態后端。
基于HyperLogLog:不是精準的去重。
基于布隆過濾器(BloomFilter);快速判斷一個key是否存在于某容器,不存在就直接返回。
基于BitMap;用一個bit位來標記某個元素對應的Value,而Key即是該元素。由于采用了Bit為單位來存儲數據,因此可以大大節省存儲空間。
基于外部數據庫;選擇使用Redis或者HBase存儲數據,我們只需要設計好存儲的Key即可,不需要關心Flink任務重啟造成的狀態丟失問題。
26. Flink SQL的是如何實現的
構建抽象語法樹的事情交給了 Calcite 去做。SQL query 會經過 Calcite 解析器轉變成 SQL 節點樹,通過驗證后構建成 Calcite 的抽象語法樹(也就是圖中的 Logical Plan)。另一邊,Table API 上的調用會構建成 Table API 的抽象語法樹,并通過 Calcite 提供的 RelBuilder 轉變成 Calcite 的抽象語法樹。然后依次被轉換成邏輯執行計劃和物理執行計劃。
在提交任務后會分發到各個 TaskManager 中運行,在運行時會使用 Janino 編譯器編譯代碼后運行。
業務方面
1. ODS層采用什么壓縮方式和存儲格式?
壓縮采用Snappy,存儲采用orc,壓縮比是100g數據壓縮完10g左右。
2. DWD層做了哪些事?
數據清洗
-
空值去除
-
過濾核心字段無意義的數據,比如訂單表中訂單id為null,支付表中支付id為空
-
對手機號、身份證號等敏感數據脫敏
-
對業務數據傳過來的表進行維度退化和降維。
-
將用戶行為寬表和業務表進行數據一致性處理
清洗的手段
-
Sql、mr、rdd、kettle、Python(項目中采用sql進行清除)
3. DWS層做了哪些事?
DWS層有3-5張寬表(處理100-200個指標 70%以上的需求)
具體寬表名稱:用戶行為寬表,用戶購買商品明細行為寬表,商品寬表,購物車寬表,物流寬表、登錄注冊、售后等。
哪個寬表最寬?大概有多少個字段? 最寬的是用戶行為寬表。大概有60-100個字段
1. 在處理大數據過程中,如何保證得到期望值
保證在數據采集的時候不丟失數據,這個尤為重要,如果在數據采集的時候就已經不準確,后面很難達到期望值
在數據處理的時候不丟失數據,例如sparkstreaming處理kafka數據的時候,要保證數據不丟失,這個尤為重要
前兩步中,如果無法保證數據的完整性,那么就要通過離線計算進行數據的校對,這樣才能保證我們能夠得到期望值
2. 你感覺數倉建設中最重要的是什么
數倉建設中,最重要的是數據準確性,數據的真正價值在于數據驅動決策,通過數據指導運營,在一個不準確的數據驅動下,得到的一定是錯誤的數據分析,影響的是公司的業務發展決策,最終導致公司的策略調控失敗。
3. 數據倉庫建模怎么做的
數倉建設中最常用模型--Kimball維度建模詳解
4. 數據質量怎么監控
單表數據量監控
一張表的記錄數在一個已知的范圍內,或者上下浮動不會超過某個閾值
SQL結果:var 數據量 = select count(*)from 表 where 時間等過濾條件
報警觸發條件設置:如果數據量不在[數值下限, 數值上限], 則觸發報警
同比增加:如果((本周的數據量 -上周的數據量)/上周的數據量*100)不在 [比例下線,比例上限],則觸發報警
環比增加:如果((今天的數據量 - 昨天的數據量)/昨天的數據量*100)不在 [比例下線,比例上限],則觸發報警
報警觸發條件設置一定要有。如果沒有配置的閾值,不能做監控 日活、周活、月活、留存(日周月)、轉化率(日、周、月)GMV(日、周、月) 復購率(日周月)
單表空值檢測
某個字段為空的記錄數在一個范圍內,或者占總量的百分比在某個閾值范圍內
目標字段:選擇要監控的字段,不能選“無”
SQL結果:var 異常數據量 = select count(*) from 表 where 目標字段 is null
單次檢測:如果(異常數據量)不在[數值下限, 數值上限],則觸發報警
單表重復值檢測
一個或多個字段是否滿足某些規則
目標字段:第一步先正常統計條數;select count(*) form 表;
第二步,去重統計;select count(*) from 表 group by 某個字段
第一步的值和第二步的值做減法,看是否在上下線閥值之內
單次檢測:如果(異常數據量)不在[數值下限, 數值上限], 則觸發報警
跨表數據量對比
主要針對同步流程,監控兩張表的數據量是否一致
SQL結果:count(本表) - count(關聯表)
閾值配置與“空值檢測”相同
5. 數據分析方法論了解過哪些?
數據商業分析的目標是利用大數據為所有職場人員做出迅捷,高質,高效的決策提供可規?;慕鉀Q方案。商業分析是創造價值的數據科學。
數據商業分析中會存在很多判斷:
觀察數據當前發生了什么?
比如想知道線上渠道A、B各自帶來了多少流量,新上線的產品有多少用戶喜歡,新注冊流中注冊的人數有多少。這些都需要通過數據來展示結果。
理解為什么發生?
我們需要知道渠道A為什么比渠道B好,這些是要通過數據去發現的。也許某個關鍵字帶來的流量轉化率比其他都要低,這時可以通過信息、知識、數據沉淀出發生的原因是什么。
預測未來會發生什么?
在對渠道A、B有了判斷之后,根據以往的知識預測未來會發生什么。在投放渠道C、D的時候,猜測渠道C比渠道D好,當上線新的注冊流、新的優化,可以知道哪一個節點比較容易出問題,這些都是通過數據進行預測的過程。
商業決策
所有工作中最有意義的還是商業決策,通過數據來判斷應該做什么。這是商業分析最終的目的。
算法
大數據面試中考察的算法相對容易一些,??嫉挠信判蛩惴?#xff0c;查找算法,二叉樹等,下面講解一些最容易考的算法。
1. 排序算法
十種常見排序算法可以分為兩大類:
-
比較類排序:通過比較來決定元素間的相對次序,由于其時間復雜度不能突破O(nlogn),因此也稱為非線性時間比較類排序。
-
非比較類排序:不通過比較來決定元素間的相對次序,它可以突破基于比較排序的時間下界,以線性時間運行,因此也稱為線性時間非比較類排序。
算法復雜度:
相關概念:
-
穩定:如果a原本在b前面,而a=b,排序之后a仍然在b的前面。
-
不穩定:如果a原本在b的前面,而a=b,排序之后 a 可能會出現在 b 的后面。
-
時間復雜度:對排序數據的總的操作次數。反映當n變化時,操作次數呈現什么規律。
-
空間復雜度:是指算法在計算機內執行時所需存儲空間的度量,它也是數據規模n的函數。
下面講解大數據中最??嫉膬煞N:快排和歸并
1) 快速排序
快速排序的基本思想:通過一趟排序將待排記錄分隔成獨立的兩部分,其中一部分記錄的關鍵字均比另一部分的關鍵字小,則可分別對這兩部分記錄繼續進行排序,以達到整個序列有序。
算法描述
快速排序使用分治法來把一個串(list)分為兩個子串(sub-lists)。具體算法描述如下:
-
從數列中挑出一個元素,稱為 “基準”(pivot);
-
重新排序數列,所有元素比基準值小的擺放在基準前面,所有元素比基準值大的擺在基準的后面(相同的數可以到任一邊)。在這個分區退出之后,該基準就處于數列的中間位置。這個稱為分區(partition)操作;
-
遞歸地(recursive)把小于基準值元素的子數列和大于基準值元素的子數列排序。
代碼實現:
function?quickSort(arr,?left,?right)?{var?len?=?arr.length,partitionIndex,left?=?typeof?left?!=?'number'???0?:?left,right?=?typeof?right?!=?'number'???len?-?1?:?right;if?(left?<?right)?{partitionIndex?=?partition(arr,?left,?right);quickSort(arr,?left,?partitionIndex-1);quickSort(arr,?partitionIndex+1,?right);}return?arr; }function?partition(arr,?left?,right)?{?????//?分區操作var?pivot?=?left,??????????????????????//?設定基準值(pivot)index?=?pivot?+?1;for?(var?i?=?index;?i?<=?right;?i++)?{if?(arr[i]?<?arr[pivot])?{swap(arr,?i,?index);index++;}???????}swap(arr,?pivot,?index?-?1);return?index-1; }function?swap(arr,?i,?j)?{var?temp?=?arr[i];arr[i]?=?arr[j];arr[j]?=?temp; }2) 歸并排序
歸并排序是建立在歸并操作上的一種有效的排序算法。該算法是采用分治法(Divide and Conquer)的一個非常典型的應用。將已有序的子序列合并,得到完全有序的序列;即先使每個子序列有序,再使子序列段間有序。若將兩個有序表合并成一個有序表,稱為2-路歸并。
算法描述
-
把長度為n的輸入序列分成兩個長度為n/2的子序列;
-
對這兩個子序列分別采用歸并排序;
-
將兩個排序好的子序列合并成一個最終的排序序列。
代碼實現:
function?mergeSort(arr)?{var?len?=?arr.length;if?(len?<?2)?{return?arr;}var?middle?=?Math.floor(len?/?2),left?=?arr.slice(0,?middle),right?=?arr.slice(middle);return?merge(mergeSort(left),?mergeSort(right)); }function?merge(left,?right)?{var?result?=?[];while?(left.length>0?&&?right.length>0)?{if?(left[0]?<=?right[0])?{result.push(left.shift());}?else?{result.push(right.shift());}}while?(left.length)result.push(left.shift());while?(right.length)result.push(right.shift());return?result; }2. 查找算法
七大查找算法:1. 順序查找、2. 二分查找、3. 插值查找、4. 斐波那契查找、5. 樹表查找、6. 分塊查找、7. 哈希查找
這些查找算法中二分查找是最容易考察的,下面講解二分查找算法。
1) 二分查找
二分查找也稱折半查找(Binary Search),它是一種效率較高的查找方法。但是,折半查找要求線性表必須采用順序存儲結構,而且表中元素按關鍵字有序排列,注意必須要是有序排列。
代碼實現:
使用遞歸
不使用遞歸實現(while循環)
3. 二叉樹實現及遍歷
定義:二叉樹,是一種特殊的樹,二叉樹的任意一個節點的度都不大于2,不包含度的節點稱之為葉子。
遍歷方式:二叉樹的遍歷方式有三種,中序遍歷,先序遍歷,后序遍歷。
將一個數組中的數以二叉樹的存儲結構存儲,并遍歷打印:
代碼實現:?
import?java.util.ArrayList; import?java.util.List;public?class?bintree?{public?bintree?left;public?bintree?right;public?bintree?root; //????數據域private?Object?data;//????存節點public?List<bintree>?datas;public?bintree(bintree?left,?bintree?right,?Object?data){this.left=left;this.right=right;this.data=data;} //????將初始的左右孩子值為空public?bintree(Object?data){this(null,null,data);}public?bintree()?{}public?void?creat(Object[]?objs){datas=new?ArrayList<bintree>();//????????將一個數組的值依次轉換為Node節點for(Object?o:objs){datas.add(new?bintree(o));} //????????第一個數為根節點root=datas.get(0); //????????建立二叉樹for?(int?i?=?0;?i?<objs.length/2;?i++)?{ //????????????左孩子datas.get(i).left=datas.get(i*2+1); //????????????右孩子if(i*2+2<datas.size()){//避免偶數的時候?下標越界datas.get(i).right=datas.get(i*2+2);}}} //先序遍歷 public?void?preorder(bintree?root){if(root!=null){System.out.println(root.data);preorder(root.left);preorder(root.right);}} //中序遍歷public?void?inorder(bintree?root){if(root!=null){inorder(root.left);System.out.println(root.data);inorder(root.right);}} //????后序遍歷public?void?afterorder(bintree?root){if(root!=null){System.out.println(root.data);afterorder(root.left);afterorder(root.right);}}public?static?void?main(String[]?args)?{bintree?bintree=new?bintree();Object?[]a={2,4,5,7,1,6,12,32,51,22};bintree.creat(a);bintree.preorder(bintree.root);} }點擊獲取本文PDF版:2022年最強大數據面試寶典PDF版
?
總結
以上是生活随笔為你收集整理的2022年最强大数据面试宝典(全文50000字)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 机器学习笔记(四)BP神经网络模型
- 下一篇: 证明连续随机变量形式Jensen不等式