Hadoop详解(八):MapReduce深度分析
1. MapReduce數據流向分析
MapReduce是一種并行數據處理框架,首先我們需要關注系統中數據流向問題,也就是從輸入到輸出過程中的數據傳輸問題。
1)從HDFS到Mpper節點輸入文件。在一般情況下,存儲數據的節點是Mapper運行的節點,不需要在節點之間進行數據傳輸,也就是盡量讓存儲靠近計算。但是由于用戶的數據文件往往不是均衡的分布在整個集群中,則MapReduce的計算槽位資源卻是均衡的分布在整個集群中的,因此某些計算節點就需要從數據存儲節點獲取數據到自己的計算節點,這樣就會存在數據從HDFS上的存儲節點到另一個計算節點的數據傳輸,為了提高集群資源的利用率,Hadoop會從距離計算節點最近的數據副本進行數據傳輸。
2)Mapper輸出到內存緩沖區。Mapper的輸出并不是直接寫入本地文件系統,而是先寫入內存緩沖區。
3)當緩沖區達到一定的閾值時就將緩沖區中的數據以臨時文件的形式寫入本地磁盤。默認的緩沖區大小是100MB,溢寫比例默認是0.8 (可通過spill.percent參數來調節)
當達到閾值時,溢寫線程就會啟動并鎖定這80MB內存執行溢寫過程,這一過程稱為spill。溢寫線程啟動的同時還會對這80MB的內存數據依據key的序列化字節做排序。當整個map任務結束后,會對這個map任務產生的所有臨時文件進行合并,并產生最終的輸出文件。
需要注意:在寫入內存緩沖區的同時執行Partition分區。
如果用戶作業設置了Combiner,那么在溢寫到磁盤之前會對Map輸出的鍵值對調用Combiner歸約,這樣可以減少溢寫到本地磁盤文件的數據量。
4)從Mapper端的本地文件系統流入Reduce端,也就是 Reduce中的Shuffle階段 分三種情況:
需要注意的是Reduce端的這個內存緩沖區也有一個閾值,當相應的Region文件大于這個閾值便寫入磁盤。
5)從Reduce端內存緩沖區流向本地磁盤的過程就是Reduce中Merge和Sort階段。Merge分為內存文件合并和磁盤文件合并,同時還會以key為鍵排序,最終生成已經對相同key的value進行聚集并排序好的輸出文件。
6)流向Reduce函數進行歸約處理
7)寫入HDFS中,生成輸出文件。
2. MapTask實現分析
MapTask的總邏輯流程,包括以下幾個階段:
2.1 Read階段
首先通過taskContext.getInputFormatClass()得到用戶指定的InputFormatClass來創建InputFormat對象實例;其次,創建InputSplit對象,這個對象負責對文件進行數據塊的邏輯切分;最后,創建RecordReader對象。InputFormat對象會提供getSplit()重要方法,通過getSplit()將輸入文件切分成若干個邏輯InputSplit實例對象會把InputSplit提供的輸入文件轉化為Mapper需要的keys/vaule鍵值對集合形式。
2.2 Map階段
對輸入的鍵值對調用用戶編寫的Map函數進行處理,輸出<key,value>鍵值對
2.3 Collector和Partitioner階段
收集Mapper輸出,在OutputCollector函數內部對鍵值對進行Partitioner分區,以便確定相應的Reducer處理,這個階段將最終的鍵值對集合輸出到內存緩沖區。
2.4 Spill階段
包含Sort和Combiner階段,當內存緩沖區達到閾值后寫入本地磁盤,在這個階段會對Mapper的輸出鍵值對進行Sort排序,如果設置了Combiner會執行Combiner函數。
Spill階段有兩個重要的邏輯,Sort和Combiner(如果用戶設置了Combiner)。
每個mapTask都有一個內存的環形緩存區(默認是100MB),存儲著map的輸出結果。當緩存區塊滿的時候需要將緩沖區的數據以一個臨時文件的方式存放到磁盤(Spill)。溢寫時由單線程來完成的,不影響往緩沖區寫map結果的線程(spill.percent,默認是0.8)
2.5 Merge階段
對Spill階段在本地磁盤生成的小文件進行多次合并,最終生成一個大文件。
(3),(4),(5)也稱Map端的Shuffle
3. ReduceTask 實現分析
ReduceTask的總邏輯流程,包括以下幾個階段:
3.1 Shuffle階段
這個階段就是Reduce中的Copy階段,運行Reducer的TaskTracker需要從各個Mapper節點遠程復制屬于自己處理的一段數據。
Mapper的輸出是寫入本地磁盤的,并且是按照partition分區號進行組織的,Reduce的輸入便是分布在集群中多個Mapper任務輸出數據中同一partition段的數據,Map任務可能會在不同的時間內完成,只要其中一個Map任務完成了,ReduceTask就開始復制它的數據了。
3.2 Merge階段
由于執行Shuffle階段時會從各個Mapper節點復制很多同一partition段的數據,因此需要進行多次合并,以防止ReduceTask節點上內存使用過多或小文件過多。
在Shuffle階段中啟動數據復制線程MapOutputCopier后就開始進行Merge階段。Merge包括兩種情況:基于內存的合并和基于磁盤的合并,其分別對應的線程為InMemFSMergeTheread和LocalFSMerge。
3.3 Sort階段
雖然每個Mapper的輸出是按照key排序好的,但是經過Shuffle和Merge階段后并不是統一有序的,因此還需要在Reduce端進行多輪歸并排序。
3.4 Reduce 階段
Reduce的輸入要求是按照key排序的,因此只有在Sort階段執行完成之后才可以對數據調用用戶編寫的Reduce類進行歸約處理。
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Hadoop详解(八):MapReduce深度分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hadoop详解(六):MapReduc
- 下一篇: Hadoop详解(九):Hadoop S