Hadoop详解(六):MapReduce计算框架详解
1. Hadoop MapReduce簡介
Hadoop MapReduce是一個使用簡便的軟件框架,是Google云計算模型MapReduce的Java開源實現(xiàn),基于它寫出來的應用程序能夠運行在由上千萬臺普通機器注冊的大型集群系統(tǒng)中,并以一種可靠地、容錯的方式并行處理上T級別的數(shù)據(jù)集。
Hadoop MapReduce基本思想:一個MapReduce作業(yè)通常會把輸入的數(shù)據(jù)集合切分為若干獨立的數(shù)據(jù)塊,由Map任務并行的方式處理。該框架會對Map的輸出先進行排序,然后把結(jié)果輸出作為Reduce任務的輸入。通常作用的輸入和輸出都會存儲在文件系統(tǒng)中。
1.1 系統(tǒng)架構(gòu)
在系統(tǒng)架構(gòu)上,MapReduce框架是一種主從架構(gòu),由一個單獨的JobTracker節(jié)點和多個TaskTracker節(jié)點共同組成。
JobTracker是MapReduce的Master,負責調(diào)度構(gòu)成一個作業(yè)的所有任務,這些任務分布在不同 的TaskTracker節(jié)點上,監(jiān)控它們的執(zhí)行,重新執(zhí)行已經(jīng)失敗的任務,同時提高狀態(tài)和診斷信息給作業(yè)客戶端。
TaskTracker是MapReduce的Slave,僅負責運行由Master指派的任務執(zhí)行。
Hadoop的作業(yè)客戶端提交作業(yè)(jar包和可執(zhí)行程序)和配置信息作為Master的JobTracker,JobTracker負責分發(fā)用戶程序和配置信息給集群中的TaskTracker,以及調(diào)度任務并監(jiān)控他們的執(zhí)行,同時提供狀態(tài)和診斷信息給作業(yè)客戶端。
2. MapReduce模型
2.1 MapReduce編程模型
對于Map函數(shù),處理輸入的鍵值對,并且產(chǎn)生一組中間的鍵值對。MapReduce框架收集所有相同的中間鍵值的鍵值對,并且發(fā)送給Reduce函數(shù)進行處理。對于Reduce函數(shù),它處理中間鍵的鍵值對,以及這個中間鍵值對相關(guān)的值集合。此函數(shù)合并這些值,最后形成一個相對較少的值集合。
2.2 MapReduce實現(xiàn)原理
3. 計算流程與機制
3.1 Hadoop作業(yè)提交和初始化
MapReduce的Master接收到客戶端所提交的作業(yè)后首先要完成的就是將作業(yè)初始化為map任務和reduce任務,然后就是等待JobTracker調(diào)度執(zhí)行。
通過上面的操作就完成了MapReduce作業(yè)的提交工作了,那么接下來就開始作業(yè)的初始化操作:作業(yè)的初始化操作主要指的就是構(gòu)造MapTask和ReduceTask并且對他們進行初始化操作,這一步操作主要是調(diào)度器JobTracker.initJob()方法來進行的。具體情況是Hadoop將每個作業(yè)分成4個不同類型的任務:Setup Task、Map Task、Reduce Task、Cleanup Task。
3.2 Mapper
Mapper是MapReduce框架給用戶暴露的Map編程接口,用戶在實現(xiàn)自己的Mapper類時需要繼承這個基類。執(zhí)行Map Task任務:將輸入鍵值對(key/value pair)映射到一組中間格式的鍵值對集合。
處理流程如下:
3.3 Reducer
Reducer將與一個key關(guān)聯(lián)的一組中間數(shù)值集歸約為一個更小的數(shù)值集。
3.4 Reporter和OutputCollector
Report是用于MapReduce應用程序的報告進度,設定應用級別的狀態(tài)信息,更新Counters(計數(shù)器)的機制。Master和Reducer的處理情況可以利用Reporter來報告進度或者表明自己是運行正常的
OutputCollector是一個由Map/Reduce框架提供的,用于收集Mapper或者Reducer輸出數(shù)據(jù)的通用機制。
4. MapReduce的輸入/輸出格式
MapReduce計算框架本質(zhì)上是一種基于磁盤的批處理并行計算系統(tǒng),每一輪MapReduce作業(yè)都需要從分布式文件系統(tǒng)中讀取數(shù)據(jù),處理之后再寫入分布式文件系統(tǒng)。其他涉及到很多I/O操作,這些操作包括內(nèi)存到磁盤、磁盤到內(nèi)存,以及節(jié)點之間的數(shù)據(jù)交換。
4.1 輸入格式
TextInputFormat
TextInputFormat 用于讀取純文本文件。
KeyValueTextInputFormat
KeyValueTextInputFormat同樣是用于讀取文本文件。
NLineInputForamt
NLineInputForamt可以將文件以行為單位進行split切分,比如文件中的每一行對應的一個Map。
SequenceFileInputFormat
SequenceFileInputFormat用于讀取SequenceFile。
4.2 輸出格式
Hadoop中的OutputFormat用來描述MapReduce作業(yè)的輸出格式:
TextOutputFormat
TextOutputFormat是Hadoop默認的輸出格式。
SequenceFileOutputFormat
SequenceFileOutputFormat用于就是輸出到Hadoop中的SequenceFile文件格式。
MapFileOutputFormat
指定MapFileOutputFormat輸出類型可以將數(shù)據(jù)輸出為Hadoop中的MapFile文件格式。
MultipleOutputFormat
MultipleOutputFormat是Hadoop中的多路輸出處理類,通過這個類可以實現(xiàn)根據(jù)key將記錄控制輸出到不同的文件。
5. 核心問題
5.1 Map和Reduce數(shù)量問題
MapTask數(shù)量
Max.split(100M) Min.split(10M) Block(64M)InputSize=Max(min.split,min(max.split,block))- Max.split指的是最大InputSplit文件大小
- Min.split指的是最小InputSplit文件大小
- Block指的是Block文件大小
其中InputSize的大小是InputSize=Max(min.split,min(max.split,block))
ReduceTask數(shù)量
job.setNumReduceTasks(numReduceTask);通過job設置ReduceTask數(shù)量個數(shù)。
單個Reduce:
多個Reduce
數(shù)量為0(適應于不需要歸約和處理的作業(yè))
5.2 作業(yè)配置
- 作業(yè)配置的相關(guān)設置方法
| setNumReduceTasks | 設置reduce數(shù)目 |
| setNumMapTasks | 設置Map數(shù)目 |
| setInputFormatClass | 設置輸入文件格式類 |
| setOutputFormatClass | 設置輸出文件格式類 |
| setMapperClass | 輸出Map類 |
| setCombiner | 設置Combiner類 |
| setReducerClass | 設置Reduce類 |
| setPartitionerClass | 設置Partitioner類 |
| setMapOutputKeyClass | 設置Map輸出的Key類 |
| setMapOutputValueClass | 設置Map輸出的Value類 |
| setCompressMapOutput | 設置Map輸出是否壓縮 |
| setOutputValueClass | 設置輸出value類 |
| setJobName | 設置作業(yè)名字 |
| setSpeculativeExecution | 設置是否開啟預防性執(zhí)行 |
| setMapSpeculativeExecution | 設置是否開啟Map任務的預防性執(zhí)行 |
| setReduceSpeculativeExecution | 設置是否開啟Reduce任務的預防性執(zhí)行 |
5.3 作業(yè)的容錯機制
MapReduce作為一個通用的并行計算框架,有著非常健壯的容錯機制,在不同的粒度上均有考慮。
再執(zhí)行
用戶的一個MapReduce作業(yè)往往是由很多任務組成的,只有所有的任務執(zhí)行處理完畢之后才算整個作業(yè)成功。對于任務的容錯機制,MapReduce采用的最簡單的方法進行處理,即“再執(zhí)行”,也就是對于失敗的任務重新調(diào)度執(zhí)行一次。一般有以下兩種情況需要再執(zhí)行:
任務以及正在運行中的Map和Reduce任務都將調(diào)度重新執(zhí)行,同時在其他機器上正在運行的Reduce任務也將被重新執(zhí)行。
推測式執(zhí)行
在mapreduce中,影響一個作業(yè)的總執(zhí)行時間最通常的因素是“落伍者”:在運算過程中,如果有一臺機器花了很長時間才完成最后幾個Map或者Reduce任務,導致mapReduce任務執(zhí)行時間超過預期。出現(xiàn)“落伍者”的原因很多,CPU、內(nèi)存、本地磁盤和網(wǎng)絡帶寬等因素都會導致某些Map或者reduce任務執(zhí)行效率更加緩慢。
所謂推測式執(zhí)行策略就是MapReduce對每一個任務都計算它的進度,如果一個任務的進度遠遠慢于其他的任務時,那么這個任務便可以被認為是一個“落伍者”。在發(fā)現(xiàn)一個“落伍者”之后,調(diào)度器就會在其他節(jié)點上重新調(diào)度這任務以便重新執(zhí)行。在這個時候,一般會有兩個相同的任務在同時執(zhí)行,最終先完成的那個任務就算成功了,而沒有完成的那個任務就會被殺死。
5.4 作業(yè)調(diào)度
調(diào)度的功能是將各種類型的作業(yè)在調(diào)度算法作用下分配給Hadoop集群中的計算節(jié)點,從而達到 分布式和并行計算 的目的。
調(diào)度算法模塊中至少涉及兩個重要流程:1.作業(yè)的選擇 2.任務的分配。
調(diào)度過程 :
1)MapReduce框架中作業(yè)通常是通過JobClient.runJob(job)方法提交到JobTracker,JobTracker接收到JobClient的請求后將其加入作業(yè)調(diào)度隊列中。
2)然后JobTracker一直等待JobClient通過RPC向其提交作業(yè),而TaskTracker則一直通過RPC向JobTracker發(fā)送心跳信號詢問是否有任務可執(zhí)行,有則請求JobTracker派發(fā)任務給它執(zhí)行。
3)如果JobTracker的作業(yè)隊列不為空,則TaskTracker發(fā)送的心跳將會獲得JobTracker向它派發(fā)的任務。
這是一個主動請求的任務:slave的TaskTracker主動向master的JobTracker請求任務。
4)當TaskTracker接到任務后,通過自身調(diào)度在本slave建立起Task,執(zhí)行任務。
常用調(diào)度器 主要包括:JobQueueTaskScheduler(FIFO調(diào)度器),CapacityScheduler(容量調(diào)度器),Fair Scheduler(公平調(diào)度器)等。
- FIFO調(diào)度器:基本思想是作業(yè)按照先后順序統(tǒng)一放入一個隊列中,然后根據(jù)優(yōu)先級按照時間先后順序依次執(zhí)行,總體遵循先進先出的基本調(diào)度策略
- 容量調(diào)度器:計算能力調(diào)度器,是雅虎結(jié)合自己的集群業(yè)務類型提出的一種調(diào)度策略。這種調(diào)度策略支持多種隊列,每個隊列可以單獨配置一定的資源量,每個隊列采取FIFO策略
- 公平調(diào)度器:FaceBook開發(fā)的貢獻給開源社區(qū)的,可以是多種作業(yè)并行執(zhí)行并共享資源池。公平調(diào)度器的目的就是為了保證在多用戶、多作業(yè)類型的情況下保證整個集群的資源利用率,同時可以讓所有用戶公平地共享整個集群資源。
6. MapReduce特性
6.1 計數(shù)器(Counters)
MapReduce Counter可以為我們提供一個觀察MapReduce Job運行期中的各個細節(jié)數(shù)據(jù)視圖。通過這些Counter計數(shù)器我們可以從全局視角來審查程序的運行情況,以及做出錯誤診斷進行相應處理。
6.2 DistributedCache
DistributedCache是MapReduce計算框架提供的功能,能夠緩存應用程序所需要的文件(包括文本、檔案文件、jar文件等)。可以將具體應用相關(guān)的、大尺寸的、只讀的文件有效地分發(fā)到各個計算機中,應用程序只需要在JobConf中通過url(hdfs://)指定需要緩存的文件。
MapReduce框架在作業(yè)的所有任務執(zhí)行之前會把必要的文件復制到slave節(jié)點上。它運行高效的原因是因為每個作業(yè)的文件只復制一次并且為那些沒有文檔的slave文件緩存文檔。
6.3 Tool
6.4 Profiling
6.5 數(shù)據(jù)壓縮
總結(jié)
以上是生活随笔為你收集整理的Hadoop详解(六):MapReduce计算框架详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hadoop详解(七):YARYN完全分
- 下一篇: Hadoop详解(八):MapReduc