shuffle的工作原理
Shuffle的正常意思是洗牌或弄亂,可能大家更熟悉的是Java API里的Collections.shuffle(List)方法,它會隨機地打亂參數(shù)list里的元素順序。如果你不知道MapReduce里Shuffle是什么,那么請看這張圖:?
????????這張是官方對Shuffle過程的描述。但我可以肯定的是,單從這張圖你基本不可能明白Shuffle的過程,因為它與事實相差挺多,細節(jié)也是錯亂的。后面我會具體描述Shuffle的事實情況,所以這里你只要清楚Shuffle的大致范圍就成-怎樣把map task的輸出結(jié)果有效地傳送到reduce端。也可以這樣理解, Shuffle描述著數(shù)據(jù)從map task輸出到reduce task輸入的這段過程。?
??????? 在Hadoop這樣的集群環(huán)境中,大部分map task與reduce task的執(zhí)行是在不同的節(jié)點上。當然很多情況下Reduce執(zhí)行時需要跨節(jié)點去拉取其它節(jié)點上的map task結(jié)果。如果集群正在運行的job有很多,那么task的正常執(zhí)行對集群內(nèi)部的網(wǎng)絡資源消耗會很嚴重。這種網(wǎng)絡消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。還有在節(jié)點內(nèi),相比于內(nèi)存,磁盤IO對job完成時間的影響也是可觀的。從最基本的要求來說,我們對Shuffle過程的期望可以有:?
- 完整地從map task端拉取數(shù)據(jù)到reduce 端。
- 在跨節(jié)點拉取數(shù)據(jù)時,盡可能地減少對帶寬的不必要消耗。
- 減少磁盤IO對task執(zhí)行的影響。
??????? OK,看到這里時,大家可以先停下來想想,如果是自己來設計這段Shuffle過程,那么你的設計目標是什么。我想能優(yōu)化的地方主要在于減少拉取數(shù)據(jù)的量及盡量使用內(nèi)存而不是磁盤。?
??????? 我的分析是基于Hadoop0.21.0的源碼,如果與你所認識的Shuffle過程有差別,不吝指出。我會以WordCount為例,并假設它有8個map task和3個reduce task。從上圖看出,Shuffle過程橫跨map與reduce兩端,所以下面我也會分兩部分來展開。?
??????? 先看看map端的情況,如下圖:?
?
???????
??????? 上圖可能是某個map task的運行情況。拿它與官方圖的左半邊比較,會發(fā)現(xiàn)很多不一致。官方圖沒有清楚地說明partition, sort與combiner到底作用在哪個階段。我畫了這張圖,希望讓大家清晰地了解從map數(shù)據(jù)輸入到map端所有數(shù)據(jù)準備好的全過程。?
??????? 整個流程我分了四步。簡單些可以這樣說,每個map task都有一個內(nèi)存緩沖區(qū),存儲著map的輸出結(jié)果,當緩沖區(qū)快滿的時候需要將緩沖區(qū)的數(shù)據(jù)以一個臨時文件的方式存放到磁盤,當整個map task結(jié)束后再對磁盤中這個map task產(chǎn)生的所有臨時文件做合并,生成最終的正式輸出文件,然后等待reduce task來拉數(shù)據(jù)。?
??????? 當然這里的每一步都可能包含著多個步驟與細節(jié),下面我對細節(jié)來一一說明:?
1.??????? 在map task執(zhí)行時,它的輸入數(shù)據(jù)來源于HDFS的block,當然在MapReduce概念中,map task只讀取split。Split與block的對應關系可能是多對一,默認是一對一。在WordCount例子里,假設map的輸入數(shù)據(jù)都是像“aaa”這樣的字符串。?
2.??????? 在經(jīng)過mapper的運行后,我們得知mapper的輸出是這樣一個key/value對: key是“aaa”, value是數(shù)值1。因為當前map端只做加1的操作,在reduce task里才去合并結(jié)果集。前面我們知道這個job有3個reduce task,到底當前的“aaa”應該交由哪個reduce去做呢,是需要現(xiàn)在決定的。?
??????? MapReduce提供Partitioner接口,它的作用就是根據(jù)key或value及reduce的數(shù)量來決定當前的這對輸出數(shù)據(jù)最終應該交由哪個reduce task處理。默認對key hash后再以reduce task數(shù)量取模。默認的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以訂制并設置到job上。?
??????? 在我們的例子中,“aaa”經(jīng)過Partitioner后返回0,也就是這對值應當交由第一個reducer來處理。接下來,需要將數(shù)據(jù)寫入內(nèi)存緩沖區(qū)中,緩沖區(qū)的作用是批量收集map結(jié)果,減少磁盤IO的影響。我們的key/value對以及Partition的結(jié)果都會被寫入緩沖區(qū)。當然寫入之前,key與value值都會被序列化成字節(jié)數(shù)組。?
??????? 整個內(nèi)存緩沖區(qū)就是一個字節(jié)數(shù)組,它的字節(jié)索引及key/value存儲結(jié)構(gòu)我沒有研究過。如果有朋友對它有研究,那么請大致描述下它的細節(jié)吧。?
3.??????? 這個內(nèi)存緩沖區(qū)是有大小限制的,默認是100MB。當map task的輸出結(jié)果很多時,就可能會撐爆內(nèi)存,所以需要在一定條件下將緩沖區(qū)中的數(shù)據(jù)臨時寫入磁盤,然后重新利用這塊緩沖區(qū)。這個從內(nèi)存往磁盤寫數(shù)據(jù)的過程被稱為Spill,中文可譯為溢寫,字面意思很直觀。這個溢寫是由單獨線程來完成,不影響往緩沖區(qū)寫map結(jié)果的線程。溢寫線程啟動時不應該阻止map的結(jié)果輸出,所以整個緩沖區(qū)有個溢寫的比例spill.percent。這個比例默認是0.8,也就是當緩沖區(qū)的數(shù)據(jù)已經(jīng)達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的內(nèi)存,執(zhí)行溢寫過程。Map task的輸出結(jié)果還可以往剩下的20MB內(nèi)存中寫,互不影響。?
??????? 當溢寫線程啟動后,需要對這80MB空間內(nèi)的key做排序(Sort)。排序是MapReduce模型默認的行為,這里的排序也是對序列化的字節(jié)做的排序。?
??????? 在這里我們可以想想,因為map task的輸出是需要發(fā)送到不同的reduce端去,而內(nèi)存緩沖區(qū)沒有對將發(fā)送到相同reduce端的數(shù)據(jù)做合并,那么這種合并應該是體現(xiàn)是磁盤文件中的。從官方圖上也可以看到寫到磁盤中的溢寫文件是對不同的reduce端的數(shù)值做過合并。所以溢寫過程一個很重要的細節(jié)在于,如果有很多個key/value對需要發(fā)送到某個reduce端去,那么需要將這些key/value值拼接到一塊,減少與partition相關的索引記錄。?
??????? 在針對每個reduce端而合并數(shù)據(jù)時,有些數(shù)據(jù)可能像這樣:“aaa”/1, “aaa”/1。對于WordCount例子,就是簡單地統(tǒng)計單詞出現(xiàn)的次數(shù),如果在同一個map task的結(jié)果中有很多個像“aaa”一樣出現(xiàn)多次的key,我們就應該把它們的值合并到一塊,這個過程叫reduce也叫combine。但MapReduce的術語中,reduce只指reduce端執(zhí)行從多個map task取數(shù)據(jù)做計算的過程。除reduce外,非正式地合并數(shù)據(jù)只能算做combine了。其實大家知道的,MapReduce中將Combiner等同于Reducer。?
??????? 如果client設置過Combiner,那么現(xiàn)在就是使用Combiner的時候了。將有相同key的key/value對的value加起來,減少溢寫到磁盤的數(shù)據(jù)量。Combiner會優(yōu)化MapReduce的中間結(jié)果,所以它在整個模型中會多次使用。那哪些場景才能使用Combiner呢?從這里分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結(jié)果。所以從我的想法來看,Combiner只應該用于那種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結(jié)果的場景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它對job執(zhí)行效率有幫助,反之會影響reduce的最終結(jié)果。?
4.??????? 每次溢寫會在磁盤上生成一個溢寫文件,如果map的輸出結(jié)果真的很大,有多次這樣的溢寫發(fā)生,磁盤上相應的就會有多個溢寫文件存在。當map task真正完成時,內(nèi)存緩沖區(qū)中的數(shù)據(jù)也全部溢寫到磁盤中形成一個溢寫文件。最終磁盤中會至少有一個這樣的溢寫文件存在(如果map的輸出結(jié)果很少,當map執(zhí)行完成時,只會產(chǎn)生一個溢寫文件),因為最終的文件只有一個,所以需要將這些溢寫文件歸并到一起,這個過程就叫做Merge。Merge是怎樣的?如前面的例子,“aaa”從某個map task讀取過來時值是5,從另外一個map 讀取時值是8,因為它們有相同的key,所以得merge成group。什么是group。對于“aaa”就是像這樣的:{“aaa”, [5, 8, 2, …]},數(shù)組中的值就是從不同溢寫文件中讀取出來的,然后再把這些值加起來。請注意,因為merge是將多個溢寫文件合并到一個文件,所以可能也有相同的key存在,在這個過程中如果client設置過Combiner,也會使用Combiner來合并相同的key。?
??????? 至此,map端的所有工作都已結(jié)束,最終生成的這個文件也存放在TaskTracker夠得著的某個本地目錄內(nèi)。每個reduce task不斷地通過RPC從JobTracker那里獲取map task是否完成的信息,如果reduce task得到通知,獲知某臺TaskTracker上的map task執(zhí)行完成,Shuffle的后半段過程開始啟動。?
??????? 簡單地說,reduce task在執(zhí)行之前的工作就是不斷地拉取當前job里每個map task的最終結(jié)果,然后對從不同地方拉取過來的數(shù)據(jù)不斷地做merge,也最終形成一個文件作為reduce task的輸入文件。見下圖:?
??????? 如map 端的細節(jié)圖,Shuffle在reduce端的過程也能用圖上標明的三點來概括。當前reduce copy數(shù)據(jù)的前提是它要從JobTracker獲得有哪些map task已執(zhí)行結(jié)束,這段過程不表,有興趣的朋友可以關注下。Reducer真正運行之前,所有的時間都是在拉取數(shù)據(jù),做merge,且不斷重復地在做。如前面的方式一樣,下面我也分段地描述reduce 端的Shuffle細節(jié):?
1.??????? Copy過程,簡單地拉取數(shù)據(jù)。Reduce進程啟動一些數(shù)據(jù)copy線程(Fetcher),通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。因為map task早已結(jié)束,這些文件就歸TaskTracker管理在本地磁盤中。?
2.??????? Merge階段。這里的merge如map端的merge動作,只是數(shù)組中存放的是不同map端copy來的數(shù)值。Copy過來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)中,這里的緩沖區(qū)大小要比map端的更為靈活,它基于JVM的heap size設置,因為Shuffle階段Reducer不運行,所以應該把絕大部分的內(nèi)存都給Shuffle用。這里需要強調(diào)的是,merge有三種形式:1)內(nèi)存到內(nèi)存? 2)內(nèi)存到磁盤? 3)磁盤到磁盤。默認情況下第一種形式不啟用,讓人比較困惑,是吧。當內(nèi)存中的數(shù)據(jù)量到達一定閾值,就啟動內(nèi)存到磁盤的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner,也是會啟用的,然后在磁盤中生成了眾多的溢寫文件。第二種merge方式一直在運行,直到?jīng)]有map端的數(shù)據(jù)時才結(jié)束,然后啟動第三種磁盤到磁盤的merge方式生成最終的那個文件。?
3.??????? Reducer的輸入文件。不斷地merge后,最后會生成一個“最終文件”。為什么加引號?因為這個文件可能存在于磁盤上,也可能存在于內(nèi)存中。對我們來說,當然希望它存放于內(nèi)存中,直接作為Reducer的輸入,但默認情況下,這個文件是存放于磁盤中的。至于怎樣才能讓這個文件出現(xiàn)在內(nèi)存中,之后的性能優(yōu)化篇我再說。當Reducer的輸入文件已定,整個Shuffle才最終結(jié)束。然后就是Reducer執(zhí)行,把結(jié)果放到HDFS上。
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
我們按照圖中的1234步逐步進行說明:
①在map端首先接觸的是InputSplit,在InputSplit中含有DataNode中的數(shù)據(jù),每一個InputSplit都會分配一個Mapper任務。
②當key/value被寫入緩沖區(qū)之前,都會被序列化為字節(jié)流。mapreduce提供Partitioner接口,它的作用就是根據(jù)key或value及reduce的數(shù)量來決定當前的這對輸出數(shù)據(jù)最終應該交由哪個reduce task處理(分區(qū))。默認對key hash后再以reduce task數(shù)量取模。默認的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以訂制并設置到job上。
注意:雖然Partitioner接口會計算出一個值來決定某個輸出會交給哪個reduce去處理,但是在緩沖區(qū)中并不會實現(xiàn)物理上的分區(qū),而是將結(jié)果加載key-value后面。物理上的分區(qū)實在磁盤上進行的。
每個map有一個環(huán)形內(nèi)存緩沖區(qū),用于存儲任務的輸出。默認大小100MB(io.sort.mb屬性)。
③一旦達到閥值80%(io.sort.spil l.percent),一個后臺線程就把內(nèi)容寫到(spill:溢寫)Linux本地磁盤中的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件。在這一步會執(zhí)行兩個操作排序和Combiner(前提是設置了Combiner)。
這里大家可能會出現(xiàn)疑問:是將哪部分溢寫到磁盤上那?答案是,溢寫線程啟動時,會鎖定這80M的內(nèi)存,執(zhí)行溢寫過程。而剩余的那20M緩沖區(qū)會繼續(xù)接收map的輸出,直到緩沖區(qū)寫滿,Map 才會被阻塞直到spill 完成。spill操作和接收map輸出的操作是兩個獨立的線程,故互不影響。
spill 線程在把緩沖區(qū)的數(shù)據(jù)寫到磁盤前,會對它進行一個二次快速排序,首先根據(jù)數(shù)據(jù)所屬的partition (分區(qū))排序,然后每個partition 中再按Key 排序。輸出包括一個索引文件和數(shù)據(jù)文件。如果設定了Combiner,將在排序輸出的基礎上運行。Combiner 就是一個簡單Reducer操作,它在執(zhí)行Map 任務的節(jié)點本身運行,先對Map 的輸出做一次簡單Reduce,使得Map 的輸出更緊湊,更少的數(shù)據(jù)會被寫入磁盤和傳送到Reducer。spill 文件保存在由mapred.local.dir指定的目錄中,map 任務結(jié)束后刪除。
每次溢寫會在磁盤上生成一個溢寫文件,如果map的輸出結(jié)果很大,有多次這樣的溢寫發(fā)生,磁盤上相應的就會有多個溢寫文件存在。而如果map的輸出很小以至于最終也沒有到達閥值,那最后會將其緩沖區(qū)的內(nèi)容寫入磁盤。
④因為最終的文件只有一個,所以需要將這些溢寫文件歸并到一起,
這個過程就叫做Merge。因為merge是將多個溢寫文件合并到一個文件,所以可能也有相同的key存在,在這個過程中如果client設置過Combiner,也會使用Combiner來合并相同的key。
從這里我們可以得出,溢寫操作是寫到了磁盤上,并不一定就是最終的結(jié)果,因為最終結(jié)果是要只有一個文件,除非其map的輸出很小以至于沒有沒有發(fā)生過溢寫(也就是說磁盤上只有一個文件)。
到這里,map端的shuffle就全部完成了。
?
reduce端的shuffle:
?
map完成后,會通過心跳將信息傳給tasktracker,其進而通知jobtracker,reduce task不斷地通過RPC從JobTracker那里獲取map task是否完成的信息,當?shù)弥硞€TaskTracker上的map task執(zhí)行完成,Reduce端的shuffle就開始工作了。
注意:這里是reduce端的shuffle開始工作,而不是reduce操作開始執(zhí)行,在shuffle階段reduce不會運行。
同樣我們按照圖中的標號,分為三個階段進行講解。
**①**Copy階段:reduce端默認有5個數(shù)據(jù)復制線程從map端復制數(shù)據(jù),其通過Http方式得到Map對應分區(qū)的輸出文件。reduce端并不是等map端執(zhí)行完后將結(jié)果傳來,而是直接去map端去Copy輸出文件。
**②**Merge階段:reduce端的shuffle也有一個環(huán)形緩沖區(qū),它的大小要比map端的靈活(由JVM的heapsize設置),由Copy階段獲得的數(shù)據(jù),會存放的這個緩沖區(qū)中,同樣,當?shù)竭_閥值時會發(fā)生溢寫操作,這個過程中如果設置了Combiner也是會執(zhí)行的,這個過程會一直執(zhí)行直到所有的map輸出都被復制過來,如果形成了多個磁盤文件還會進行合并,最后一次合并的結(jié)果作為reduce的輸入而不是寫入到磁盤中。
③當Reducer的輸入文件確定后,整個Shuffle操作才最終結(jié)束。之后就是Reducer的執(zhí)行了,最后Reducer會把結(jié)果存到HDFS上。
轉(zhuǎn)載于:https://www.cnblogs.com/HHR-SUN/p/10478216.html
總結(jié)
以上是生活随笔為你收集整理的shuffle的工作原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。