Hadoop生态之Mapreduce
今天給大家?guī)淼氖荋adoop生態(tài)中的Mapreduce,看到這里諸佬們可能就有疑惑了呢,啥是Mapreduce?小小的腦袋大大的疑惑。
在上篇博客中博主使用了王者來舉例子,如果把Hadoop當(dāng)作王者的話,HDFS是后臺(tái)存儲(chǔ)點(diǎn)券數(shù)據(jù)的系統(tǒng)的話,那么我們今天介紹的Mapreduce就是某者用來計(jì)算優(yōu)惠力度,并且計(jì)算游戲里最終到賬的點(diǎn)券。(雖然博主不怎么充錢)
Mapreduce
- 1.MapReduce概述
- 1.1 MapReduce定義
- 1.2 MapReduce優(yōu)缺點(diǎn)
- 1.2.1 優(yōu)點(diǎn)
- 1.2.2 缺點(diǎn)
- 2.MapReduce的運(yùn)行機(jī)制
- 3. Hadoop序列化
- 3.1 序列化概述
- 3.2 自定義bean對(duì)象實(shí)現(xiàn)序列化接口(Writable)
- 4.MapReduce的框架原理
- 4.1 InputFormat數(shù)據(jù)輸入
- 4.1.1 切片與MapTask并行度決定機(jī)制
- 4.1.2 Job提交流程源碼和切片源碼詳解
- 5.MapReduce開發(fā)總結(jié)
- 1)輸入數(shù)據(jù)接口:InputFormat
- 2)邏輯處理接口:Mapper
- 3)Partitioner分區(qū)
- 4)Comparable排序
- 5)Combiner合并
- 6)邏輯處理接口:Reducer
- 7)輸出數(shù)據(jù)接口:OutputFormat
- 6.MapReduce常見面試題
- 1. Mapreduce 的 map 數(shù)量 和 reduce 數(shù)量是由什么決定的 ,怎么配置
- 2. MapReduce優(yōu)化經(jīng)驗(yàn)
- 3. 分別舉例什么情況要使用 combiner,什么情況不使用?
- 4. MR運(yùn)行流程解析
- 5. suffle階段運(yùn)行流程(必背)
- 6. map端的shuffle
- 7. reduce端的shuffle
- 8.Split
- 8.1 分片概念
- 8.2 分片數(shù)量與Map Task數(shù)量的關(guān)系
- 8.3 由誰來劃分分片?
- 8.4 分片的大小
- 8.5 默認(rèn)分片大小與Block分塊大小相同的原因是什么?
1.MapReduce概述
1.1 MapReduce定義
MapReduce是一個(gè)分布式運(yùn)算程序的編程框架,是用戶開發(fā)“基于Hadoop的數(shù)據(jù)分析應(yīng)用”的核心框架。
MapReduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的分布式運(yùn)算程序,并發(fā)運(yùn)行在一個(gè)Hadoop集群上。
1.2 MapReduce優(yōu)缺點(diǎn)
1.2.1 優(yōu)點(diǎn)
1)MapReduce易于編程
它簡(jiǎn)單的實(shí)現(xiàn)一些接口,就可以完成一個(gè)分布式程序,這個(gè)分布式程序可以分布到大量廉價(jià)的PC機(jī)器上運(yùn)行。也就是說你寫一個(gè)分布式程序,跟寫一個(gè)簡(jiǎn)單的串行程序是一模一樣的。就是因?yàn)檫@個(gè)特點(diǎn)使得MapReduce編程變得非常流行。
2)良好的擴(kuò)展性
當(dāng)你的計(jì)算資源不能得到滿足的時(shí)候,你可以通過簡(jiǎn)單的增加機(jī)器來擴(kuò)展它的計(jì)算能力。
3)高容錯(cuò)性
MapReduce設(shè)計(jì)的初衷就是使程序能夠部署在廉價(jià)的PC機(jī)器上,這就要求它具有很高的容錯(cuò)性。比如其中一臺(tái)機(jī)器掛了,它可以把上面的計(jì)算任務(wù)轉(zhuǎn)移到另外一個(gè)節(jié)點(diǎn)上運(yùn)行,不至于這個(gè)任務(wù)運(yùn)行失敗,而且這個(gè)過程不需要人工參與,而完全是由Hadoop內(nèi)部完成的。
4)適合PB級(jí)以上海量數(shù)據(jù)的離線處理
可以實(shí)現(xiàn)上千臺(tái)服務(wù)器集群并發(fā)工作,提供數(shù)據(jù)處理能力。
1.2.2 缺點(diǎn)
1)不擅長實(shí)時(shí)計(jì)算
MapReduce無法像MySQL一樣,在毫秒或者秒級(jí)內(nèi)返回結(jié)果。
2)不擅長流式計(jì)算
流式計(jì)算的輸入數(shù)據(jù)是動(dòng)態(tài)的,而MapReduce的輸入數(shù)據(jù)集是靜態(tài)的,不能動(dòng)態(tài)變化。這是因?yàn)镸apReduce自身的設(shè)計(jì)特點(diǎn)決定了數(shù)據(jù)源必須是靜態(tài)的。
3)不擅長DAG(有向無環(huán)圖)計(jì)算
多個(gè)應(yīng)用程序存在依賴關(guān)系,后一個(gè)應(yīng)用程序的輸入為前一個(gè)的輸出。在這種情況下,MapReduce并不是不能做,而是使用后,每個(gè)MapReduce作業(yè)的輸出結(jié)果都會(huì)寫入到磁盤,會(huì)造成大量的磁盤IO,導(dǎo)致性能非常的低下。
2.MapReduce的運(yùn)行機(jī)制
諸佬們從MapReduce的名字以及上面的介紹中,應(yīng)該也可以知道,MapReduce實(shí)現(xiàn)中最重要的兩個(gè)概念:Map和Reduce。
Map
Map的任務(wù)是:處理原始數(shù)據(jù)、為數(shù)據(jù)打標(biāo)簽、對(duì)數(shù)據(jù)進(jìn)行分發(fā)(嚴(yán)格來說這并不完全是map的職責(zé))
處理原始數(shù)據(jù)
這一階段是對(duì)原始數(shù)據(jù)進(jìn)行預(yù)處理的階段,可以從行和列兩個(gè)角度來考慮。
行:比如我們需要對(duì)數(shù)據(jù)按照時(shí)間過濾,只選擇本周一的數(shù)據(jù),其他數(shù)據(jù)過濾掉不處理。
列:比如原始數(shù)據(jù)有10列,我們只需要其中的5列,其他列過濾掉不處理。
舉例:
假如我的HDFS上有一周的支出數(shù)據(jù),我們想統(tǒng)計(jì)周一周二的支出情況,接下來我們會(huì)一步步解釋這個(gè)過程。下圖是其中一部分記錄:
map:處理原始數(shù)據(jù)
可以看出過濾掉了非周一周二的數(shù)據(jù),并且刪除了使用人的字段。
為數(shù)據(jù)打標(biāo)簽
map處理完原始數(shù)據(jù)之后,接下來就要將數(shù)據(jù)分組,從而分配給合適的reduce去處理,分組的第一步就是打標(biāo)簽。
map:為數(shù)據(jù)打標(biāo)簽
可以看出,對(duì)每一條數(shù)據(jù)加了一條對(duì)應(yīng)天數(shù)的標(biāo)簽。
對(duì)數(shù)據(jù)進(jìn)行分發(fā)
打完標(biāo)簽之后,就需要對(duì)數(shù)據(jù)進(jìn)行分發(fā),嚴(yán)格來說,這并不完全屬于Map的職責(zé),其中也用到了一個(gè)神秘的中間環(huán)節(jié):shuffle。不過入門來看,我們就單純?nèi)蝿?wù)這屬于Map。
分發(fā)的意思是,打完標(biāo)簽之后,要對(duì)數(shù)據(jù)進(jìn)行分類處理,然后再發(fā)送給Reduce;分類的依據(jù),就是上面對(duì)其打的自定義標(biāo)簽。
map:分發(fā)有標(biāo)簽的數(shù)據(jù)
可以看出,對(duì)每一條數(shù)據(jù),按照標(biāo)簽分配,由原來的一個(gè)列表,變成了現(xiàn)在的兩個(gè)列表。
Map階段到此完成,接下來的任務(wù)就是要等著Reduce來取數(shù)了。
Reduce
Reduce的任務(wù)是:拉取Map分類好的數(shù)據(jù)(這也并不完全是Reduce的職責(zé))、執(zhí)行具體的計(jì)算
拉取Map分類好的數(shù)據(jù)
之前說到,Map已經(jīng)將數(shù)據(jù)分類,我們直接拉取Reduce需要的數(shù)據(jù)就好了;但是要注意的是,我們是在一個(gè)分布式的環(huán)境中執(zhí)行的任務(wù),所以,Reduce的數(shù)據(jù)來源可能是多個(gè)Map中屬于自己的塊。
reduce:獲取map分發(fā)的數(shù)據(jù)
可以看到,Reduce按照Map分類的key拉取到了自己應(yīng)該處理的當(dāng)日數(shù)據(jù)。
執(zhí)行具體的計(jì)算
Reduce在拿到所有自己的數(shù)據(jù)之后,接下來就可以執(zhí)行自定義的計(jì)算邏輯了,最簡(jiǎn)單的就是計(jì)數(shù)、去重。
reduce:執(zhí)行具體的計(jì)算
可以看到,Reduce已經(jīng)完成了所需要的單日支出計(jì)算功能。
PS:
Map和Reduce的職責(zé)并不是完全絕對(duì)的,比如過濾操作可以在Map,也可以在Reduce,只是因?yàn)樵贛ap做可以減少傳輸?shù)臄?shù)據(jù)量,減少網(wǎng)絡(luò)IO壓力和時(shí)間消耗,所以做了上述的分工。
3. Hadoop序列化
3.1 序列化概述
1)什么是序列化
序列化就是把內(nèi)存中的對(duì)象,轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)以便于存儲(chǔ)到磁盤(持久化)和網(wǎng)絡(luò)傳輸。
反序列化就是將收到字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)或者是磁盤的持久化數(shù)據(jù),轉(zhuǎn)換成內(nèi)存中的對(duì)象。
2)為什么要序列化
一般來說,“活的”對(duì)象只生存在內(nèi)存里,關(guān)機(jī)斷電就沒有了。而且“活的”對(duì)象只能由本地的進(jìn)程使用,不能被發(fā)送到網(wǎng)絡(luò)上的另外一臺(tái)計(jì)算機(jī)。 然而序列化可以存儲(chǔ)“活的”對(duì)象,可以將“活的”對(duì)象發(fā)送到遠(yuǎn)程計(jì)算機(jī)。
3)為什么不用Java的序列化
Java的序列化是一個(gè)重量級(jí)序列化框架(Serializable),一個(gè)對(duì)象被序列化后,會(huì)附帶很多額外的信息(各種校驗(yàn)信息,Header,繼承體系等),不便于在網(wǎng)絡(luò)中高效傳輸。所以,Hadoop自己開發(fā)了一套序列化機(jī)制(Writable)。
4)Hadoop序列化特點(diǎn):
(1)緊湊 :高效使用存儲(chǔ)空間。
(2)快速:讀寫數(shù)據(jù)的額外開銷小。
(3)互操作:支持多語言的交互
3.2 自定義bean對(duì)象實(shí)現(xiàn)序列化接口(Writable)
在企業(yè)開發(fā)中往往常用的基本序列化類型不能滿足所有需求,比如在Hadoop框架內(nèi)部傳遞一個(gè)bean對(duì)象,那么該對(duì)象就需要實(shí)現(xiàn)序列化接口。
具體實(shí)現(xiàn)bean對(duì)象序列化步驟如下7步。
(1)必須實(shí)現(xiàn)Writable接口
(2)反序列化時(shí),需要反射調(diào)用空參構(gòu)造函數(shù),所以必須有空參構(gòu)造
(3)重寫序列化方法
@Override public void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow); }(4)重寫反序列化方法
@Override public void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong(); }(5)注意反序列化的順序和序列化的順序完全一致
(6)要想把結(jié)果顯示在文件中,需要重寫toString(),可用"\t"分開,方便后續(xù)用。
(7)如果需要將自定義的bean放在key中傳輸,則還需要實(shí)現(xiàn)Comparable接口,因?yàn)镸apReduce框中的Shuffle過程要求對(duì)key必須能排序。。
諸佬們?nèi)绻肷钊肜斫庑蛄谢咐?#xff0c;可以參考硅谷的經(jīng)典wordcount案例,有時(shí)候經(jīng)常閱讀源碼也是一個(gè)非常好的習(xí)慣呢
4.MapReduce的框架原理
Hadoop 劃分工作為任務(wù)。有兩種類型的任務(wù):
Map 任務(wù) (分割及映射)
Reduce 任務(wù) (重排,還原)
如上所述完整的執(zhí)行流程(執(zhí)行 Map 和 Reduce 任務(wù))是由兩種類型的實(shí)體的控制,稱為Jobtracker : 就像一個(gè)主(負(fù)責(zé)提交的作業(yè)完全執(zhí)行)
多任務(wù)跟蹤器 : 充當(dāng)角色就像從機(jī),它們每個(gè)執(zhí)行工作
對(duì)于每一項(xiàng)工作提交執(zhí)行在系統(tǒng)中,有一個(gè) JobTracker 駐留在 Namenode 和 Datanode 駐留多個(gè) TaskTracker。
4.1 InputFormat數(shù)據(jù)輸入
4.1.1 切片與MapTask并行度決定機(jī)制
1)問題引出
MapTask的并行度決定Map階段的任務(wù)處理并發(fā)度,進(jìn)而影響到整個(gè)Job的處理速度。
思考:1G的數(shù)據(jù),啟動(dòng)8個(gè)MapTask,可以提高集群的并發(fā)處理能力。那么1K的數(shù)據(jù),也啟動(dòng)8個(gè)MapTask,會(huì)提高集群性能嗎?MapTask并行任務(wù)是否越多越好呢?哪些因素影響了MapTask并行度?
2)MapTask并行度決定機(jī)制
數(shù)據(jù)塊:Block是HDFS物理上把數(shù)據(jù)分成一塊一塊。數(shù)據(jù)塊是HDFS存儲(chǔ)數(shù)據(jù)單位。
數(shù)據(jù)切片:數(shù)據(jù)切片只是在邏輯上對(duì)輸入進(jìn)行分片,并不會(huì)在磁盤上將其切分成片進(jìn)行存儲(chǔ)。數(shù)據(jù)切片是MapReduce程序計(jì)算輸入數(shù)據(jù)的單位,一個(gè)切片會(huì)對(duì)應(yīng)啟動(dòng)一個(gè)MapTask。
4.1.2 Job提交流程源碼和切片源碼詳解
1)Job提交流程源碼詳解 waitForCompletion()submit();// 1建立連接connect(); // 1)創(chuàng)建提交Job的代理new Cluster(getConfiguration());// (1)判斷是本地運(yùn)行環(huán)境還是yarn集群運(yùn)行環(huán)境initialize(jobTrackAddr, conf); // 2 提交job submitter.submitJobInternal(Job.this, cluster)// 1)創(chuàng)建給集群提交數(shù)據(jù)的Stag路徑Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);// 2)獲取jobid ,并創(chuàng)建Job路徑JobID jobId = submitClient.getNewJobID();// 3)拷貝jar包到集群 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir);// 4)計(jì)算切片,生成切片規(guī)劃文件 writeSplits(job, submitJobDir);maps = writeNewSplits(job, jobSubmitDir);input.getSplits(job);// 5)向Stag路徑寫XML配置文件 writeConf(conf, submitJobFile);conf.writeXml(out);// 6)提交Job,返回提交狀態(tài) status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());5.MapReduce開發(fā)總結(jié)
1)輸入數(shù)據(jù)接口:InputFormat
(1)默認(rèn)使用的實(shí)現(xiàn)類是:TextInputFormat
(2)TextInputFormat的功能邏輯是:一次讀一行文本,然后將該行的起始偏移量作為key,行內(nèi)容作為value返回。
(3)CombineTextInputFormat可以把多個(gè)小文件合并成一個(gè)切片處理,提高處理效率。
2)邏輯處理接口:Mapper
用戶根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)其中三個(gè)方法:map() setup() cleanup ()
3)Partitioner分區(qū)
(1)有默認(rèn)實(shí)現(xiàn) HashPartitioner,邏輯是根據(jù)key的哈希值和numReduces來返回一個(gè)分區(qū)號(hào);key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果業(yè)務(wù)上有特別的需求,可以自定義分區(qū)。
4)Comparable排序
(1)當(dāng)我們用自定義的對(duì)象作為key來輸出時(shí),就必須要實(shí)現(xiàn)WritableComparable接口,重寫其中的compareTo()方法。
(2)部分排序:對(duì)最終輸出的每一個(gè)文件進(jìn)行內(nèi)部排序。
(3)全排序:對(duì)所有數(shù)據(jù)進(jìn)行排序,通常只有一個(gè)Reduce。
(4)二次排序:排序的條件有兩個(gè)。
5)Combiner合并
Combiner合并可以提高程序執(zhí)行效率,減少IO傳輸。但是使用時(shí)必須不能影響原有的業(yè)務(wù)處理結(jié)果。
6)邏輯處理接口:Reducer
用戶根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)其中三個(gè)方法:reduce() setup() cleanup ()
7)輸出數(shù)據(jù)接口:OutputFormat
(1)默認(rèn)實(shí)現(xiàn)類是TextOutputFormat,功能邏輯是:將每一個(gè)KV對(duì),向目標(biāo)文本文件輸出一行。
(2)用戶還可以自定義OutputFormat。
6.MapReduce常見面試題
最后博主再給諸佬奉上幾個(gè)常見面試題希望大家能三連一波。
1. Mapreduce 的 map 數(shù)量 和 reduce 數(shù)量是由什么決定的 ,怎么配置
map數(shù)量是由任務(wù)提交時(shí),傳來的切片信息決定的,切片有多少,map數(shù)量就有多少
科普:什么是切片?切片的數(shù)量怎么決定?
舉例:輸入路徑中有兩個(gè)文件,a.txt(130M),b.txt(1M),切片是一塊128M,但是不會(huì)跨越文件,每個(gè)文件單獨(dú)切片,所以這個(gè)路徑提交之后獲得的切片數(shù)量是3,大小分別是128M,2M,1M
reduce的數(shù)量是可以自己設(shè)置的
2. MapReduce優(yōu)化經(jīng)驗(yàn)
設(shè)置合理的map和reduce的個(gè)數(shù)。合理設(shè)置blocksize
避免出現(xiàn)數(shù)據(jù)傾斜
combine函數(shù)
對(duì)數(shù)據(jù)進(jìn)行壓縮
小文件處理優(yōu)化:事先合并成大文件,combineTextInputformat,在hdfs上用- mapreduce將小文件合并成SequenceFile大文件(key:文件名,value:文件內(nèi)容)
參數(shù)優(yōu)化
3. 分別舉例什么情況要使用 combiner,什么情況不使用?
求平均數(shù)的時(shí)候就不需要用combiner,因?yàn)椴粫?huì)減少reduce執(zhí)行數(shù)量,運(yùn)行結(jié)果也會(huì)出錯(cuò)。在其他的時(shí)候,可以依據(jù)情況,使用combiner,來減少map的輸出數(shù)量,減少拷貝到reduce的文件,從而減輕reduce的壓力,節(jié)省網(wǎng)絡(luò)開銷,提升執(zhí)行效率
4. MR運(yùn)行流程解析
這些都是必須記住的,面試經(jīng)常考
map操作
reduce操作
上面的流程是整個(gè)MapReduce最全工作流程,但是Shuffle過程只是從第7步開始到第16步結(jié)束,
5. suffle階段運(yùn)行流程(必背)
Map方法之后,Reduce方法之前的數(shù)據(jù)處理過程稱之為Shuffle。
具體Shuffle過程詳解,如下:
(1)MapTask收集我們的map()方法輸出的kv對(duì),放到內(nèi)存緩沖區(qū)中
(2)從內(nèi)存緩沖區(qū)不斷溢出本地磁盤文件,可能會(huì)溢出多個(gè)文件
(3)多個(gè)溢出文件會(huì)被合并成大的溢出文件
(4)在溢出過程及合并的過程中,都要調(diào)用Partitioner進(jìn)行分區(qū)和針對(duì)key進(jìn)行排序
(5)ReduceTask根據(jù)自己的分區(qū)號(hào),去各個(gè)MapTask機(jī)器上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)
(6)ReduceTask會(huì)抓取到同一個(gè)分區(qū)的來自不同MapTask的結(jié)果文件,ReduceTask會(huì)將這些文件再進(jìn)行合并(歸并排序)
(7)合并成大文件后,Shuffle的過程也就結(jié)束了,后面進(jìn)入ReduceTask的邏輯運(yùn)算過程(從文件中取出一個(gè)一個(gè)的鍵值對(duì)Group,調(diào)用用戶自定義的reduce()方法)
注意:
(1)Shuffle中的緩沖區(qū)大小會(huì)影響到MapReduce程序的執(zhí)行效率,原則上說,緩沖區(qū)越大,磁盤io的次數(shù)越少,執(zhí)行速度就越快。
(2)緩沖區(qū)的大小可以通過參數(shù)調(diào)整,參數(shù):mapreduce.task.io.sort.mb默認(rèn)100M。
6. map端的shuffle
(1)partition:
(1)過程:經(jīng)過map函數(shù)處理后輸出新的<key,value>,它首先被存儲(chǔ)到環(huán)形緩沖區(qū)的kvbuffer,環(huán)形緩沖區(qū)默認(rèn)是100M,并且對(duì)每個(gè)key/value對(duì)hash一個(gè)partition值,相同的partition值為同一個(gè)分區(qū)。
(2)作用:分區(qū)之后,每個(gè)reduce就會(huì)處理對(duì)應(yīng)的partition,減少reduce的壓力。
(2)sort/combiner/compress:
(1)過程:對(duì)環(huán)形緩沖區(qū)內(nèi)的partition值和key值進(jìn)行排序;如果用戶設(shè)置了combiner,會(huì)對(duì)每個(gè)partition中的數(shù)據(jù)進(jìn)行預(yù)處理,相當(dāng)于是map端的reduce;如果用戶設(shè)置了compress,會(huì)對(duì)combiner的數(shù)據(jù)進(jìn)行壓縮。
(2)作用:sort作用是在內(nèi)部排序,減少reduce的壓力;combiner作用是節(jié)省網(wǎng)絡(luò)帶寬和本地磁盤的IO;compress作用是減少本地磁盤的讀寫和減少reduce拷貝map端數(shù)據(jù)時(shí)的網(wǎng)絡(luò)帶寬。
(3)spill:
(1)過程:因?yàn)榄h(huán)形緩沖區(qū)的內(nèi)存不夠用,所以必須要寫到本地磁盤中。將排序好的數(shù)據(jù)spill到本地磁盤中。
(2)作用:數(shù)據(jù)量非常大,全部放到內(nèi)存不現(xiàn)實(shí),所以最后還是會(huì)存到本地磁盤中。
(4)merge:
(1)過程:因?yàn)闀?huì)產(chǎn)生多次spill,本身存放數(shù)據(jù)的out文件和存放數(shù)據(jù)偏移量索引index文件會(huì)產(chǎn)生多個(gè),把多個(gè)文件合并在一起。
(2)作用:方便reduce的一次性拷貝。
7. reduce端的shuffle
(1)copy:
(1)過程:reduce拷貝map最終的輸出的磁盤數(shù)據(jù),一個(gè)reduce拷貝每個(gè)map節(jié)點(diǎn)的相同partition數(shù)據(jù)。
(2)作用:拷貝后的數(shù)據(jù)不止一份,先進(jìn)行合并操作,為后面的排序做準(zhǔn)備。
(2)merge:
(1)過程:reduce拷貝map最終的輸出的磁盤數(shù)據(jù),一個(gè)reduce拷貝每個(gè)map節(jié)點(diǎn)的相同partition數(shù)據(jù)。
(2)作用:拷貝后的數(shù)據(jù)不止一份,先進(jìn)行合并操作,為后面的排序做準(zhǔn)備。
(3)、sort:這里和map端的一樣,但是reduce端的緩沖區(qū)更加靈活一點(diǎn),如果內(nèi)存夠用,就是內(nèi)存到內(nèi)存的merge,不夠用了就是內(nèi)存到磁盤的merge,最后是磁盤到磁盤的merge。
(4)、group:將排序好的數(shù)據(jù)進(jìn)行分組,分組默認(rèn)是將相同的key的value放在一起。作用是為了reduce函數(shù)更好的計(jì)算相同key值出現(xiàn)的次數(shù)。
8.Split
8.1 分片概念
這里的分片只是邏輯分片,根據(jù)文件的字節(jié)索引進(jìn)行分割。比如0—1MB位置定義為第一個(gè)分片,1MB-2MB定義為為第二個(gè)分片,依次類推……而原來的大文件還是原來的大文件,不會(huì)受到影響.
因此,輸入分片(input split)存儲(chǔ)的并非數(shù)據(jù)本身,而是一個(gè)分片長度和一個(gè)記錄數(shù)據(jù)的位置的數(shù)組。
8.2 分片數(shù)量與Map Task數(shù)量的關(guān)系
Map Task的個(gè)數(shù)等于split的個(gè)數(shù)。 mapreduce在處理大文件的時(shí)候,會(huì)根據(jù)一定的規(guī)則,把大文件劃分成多個(gè)分片,這樣能夠提高map的并行度。 劃分出來的就是InputSplit,每個(gè)map處理一個(gè)InputSplit,因此,有多少個(gè)InputSplit,就有多少個(gè)map task。
8.3 由誰來劃分分片?
主要是 InputFormat類 來負(fù)責(zé)劃分Split。InputFormat類有2個(gè)重要的作用:
1)將輸入的數(shù)據(jù)切分為多個(gè)邏輯上的InputSplit,其中每一個(gè)InputSplit作為一個(gè)map的輸入。
2)提供一個(gè)RecordReader,用于將InputSplit的內(nèi)容轉(zhuǎn)換為可以作為map輸入的k,v鍵值對(duì)。
FileInputFormat是InputFormat的子類,是使用比較廣泛的類,輸入格式如果是hdfs上的文件,基本上用的都是FileInputFormat的子類,如TextInputFormat用來處理普通的文件,SequceFileInputFormat用來處理Sequce格式文件。 FileInputFormat類中的getSplits(JobContext job)方法是劃分split的主要邏輯。
8.4 分片的大小
每個(gè)輸入分片的大小是固定的,默認(rèn)為128M。
分片大小范圍可以在mapred-site.xml中設(shè)置
8.5 默認(rèn)分片大小與Block分塊大小相同的原因是什么?
優(yōu)點(diǎn)就是可以實(shí)現(xiàn)分塊優(yōu)化,減少網(wǎng)絡(luò)傳輸數(shù)據(jù),使用本地?cái)?shù)據(jù)運(yùn)行map任務(wù)。
如果分片跨越兩個(gè)數(shù)據(jù)塊的話,對(duì)于任何一個(gè)HDFS節(jié)點(diǎn),分片中的另外一塊數(shù)據(jù)就需要通過網(wǎng)絡(luò)傳輸?shù)組ap任務(wù)節(jié)點(diǎn),效率更低!
總結(jié)
以上是生活随笔為你收集整理的Hadoop生态之Mapreduce的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: WP_支持XP的Windows Phon
- 下一篇: linux屏幕怎么暗一点,Ubuntu