MapReduce运行机制-Map阶段
MapTask 運行機制
整個Map階段流程大體如上圖所示。
簡單概述:inputFile通過split被邏輯切分為多個split文件,通過Record按行讀取內容給map(用戶自己實現的)進行處理,數據被map處理結束之后交給OutputCollector收集器,對其結果key進行分區(默認使用hash分區),然后寫入buffer,每個map task都有
一個內存緩沖區,存儲著map的輸出結果,當緩沖區快滿的時候需要將緩沖區的數據以一個臨時文件的方式存放到磁盤,當整個map task結束后再對磁盤中這個map task產生的所有臨時文件做合并,生成最終的正式輸出文件,然后等待reduce task來拉數據?
詳細步驟
1. 讀取數據組件 InputFormat (默認 TextInputFormat) 會通過 getSplits 方法對輸入目錄中文件進行邏輯切片規劃得到 splits, 有多少個 split 就對應啟動多少個MapTask . split 與 block 的對應關系默認是一對一
2. 將輸入文件切分為 splits 之后, 由 RecordReader 對象 (默認是LineRecordReader)進行讀取, 以 \n 作為分隔符, 讀取一行數據, 返回 <key,value> . Key 表示每行首字符偏移值, Value 表示這一行文本內容
3. 讀取 split 返回 <key,value> , 進入用戶自己繼承的 Mapper 類中,執行用戶重寫的 map 函數, RecordReader 讀取一行這里調用一次
4. Mapper 邏輯結束之后, 將 Mapper 的每條結果通過 context.write 進行collect數據收集. 在 collect 中, 會先對其進行分區處理,默認使用 HashPartitioner
MapReduce 提供 Partitioner 接口, 它的作用就是根據 Key 或 Value 及Reducer 的數量來決定當前的這對輸出數據最終應該交由哪個 Reduce task處理, 默認對 Key Hash 后再以 Reducer 數量取模. 默認的取模方式只是為了平均 Reducer 的處理能力, 如果用戶自己對 Partitioner 有需求, 可以訂制并設置到 Job 上
5. 接下來, 會將數據寫入內存, 內存中這片區域叫做環形緩沖區, 緩沖區的作用是批量收集Mapper 結果, 減少磁盤 IO 的影響. 我們的 Key/Value 對以及 Partition 的結果都會被寫入緩沖區. 當然, 寫入之前,Key 與 Value 值都會被序列化成字節數組
環形緩沖區其實是一個數組, 數組中存放著 Key, Value 的序列化數據和 Key,Value 的元數據信息, 包括 Partition, Key 的起始位置, Value 的起始位置以及Value 的長度. 環形結構是一個抽象概念
緩沖區是有大小限制, 默認是 100MB. 當 Mapper 的輸出結果很多時, 就可能會撐爆內存, 所以需要在一定條件下將緩沖區中的數據臨時寫入磁盤, 然后重新利用這塊緩沖區. 這個從內存往磁盤寫數據的過程被稱為 Spill, 中文可譯為溢寫. 這個溢寫是由單獨線程來完成, 不影響往緩沖區寫 Mapper 結果的線程.溢寫線程啟動時不應該阻止 Mapper 的結果輸出, 所以整個緩沖區有個溢寫的
比例 spill.percent . 這個比例默認是 0.8, 也就是當緩沖區的數據已經達到閾值 buffer size * spill percent = 100MB * 0.8 = 80MB , 溢寫線程啟動,鎖定這 80MB 的內存, 執行溢寫過程. Mapper 的輸出結果還可以往剩下的20MB 內存中寫, 互不影響
6. 當溢寫線程啟動后, 需要對這 80MB 空間內的 Key 做排序 (Sort). 排序是 MapReduce模型默認的行為, 這里的排序也是對序列化的字節做的排序
如果 Job 設置過 Combiner, 那么現在就是使用 Combiner 的時候了. 將有相同 Key 的 Key/Value 對的 Value 加起來, 減少溢寫到磁盤的數據量.Combiner 會優化 MapReduce 的中間結果, 所以它在整個模型中會多次使用
那哪些場景才能使用 Combiner 呢? 從這里分析, Combiner 的輸出是Reducer 的輸入, Combiner 絕不能改變最終的計算結果. Combiner 只應該用于那種 Reduce 的輸入 Key/Value 與輸出 Key/Value 類型完全一致, 且不影響最終結果的場景. 比如累加, 最大值等. Combiner 的使用一定得慎重, 如果用好, 它對 Job 執行效率有幫助, 反之會影響 Reducer 的最終結果
7. 合并溢寫文件, 每次溢寫會在磁盤上生成一個臨時文件 (寫之前判斷是否有 Combiner),如果 Mapper 的輸出結果真的很大, 有多次這樣的溢寫發生, 磁盤上相應的就會有多個臨時文件存在. 當整個數據處理結束之后開始對磁盤中的臨時文件進行 Merge 合并, 因為最終的文件只有一個, 寫入磁盤, 并且為這個文件提供了一個索引文件, 以記錄每個reduce對應數據的偏移量
配置
| 配置 | 默認值 | 解釋 |
| mapreduce.task.io.sort.mb | 100 | 設置 環型 緩沖 區的 內存 值大 小 |
| mapreduce.map.sort.spill.percent | 0.8 | 設置 溢寫 的比 例 |
| mapreduce.cluster.local.dir | ${hadoop.tmp.dir}/mapred/local | 溢寫 數據 目錄 |
| mapreduce.task.io.sort.factor | 10 | 設置 一次 合并 多少 個溢 寫文 件 |
?
總結
以上是生活随笔為你收集整理的MapReduce运行机制-Map阶段的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MapReduce-流量统计求和-分区代
- 下一篇: MapReduce运行机制-Reduce