总结:详细讲解MapReduce过程(整理补充)
從啟動和資源調度來看MapReduce過程
Hadoop 1.x版本
首先-先了解一下必知概念
From:MapReduce工作原理圖文詳解,JobTracker和TaskTracker概述
- 客戶端(Client):編寫mapreduce程序,配置作業,提交作業,這就是程序員完成的工作;
- JobTracker:JobTracker是一個后臺服務進程,啟動之后,會一直監聽并接收來自各個TaskTracker發送的心跳信息,包括資源使用情況和任務運行情況等信息。
- 作業控制:在hadoop中每個應用程序被表示成一個作業,每個作業又被分成多個任務,JobTracker的作業控制模塊則負責作業的分解和狀態監控。
- 狀態監控:主要包括TaskTracker狀態監控、作業狀態監控和任務狀態監控。主要作用:容錯和為任務調度提供決策依據。
- JobTracker只有一個,他負責了任務的信息采集整理,你就把它當做包工頭把,這個和采用Master/Slave結構中的Master保持一致
- JobTracker 對應于 NameNode
- 一般情況應該把JobTracker部署在單獨的機器上
- TaskTracker:TaskTracker是JobTracker和Task之間的橋梁。TaskTracker與JobTracker和Task之間采用了RPC協議進行通信。
- 從JobTracker接收并執行各種命令:運行任務、提交任務、殺死任務等
- 將本地節點上各個任務的狀態通過心跳周期性匯報給JobTracker,節點健康情況、資源使用情況,任務執行進度、任務運行狀態等,比如說map task我做完啦,你什么時候讓reduce task過來拉數據啊
- TaskTracker是運行在多個節點上的slaver服務。TaskTracker主動與JobTracker通信,接收作業,并負責直接執行每一個任務。
- TaskTracker都需要運行在HDFS的DataNode上
- HDFS:保存作業的數據、配置信息等等,最后的結果也是保存在hdfs上面
- NameNode: 管理文件目錄結構,接受用戶的操作請求,管理數據節點(DataNode)
- DataNode:是HDFS中真正存儲數據的
- Block:是hdfs讀寫數據的基本單位,默認64MB大小,就是說如果你有130MB數據,那就要分成三個block,兩個存放64MB,最后一個存放2MB數據,雖然最后一個block塊是64MB,但實際上占用空間為2MB
- Sencondary NameNode:它的目的是幫助 NameNode 合并編輯日志,減少 NameNode 啟動時間,在文件系統中設置一個檢查點來幫助NameNode更好的工作。它不是要取代掉NameNode也不是NameNode的備份??蓞⒖糞econdary NameNode:它究竟有什么作用?
其次-走一遍流程
規范是:
- 步驟中涉及的知識點
- 知識點的補充
按照這樣的規范進行講解
在客戶端啟動一個作業。拿個比方說,我提交了一個hive程序
- 客戶端(Client):編寫mapreduce程序,配置作業,提交作業,這就是程序員完成的工作;
向JobTracker請求一個Job ID,就像你排隊買車一樣,你不得搖個號啊,沒有這個號你就不能買車(執行任務)。
- JobTracker:JobTracker是一個后臺服務進程,啟動之后,會一直監聽并接收來自各個TaskTracker發送的心跳信息,包括資源使用情況和任務運行情況等信息。
- 作業控制:在hadoop中每個應用程序被表示成一個作業,每個作業又被分成多個任務,JobTracker的作業控制模塊則負責作業的分解和狀態監控。
- 狀態監控:主要包括TaskTracker狀態監控、作業狀態監控和任務狀態監控。主要作用:容錯和為任務調度提供決策依據。
- JobTracker只有一個,他負責了任務的信息采集整理,你就把它當做包工頭把,這個和采用Master/Slave結構中的Master保持一致
- JobTracker 對應于 NameNode
- 一般情況應該把JobTracker部署在單獨的機器上
將運行作業所需要的資源文件復制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客戶端計算所得的輸入劃分信息。這些文件都存放在JobTracker專門為該作業創建的文件夾中。文件夾名為該作業的Job ID。JAR文件默認會有10個副本(mapred.submit.replication屬性控制);輸入劃分信息(Split)告訴了JobTracker應該為這個作業啟動多少個map任務等信息。
- HDFS:保存作業的數據、配置信息等等,最后的結果也是保存在hdfs上面
- NameNode: 管理文件目錄結構,接受用戶的操作請求,管理數據節點(DataNode)
- DataNode:是HDFS中真正存儲數據的
- Block:是hdfs讀寫數據的基本單位,默認64MB大小,就是說如果你有130MB數據,那就要分成三個block,兩個存放64MB,最后一個存放2MB數據,雖然最后一個block塊是64MB,但實際上占用空間為2MB
- Sencondary NameNode:它的目的是幫助 NameNode 合并編輯日志,減少 NameNode 啟動時間,在文件系統中設置一個檢查點來幫助NameNode更好的工作。它不是要取代掉NameNode也不是NameNode的備份??蓞⒖糩Secondary NameNode:它究竟有什么作用?
JobTracker接收到作業后,將其放在一個作業隊列里(一般來說,公司部門都與自己的隊列,默認的調度方法是FIFO,也就是first in first out-隊列),等待作業調度器對其進行調度,當作業調度器根據自己的調度算法調度到該作業時,會根據輸入劃分信息(Split)為每個劃分創建一個map任務,并將map任務分配給TaskTracker執行。對于map和reduce任務,TaskTracker根據主機核的數量和內存的大小有固定數量的map槽和reduce槽。這里需要強調的是:map任務不是隨隨便便地分配給某個TaskTracker的,這里有個概念叫:數據本地化(Data-Local)。意思是:將map任務分配給含有該map處理的數據塊的TaskTracker上,同時將程序JAR包復制到該TaskTracker上來運行,這叫“運算移動,數據不移動”。而分配reduce任務時并不考慮數據本地化。
- TaskTracker:TaskTracker是JobTracker和Task之間的橋梁。TaskTracker與JobTracker和Task之間采用了RPC協議進行通信。
- 從JobTracker接收并執行各種命令:運行任務、提交任務、殺死任務等
- 將本地節點上各個任務的狀態通過心跳周期性匯報給JobTracker,節點健康情況、資源使用情況,任務執行進度、任務運行狀態等,比如說map task我做完啦,你什么時候讓reduce task過來拉數據啊
- TaskTracker是運行在多個節點上的slaver服務。TaskTracker主動與JobTracker通信,接收作業,并負責直接執行每一個任務。
- TaskTracker都需要運行在HDFS的DataNode上
TaskTracker每隔一段時間會給JobTracker發送一個心跳,告訴JobTracker它依然在運行,同時心跳中還攜帶著很多的信息,比如當前map任務完成的進度等信息。當JobTracker收到作業的最后一個任務完成信息時,便把該作業設置成“成功”。當JobClient查詢狀態時,它將得知任務已完成,便顯示一條消息給用戶。
Hadoop 2.x版本
相較于1.x的版本,目前絕大多數公司用的都是基于2.x的版本,很大的區別就在于使用了Yarn作為了資源管理器,可以使不同的計算框架運行與同一個資源調度器下,而且也解決了1.x版本中JobTracker壓力過大,無法擴展及NameNode單點故障等問題。
首先闡述幾個重要概念
包含主要的組件:定時調用器(Scheduler)以及應用管理器(ApplicationManager)。
- 定時調度器(Scheduler):從本質上來說,定時調度器就是一種策略,或者說一種算法。當 Client 提交一個任務的時候,它會根據所需要的資源以及當前集群的資源狀況進行分配。注意,它只負責向應用程序分配資源,并不做監控以及應用程序的狀態跟蹤。
- 應用管理器(ApplicationManager):同樣,聽名字就能大概知道它是干嘛的。應用管理器就是負責管理 Client 用戶提交的應用。上面不是說到定時調度器(Scheduler)不對用戶提交的程序監控嘛,其實啊,監控應用的工作正是由應用管理器(ApplicationManager)完成的。
- 每當 Client 提交一個 Application 時候,就會新建一個 ApplicationMaster 。由這個 ApplicationMaster 去與 ResourceManager 申請容器資源,獲得資源后會將要運行的程序發送到容器上啟動,然后進行分布式計算。
- NodeManager 是 ResourceManager 在每臺機器的上代理,負責容器的管理,并監控他們的資源使用情況(cpu,內存,磁盤及網絡等),以及向 ResourceManager/Scheduler 提供這些資源使用報告。
執行過程如圖所示
From:https://blog.csdn.net/lb812913059/article/details/79897863
通過submit或者waitForCompletion提交作業,waitForCompletion()方法通過每秒循環輪轉作業進度如果發現與上次報告有改變,則將進度報告發送到控制臺
向ResourceManager申請Application ID,RM檢查輸入輸出說明、計算輸入分片
復制作業的資源文件,將作業信息(jar、配置文件、分片信息)復制到HDFS上用戶的應用緩存目錄中
通過submitApplication()方法提交作業到資源管理器
資源管理器在收到submitApplication()消息后,將請求傳遞給調度器(Scheduler)
調度器為其分配一個容器Container,然后RM在NM的管理下在container中啟動程序的ApplicationMaster進程
ApplicationMaster對作業進行初始化,創建過個薄記對象以跟蹤作業進度
是一個java應用程序,他的主類是MRAppmaster
ApplicationMaster接受來自HDFS在客戶端計算的輸入分片
對每一個分片創建一個map任務,任務對象,由mapreduce.job.reduces屬性設置reduce個數
- uber模式:當任務小的時候就會啟動一個JVM運行MapReduce作業,這在MapReduce1中是不允許的;這樣的作業在YARN中成為uber作業,通過設置mapreduce.job.ubertask.enable設置為false使用;那什么是小任務呢?當小于10個mapper且只有1個reducer且輸入大小小于一個HDFS塊的任務
如果作業不適合uber任務運行,ApplicationMaster就會為所有的map任務和reduce任務向資源管理器申請容器
請求為任務指定內存需求,map任務和reduce任務的默認都會申請1024MB的內存
資源管理器為任務分配了容器,ApplicationMaster就通過節點管理器啟動容器。
該任務由主類YarnChild的java應用程序執行。
運行任務之前,首先將資源本地化,包括作業配置、jar文件和所有來自分布式緩存的文件
最后執行map任務和reduce任務
用人話描述大致描述一下
首先Client向ResourceManager (RM)提交一個Application,RM找了下下資源比較豐富的NodeManager(NM),要求他開辟一個container來啟動ApplicationMaster(AM), AM收集到啟動任務需要用到的資源量(如申請的map的個數依賴于Input Split的大小),將所需要的資源量向RM提交,RM通過一個資源列表的方式選擇一些資源相對豐富的NM返回,告訴AM哪一些節點NM可用啟動任務,AM開始和NM進行通信,告知啟動對應的map/reduce任務;NM開辟一些列的container來執行這些任務,和AM保持通信。一旦所有任務執行結束,AM向Client輸出結果并向RM注銷自己
以提交一個完整的mapreduce任務來演示,這里用hive
<span style="color:#000000"><code class="language-sql">hive<span style="color:#669900">></span> <span style="color:#c678dd">set</span> mapred<span style="color:#999999">.</span>job<span style="color:#999999">.</span>queue<span style="color:#999999">.</span>name<span style="color:#669900">=</span>root<span style="color:#999999">.</span>xxxxxxxxx<span style="color:#999999">;</span><span style="color:#669900">></span> <span style="color:#c678dd">insert</span> overwrite <span style="color:#c678dd">table</span> query_result<span style="color:#669900">></span> <span style="color:#c678dd">select</span> <span style="color:#669900">*</span> <span style="color:#c678dd">from</span> A <span style="color:#c678dd">join</span> B <span style="color:#c678dd">on</span> A<span style="color:#999999">.</span>name<span style="color:#669900">=</span>B<span style="color:#999999">.</span>name<span style="color:#999999">;</span> <span style="color:#5c6370"># 這里是提交一個job,這里提交hive任務</span>Query ID <span style="color:#669900">=</span> xx_20161216130626_40fda9ef<span style="color:#669900">-</span><span style="color:#98c379">386</span>f<span style="color:#669900">-</span><span style="color:#98c379">4</span>ef3<span style="color:#669900">-</span><span style="color:#98c379">9</span>e13<span style="color:#669900">-</span><span style="color:#98c379">7</span>b08ad69118c <span style="color:#5c6370"># 申請資源</span> Total jobs <span style="color:#669900">=</span> <span style="color:#98c379">3</span> Launching Job <span style="color:#98c379">1</span> <span style="color:#c678dd">out</span> <span style="color:#c678dd">of</span> <span style="color:#98c379">3</span> Number <span style="color:#c678dd">of</span> reduce tasks <span style="color:#669900">not</span> specified<span style="color:#999999">.</span> Estimated <span style="color:#c678dd">from</span> input <span style="color:#c678dd">data</span> size: <span style="color:#98c379">1</span> <span style="color:#5c6370"># reduce個數如果沒有設置,那就根據split來確定</span> <span style="color:#669900">In</span> <span style="color:#c678dd">order</span> <span style="color:#c678dd">to</span> change the average <span style="color:#c678dd">load</span> <span style="color:#c678dd">for</span> a reducer <span style="color:#999999">(</span><span style="color:#669900">in</span> bytes<span style="color:#999999">)</span>:<span style="color:#c678dd">set</span> hive<span style="color:#999999">.</span><span style="color:#c678dd">exec</span><span style="color:#999999">.</span>reducers<span style="color:#999999">.</span>bytes<span style="color:#999999">.</span>per<span style="color:#999999">.</span>reducer<span style="color:#669900">=</span><span style="color:#669900"><</span>number<span style="color:#669900">></span> <span style="color:#669900">In</span> <span style="color:#c678dd">order</span> <span style="color:#c678dd">to</span> <span style="color:#c678dd">limit</span> the maximum number <span style="color:#c678dd">of</span> reducers:<span style="color:#c678dd">set</span> hive<span style="color:#999999">.</span><span style="color:#c678dd">exec</span><span style="color:#999999">.</span>reducers<span style="color:#999999">.</span>max<span style="color:#669900">=</span><span style="color:#669900"><</span>number<span style="color:#669900">></span> <span style="color:#669900">In</span> <span style="color:#c678dd">order</span> <span style="color:#c678dd">to</span> <span style="color:#c678dd">set</span> a constant number <span style="color:#c678dd">of</span> reducers:<span style="color:#c678dd">set</span> mapreduce<span style="color:#999999">.</span>job<span style="color:#999999">.</span>reduces<span style="color:#669900">=</span><span style="color:#669900"><</span>number<span style="color:#669900">></span> <span style="color:#c678dd">Starting</span> Job <span style="color:#669900">=</span> job_1481285758114_429080<span style="color:#999999">,</span> Tracking URL <span style="color:#669900">=</span> http:<span style="color:#5c6370">//bigdata-hdp-apache500.xg01:8088/proxy/application_1481285758114_429080/</span> <span style="color:#c678dd">Kill</span> Command <span style="color:#669900">=</span> <span style="color:#669900">/</span>usr<span style="color:#669900">/</span><span style="color:#c678dd">local</span><span style="color:#669900">/</span>hadoop<span style="color:#669900">-</span><span style="color:#98c379">2.7</span><span style="color:#98c379">.2</span><span style="color:#669900">/</span>bin<span style="color:#669900">/</span>hadoop job <span style="color:#669900">-</span><span style="color:#c678dd">kill</span> job_1481285758114_429080 Hadoop job information <span style="color:#c678dd">for</span> Stage<span style="color:#669900">-</span><span style="color:#98c379">1</span>: number <span style="color:#c678dd">of</span> mappers: <span style="color:#98c379">2</span><span style="color:#999999">;</span> number <span style="color:#c678dd">of</span> reducers: <span style="color:#98c379">1</span> <span style="color:#98c379">2016</span><span style="color:#669900">-</span><span style="color:#98c379">12</span><span style="color:#669900">-</span><span style="color:#98c379">16</span> <span style="color:#98c379">13</span>:<span style="color:#98c379">06</span>:<span style="color:#98c379">39</span><span style="color:#999999">,</span><span style="color:#98c379">518</span> Stage<span style="color:#669900">-</span><span style="color:#98c379">1</span> map <span style="color:#669900">=</span> <span style="color:#98c379">0</span><span style="color:#669900">%</span><span style="color:#999999">,</span> reduce <span style="color:#669900">=</span> <span style="color:#98c379">0</span><span style="color:#669900">%</span> <span style="color:#98c379">2016</span><span style="color:#669900">-</span><span style="color:#98c379">12</span><span style="color:#669900">-</span><span style="color:#98c379">16</span> <span style="color:#98c379">13</span>:<span style="color:#98c379">06</span>:<span style="color:#98c379">50</span><span style="color:#999999">,</span><span style="color:#98c379">846</span> Stage<span style="color:#669900">-</span><span style="color:#98c379">1</span> map <span style="color:#669900">=</span> <span style="color:#98c379">50</span><span style="color:#669900">%</span><span style="color:#999999">,</span> reduce <span style="color:#669900">=</span> <span style="color:#98c379">0</span><span style="color:#669900">%</span><span style="color:#999999">,</span> Cumulative CPU <span style="color:#98c379">3.7</span> sec <span style="color:#98c379">2016</span><span style="color:#669900">-</span><span style="color:#98c379">12</span><span style="color:#669900">-</span><span style="color:#98c379">16</span> <span style="color:#98c379">13</span>:<span style="color:#98c379">06</span>:<span style="color:#98c379">55</span><span style="color:#999999">,</span><span style="color:#98c379">987</span> Stage<span style="color:#669900">-</span><span style="color:#98c379">1</span> map <span style="color:#669900">=</span> <span style="color:#98c379">100</span><span style="color:#669900">%</span><span style="color:#999999">,</span> reduce <span style="color:#669900">=</span> <span style="color:#98c379">0</span><span style="color:#669900">%</span><span style="color:#999999">,</span> Cumulative CPU <span style="color:#98c379">11.24</span> sec <span style="color:#98c379">2016</span><span style="color:#669900">-</span><span style="color:#98c379">12</span><span style="color:#669900">-</span><span style="color:#98c379">16</span> <span style="color:#98c379">13</span>:<span style="color:#98c379">07</span>:<span style="color:#98c379">13</span><span style="color:#999999">,</span><span style="color:#98c379">418</span> Stage<span style="color:#669900">-</span><span style="color:#98c379">1</span> map <span style="color:#669900">=</span> <span style="color:#98c379">100</span><span style="color:#669900">%</span><span style="color:#999999">,</span> reduce <span style="color:#669900">=</span> <span style="color:#98c379">67</span><span style="color:#669900">%</span><span style="color:#999999">,</span> Cumulative CPU <span style="color:#98c379">12.53</span> sec <span style="color:#98c379">2016</span><span style="color:#669900">-</span><span style="color:#98c379">12</span><span style="color:#669900">-</span><span style="color:#98c379">16</span> <span style="color:#98c379">13</span>:<span style="color:#98c379">07</span>:<span style="color:#98c379">28</span><span style="color:#999999">,</span><span style="color:#98c379">715</span> Stage<span style="color:#669900">-</span><span style="color:#98c379">1</span> map <span style="color:#669900">=</span> <span style="color:#98c379">100</span><span style="color:#669900">%</span><span style="color:#999999">,</span> reduce <span style="color:#669900">=</span> <span style="color:#98c379">82</span><span style="color:#669900">%</span><span style="color:#999999">,</span> Cumulative CPU <span style="color:#98c379">16.31</span> sec <span style="color:#98c379">2016</span><span style="color:#669900">-</span><span style="color:#98c379">12</span><span style="color:#669900">-</span><span style="color:#98c379">16</span> <span style="color:#98c379">13</span>:<span style="color:#98c379">07</span>:<span style="color:#98c379">58</span><span style="color:#999999">,</span><span style="color:#98c379">252</span> Stage<span style="color:#669900">-</span><span style="color:#98c379">1</span> map <span style="color:#669900">=</span> <span style="color:#98c379">100</span><span style="color:#669900">%</span><span style="color:#999999">,</span> reduce <span style="color:#669900">=</span> <span style="color:#98c379">100</span><span style="color:#669900">%</span><span style="color:#999999">,</span> Cumulative CPU <span style="color:#98c379">17.7</span> sec MapReduce Total cumulative CPU <span style="color:#c678dd">time</span>: <span style="color:#98c379">17</span> seconds <span style="color:#98c379">700</span> msec Ended Job <span style="color:#669900">=</span> job_1481285758114_429080 Stage<span style="color:#669900">-</span><span style="color:#98c379">4</span> <span style="color:#669900">is</span> selected <span style="color:#c678dd">by</span> condition resolver<span style="color:#999999">.</span> Stage<span style="color:#669900">-</span><span style="color:#98c379">3</span> <span style="color:#669900">is</span> filtered <span style="color:#c678dd">out</span> <span style="color:#c678dd">by</span> condition resolver<span style="color:#999999">.</span> Stage<span style="color:#669900">-</span><span style="color:#98c379">5</span> <span style="color:#669900">is</span> filtered <span style="color:#c678dd">out</span> <span style="color:#c678dd">by</span> condition resolver<span style="color:#999999">.</span> Moving <span style="color:#c678dd">data</span> <span style="color:#c678dd">to</span>: hdfs:<span style="color:#5c6370">//mycluster-tj/tmp/hive-staging/hadoop_hive_2016-12-16_13-06-26_309_6781206579090170211-1/-ext-10000</span> Loading <span style="color:#c678dd">data</span> <span style="color:#c678dd">to</span> <span style="color:#c678dd">table</span> test<span style="color:#999999">.</span>query_result <span style="color:#c678dd">Table</span> test<span style="color:#999999">.</span>query_result stats: <span style="color:#999999">[</span>numFiles<span style="color:#669900">=</span><span style="color:#98c379">1</span><span style="color:#999999">,</span> numRows<span style="color:#669900">=</span><span style="color:#98c379">3</span><span style="color:#999999">,</span> totalSize<span style="color:#669900">=</span><span style="color:#98c379">69</span><span style="color:#999999">,</span> rawDataSize<span style="color:#669900">=</span><span style="color:#98c379">66</span><span style="color:#999999">]</span> MapReduce Jobs Launched: Stage<span style="color:#669900">-</span>Stage<span style="color:#669900">-</span><span style="color:#98c379">1</span>: Map: <span style="color:#98c379">2</span> Reduce: <span style="color:#98c379">1</span> Cumulative CPU: <span style="color:#98c379">17.7</span> sec HDFS <span style="color:#c678dd">Read</span>: <span style="color:#98c379">13657</span> HDFS <span style="color:#c678dd">Write</span>: <span style="color:#98c379">142</span> SUCCESS Total MapReduce CPU <span style="color:#c678dd">Time</span> Spent: <span style="color:#98c379">17</span> seconds <span style="color:#98c379">700</span> msec OK owntest<span style="color:#999999">.</span>name owntest<span style="color:#999999">.</span>age owntest2<span style="color:#999999">.</span>workplace owntest2<span style="color:#999999">.</span>name <span style="color:#c678dd">Time</span> taken: <span style="color:#98c379">96.932</span> seconds </code></span>- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
以上是在客戶端、JobTracker、TaskTracker的層次來分析MapReduce的工作原理的,下面我們再細致一點,從map任務和reduce任務的層次來分析分析吧。
從Map,Reduce,Shuffle理解
總體步驟
也就分成input,split,map,shuffle,reduce五個步驟,這里我們按照例子來簡單說明下,再詳細的說明我會放在后面的細分部分,這部分作用是了解下mapreduce的怎么個流程。這里是一個wordcount的過程了
input:也就是數據存儲位置,這里當然是類似于hdfs這樣的分布式存儲啦,
split:因為map task只讀split,而split基本上和hdfs的基本存儲塊block同樣大小注意:split是邏輯分片,一個split對應一個map,你可以把它當做map的單位塊來理解,投喂進map的時候必須要這樣的格式,打個比方,比如只收硬幣的地鐵站,你只能投放1元硬幣的,你投什么五毛,一角的,都是犯法的!對,警察叔叔就是他!
map:這里做的是wordcount,而map程序是由程序原來編寫的,如果非要用代碼來寫,我用python寫你別打我
<span style="color:#000000"><code class="language-python">splitdata <span style="color:#669900">=</span> <span style="color:#669900">'Deer Bear River'</span> aftermap <span style="color:#669900">=</span> <span style="color:#669900">map</span><span style="color:#999999">(</span><span style="color:#c678dd">lambda</span> x<span style="color:#999999">:</span><span style="color:#999999">(</span>x<span style="color:#999999">,</span><span style="color:#98c379">1</span><span style="color:#999999">)</span><span style="color:#999999">,</span>splitdata<span style="color:#999999">.</span>split<span style="color:#999999">(</span><span style="color:#669900">" "</span><span style="color:#999999">)</span><span style="color:#999999">)</span> <span style="color:#c678dd">print</span> aftermap<span style="color:#5c6370"># [('Deer', 1), ('Bear', 1), ('River', 1)]</span> </code></span>- 1
- 2
- 3
- 4
- 5
三個map對應三組Split,我這里只取了其中一組,就是這么個意思,組成key/value鍵值對
shuffle:這是一個比較核心的過程,shuffle有洗牌的意思,這里的意思你把她理解成在拉斯維加斯賭場發牌的小姐姐,但是這個小姐姐并不是隨機發牌,而是把紅桃發給A先生,黑桃都發給B先生,諸如此類。如果非要說有什么套路,那么其中有一個HashPartitioner就幫我們做了hash,你把它想成低配版的聚類,狹義版聚類,因為這里的類特喵的必須是值hash(key)%reduceNum相等的發到同一個reduce里去 啊!
reduce:既然都說了似wordcount了,那我,額,額,做戲做全套,我還是用python來寫這個過程
<span style="color:#000000"><code class="language-python">shuffledata <span style="color:#669900">=</span> <span style="color:#999999">[</span><span style="color:#999999">(</span><span style="color:#669900">'Deer'</span><span style="color:#999999">,</span> <span style="color:#98c379">1</span><span style="color:#999999">)</span><span style="color:#999999">,</span><span style="color:#999999">(</span><span style="color:#669900">'Deer'</span><span style="color:#999999">,</span> <span style="color:#98c379">1</span><span style="color:#999999">)</span><span style="color:#999999">,</span><span style="color:#999999">(</span><span style="color:#669900">'River'</span><span style="color:#999999">,</span> <span style="color:#98c379">1</span><span style="color:#999999">)</span><span style="color:#999999">]</span> afterreduce_dict <span style="color:#669900">=</span> <span style="color:#999999">{</span><span style="color:#999999">}</span> <span style="color:#c678dd">for</span> i <span style="color:#c678dd">in</span> shuffledata<span style="color:#999999">:</span><span style="color:#c678dd">if</span> i<span style="color:#999999">[</span><span style="color:#98c379">0</span><span style="color:#999999">]</span> <span style="color:#669900">not</span> <span style="color:#c678dd">in</span> afterreduce_dict<span style="color:#999999">:</span>afterreduce_dict<span style="color:#999999">[</span>i<span style="color:#999999">[</span><span style="color:#98c379">0</span><span style="color:#999999">]</span><span style="color:#999999">]</span> <span style="color:#669900">=</span> <span style="color:#98c379">0</span>afterreduce_dict<span style="color:#999999">[</span>i<span style="color:#999999">[</span><span style="color:#98c379">0</span><span style="color:#999999">]</span><span style="color:#999999">]</span> <span style="color:#669900">+=</span><span style="color:#98c379">1</span><span style="color:#c678dd">print</span> afterreduce_dict<span style="color:#5c6370"># {'River': 1, 'Deer': 2}</span> </code></span>- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
這個‘reduce’只是對傳統意義上的取鍵值對統計而已,理解這層意思就行,不要糾結形式
總體的步驟就是這么5步,其實最玄妙的shuffle還沒有拆開細講,接下來,見證shuffle的時刻,百分之九十抄的(捂臉)
再細分
shuffle是什么?
除了map,reduce,中間部分就是shuffle,它橫跨了map和reduce兩端,可以說是核心過程
這里你只要清楚Shuffle的大致范圍就成-怎樣把map task的輸出結果有效地傳送到reduce端。也可以這樣理解, Shuffle描述著數據從map task輸出到reduce task輸入的這段過程。你說我map task干完活了要輸出數據了,然后接下來數據給哪個reduce?不能因為我和那個reduce關系好,我把所有輸出結果都給它把,總要考慮下別的reduce的感受啊!那怎么分配?就有了shuffle過程!上面給了兩張圖,我都是盜來的,一張是官方的解釋,一張是中文翻譯,Tkanks-MapReduce:詳解Shuffle過程、MapReduce工作原理圖文詳解
shuffle解決什么問題?
在Hadoop這樣的集群環境中,大部分map task與reduce task的執行是在不同的節點上。當然很多情況下Reduce執行時需要跨節點去拉取其它節點上的map task結果。如果集群正在運行的job有很多,那么task的正常執行對集群內部的網絡資源消耗會很嚴重。這種網絡消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。還有在節點內,相比于內存,磁盤IO對job完成時間的影響也是可觀的。從最基本的要求來說,我們對Shuffle過程的期望可以有:
- 完整地從map task端拉取數據到reduce 端。
- 在跨節點拉取數據時,盡可能地減少對帶寬的不必要消耗。
- 減少磁盤IO對task執行的影響。
Map端分析
與上圖步驟1,2,3,4一一對應來解釋
- Block是HDFS的基本存儲單元,上文也有寫,Block默認大小是64MB
- Split是map task只讀的單位,存儲的并非數據本身,而是一個分片長度和一個記錄數據的位置的數組
- Split與block的對應關系可能是多對一,默認是一對一
Running map task?就是程序員編寫好的map函數了,因此map函數效率相對好控制,而且一般map操作都是本地化操作也就是在數據存儲節點上進行,這也就是上問所提到的?數據本地化!
- 數據本地化(Data-Local)。意思是:將map任務分配給含有該map處理的數據塊的TaskTracker上,同時將程序JAR包復制到該TaskTracker上來運行,這叫“運算移動,數據不移動”。而分配reduce任務時并不考慮數據本地化。
- 簡單的hash做分配,就比如說我map task輸出鍵值對(1,1),(2,1),(3,1), 我有三個reducer,假設我寫個hash是除3取余的值為分配到的reducer位置(其實是要將key先變hash code再除的,我這里簡略),那么(1,1)被分配到第一個reduce,(2,1)被分配到第二個reduce,(3,1)被分配到第三個reduce,同理再來個(4,1)被分配到第一個reduce,HashPartitioner的目的就是合理的分組策略將使得每個Reducer獲得的計算負載差距不大,從而整體reduce的性能更加均衡。
- 沒理解HashPartitioner的胖友,可以參考下這個MapReduce 編程 系列十 使用HashPartitioner來調節Reducer的計算負載
**Memory Buffer & 3:**這是一個環形的緩沖區。map task輸出結果首先會進入一個緩沖區內,這個緩沖區的大小是100MB,如果map task內容太大,是很容易撐爆內存的,所以會有一個守護進程,每當緩沖區到達上限80%的時候,就會啟動一個Spill(溢寫)進程,它的作用是把內存里的map task的結果寫入到磁盤。這里值得注意的是,溢寫程序是單獨的一個進程,不會影響map task的繼續輸出(在寫磁盤過程中,另外的20%內存可以繼續寫入數據,兩種操作互不干擾,但如果在此期間緩沖區被填滿,那么map就會阻塞寫入內存的操作,讓寫入磁盤操作完成后再執行寫入內存。)。當溢寫線程啟動后,需要對這80MB空間內的key做排序(Sort)。排序是MapReduce模型默認的行為,這里的排序也是對序列化的字節做的排序。shuffle排序可以參考這里
- 在這個緩沖區內會有二次排序,先對partition分區排序,然后對同一個partition內的數據進行排序。
- 很多人的誤解在Map階段,如果不使用Combiner便不會排序,這是錯誤的,不管你用不用Combiner,Map Task均會對產生的數據排序(如果沒有Reduce Task,則不會排序, 實際上Map階段的排序就是為了減輕Reduce端排序負載)
- 數據按照partition值和key兩個關鍵字升序排序,移動的只是索引數據,排序結果是Kvmeta中數據按照partition為單位聚集在一起,同一partition內的按照key有序。
??Combiner:如果client設置過Combiner,那么現在就是使用Combiner的時候了。將有相同key的key/value對的value加起來,減少溢寫到磁盤的數據量,combiner簡單說就是map端的reduce!。Combiner會優化MapReduce的中間結果,所以它在整個模型中會多次使用。那哪些場景才能使用Combiner呢?從這里分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。所以從我的想法來看,Combiner只應該用于那種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結果的場景。比如累加,最大值等。但對于均值就會有影響;Combiner的使用一定得慎重,如果用好,它對job執行效率有幫助,反之會影響reduce的最終結果。
引用:然后為merge過程創建一個叫file.out的文件和一個叫file.out.Index的文件用來存儲最終的輸出和索引,一個partition一個partition的進行合并輸出。對于某個partition來說,從索引列表中查詢這個partition對應的所有索引信息,每個對應一個段插入到段列表中。也就是這個partition對應一個段列表,記錄所有的Spill文件中對應的這個partition那段數據的文件名、起始位置、長度等等。然后對這個partition對應的所有的segment進行合并,目標是合并成一個segment。當這個partition對應很多個segment時,會分批地進行合并:先從segment列表中把第一批取出來,以key為關鍵字放置成最小堆,然后從最小堆中每次取出最小的輸出到一個臨時文件中,這樣就把這一批段合并成一個臨時的段,把它加回到segment列表中;再從segment列表中把第二批取出來合并輸出到一個臨時segment,把其加入到列表中;這樣往復執行,直到剩下的段是一批,輸出到最終的文件中。最終的索引數據仍然輸出到Index文件中。
?也就是說,上述三個spill.out文件中,同一個顏色的表示計算出的partition的值是一致的,根據每一個spill.out文件都有spill.index文件對應,里面記錄了該out文件中某個partition的位置,將同一個partition從三個spill.out文件中都拿出來進行歸并操作,搞定后變成了最終文件的頭部,然后再采集藍色部分代表的partition,往復操作,最終每個map都會對應出一個結果文件,等待reduce來拉取自己需要處理的部分,至于怎么拉,拉誰,這個都記錄在index中,其實就是partition的值了
- Merge是怎樣的?比如“aaa”從某個map task讀取過來時值是5,從另外一個map 讀取時值是8,因為它們有相同的key,所以得merge成group。什么是group。對于“aaa”就是像這樣的:{“aaa”, [5, 8, 2, …]},數組中的值就是從不同溢寫文件中讀取出來的,然后再把這些值放在一起。
- 合并(Combine)和歸并(Merge)的區別:
兩個鍵值對<“a”,1>和<“a”,1>,如果合并,會得到<“a”,2>,如果歸并,會得到<“a”,<1,1>>,每個map都會有一個環形緩沖區,merge默認對此溢寫文件進行合并,而combine則需要設置,設置后,同個key的value會相加
- 補充一下,combine這個如果設置了,會在兩個地方被觸發;
- 補充另一點,在shuffle階段中map端涉及到的排序
在spill的過程中時,會對partition和key進行排序,確保partition內的key是有序的,這里用的是優化后的快排;然后再溢寫到小文件之后,針對這些小文件,最終需要merge到一個文件,這里也會出現一次排序,這里用的基本思想就是歸并排序,其實是可以理解的,因為在溢寫的文件塊內,已經保證局部有序了,相當于只需要將局部有序的列表進行合并,形成一個全局有序的,這個讓歸并來做實在合適,甚至只需要做歸并的后半部分!我用代碼闡述下
<span style="color:#000000"><code class="language-python"> <span style="color:#5c6370"># 假設兩個溢寫文件且是兩個partition的情況,里面的key已經有序了,現在需要全局有序,也就是小文件a,b merge到temp的過程</span>a <span style="color:#669900">=</span> <span style="color:#999999">[</span><span style="color:#98c379">1</span><span style="color:#999999">,</span><span style="color:#98c379">2</span><span style="color:#999999">,</span><span style="color:#98c379">4</span><span style="color:#999999">,</span><span style="color:#98c379">7</span><span style="color:#999999">,</span><span style="color:#98c379">20</span><span style="color:#999999">,</span><span style="color:#98c379">23</span><span style="color:#999999">]</span> b <span style="color:#669900">=</span> <span style="color:#999999">[</span><span style="color:#98c379">3</span><span style="color:#999999">,</span><span style="color:#98c379">5</span><span style="color:#999999">,</span><span style="color:#98c379">9</span><span style="color:#999999">,</span><span style="color:#98c379">11</span><span style="color:#999999">,</span><span style="color:#98c379">13</span><span style="color:#999999">,</span><span style="color:#98c379">17</span><span style="color:#999999">,</span><span style="color:#98c379">19</span><span style="color:#999999">,</span><span style="color:#98c379">20</span><span style="color:#999999">]</span> a_size <span style="color:#669900">=</span> <span style="color:#669900">len</span><span style="color:#999999">(</span>a<span style="color:#999999">)</span> b_size <span style="color:#669900">=</span> <span style="color:#669900">len</span><span style="color:#999999">(</span>b<span style="color:#999999">)</span> temp <span style="color:#669900">=</span> <span style="color:#999999">[</span><span style="color:#999999">]</span> i<span style="color:#999999">,</span>j <span style="color:#669900">=</span> <span style="color:#98c379">0</span><span style="color:#999999">,</span><span style="color:#98c379">0</span> <span style="color:#c678dd">while</span> i <span style="color:#669900">!=</span> a_size <span style="color:#669900">and</span> j <span style="color:#669900">!=</span> b_size<span style="color:#999999">:</span><span style="color:#c678dd">if</span> a<span style="color:#999999">[</span>i<span style="color:#999999">]</span> <span style="color:#669900"><</span> b<span style="color:#999999">[</span>j<span style="color:#999999">]</span><span style="color:#999999">:</span>temp<span style="color:#999999">.</span>append<span style="color:#999999">(</span>a<span style="color:#999999">[</span>i<span style="color:#999999">]</span><span style="color:#999999">)</span>i <span style="color:#669900">=</span> i <span style="color:#669900">+</span> <span style="color:#98c379">1</span><span style="color:#c678dd">else</span><span style="color:#999999">:</span>temp<span style="color:#999999">.</span>append<span style="color:#999999">(</span>b<span style="color:#999999">[</span>j<span style="color:#999999">]</span><span style="color:#999999">)</span>j <span style="color:#669900">=</span> j <span style="color:#669900">+</span> <span style="color:#98c379">1</span>temp<span style="color:#999999">.</span>extend<span style="color:#999999">(</span>b<span style="color:#999999">[</span>j<span style="color:#999999">:</span><span style="color:#999999">]</span> <span style="color:#c678dd">if</span> i <span style="color:#669900">==</span> a_size <span style="color:#c678dd">else</span> a<span style="color:#999999">[</span>i<span style="color:#999999">:</span><span style="color:#999999">]</span><span style="color:#999999">)</span><span style="color:#999999">[</span><span style="color:#98c379">1</span><span style="color:#999999">,</span> <span style="color:#98c379">2</span><span style="color:#999999">,</span> <span style="color:#98c379">3</span><span style="color:#999999">,</span> <span style="color:#98c379">4</span><span style="color:#999999">,</span> <span style="color:#98c379">5</span><span style="color:#999999">,</span> <span style="color:#98c379">7</span><span style="color:#999999">,</span> <span style="color:#98c379">9</span><span style="color:#999999">,</span> <span style="color:#98c379">11</span><span style="color:#999999">,</span> <span style="color:#98c379">13</span><span style="color:#999999">,</span> <span style="color:#98c379">17</span><span style="color:#999999">,</span> <span style="color:#98c379">19</span><span style="color:#999999">,</span> <span style="color:#98c379">20</span><span style="color:#999999">,</span> <span style="color:#98c379">20</span><span style="color:#999999">,</span> <span style="color:#98c379">23</span><span style="color:#999999">]</span></code></span>- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
代碼比較簡陋,但是闡述了歸并的基本思想,就是使用外部排序的方式,開辟一個緩沖區進行排序,這樣就不會局限于內存的限制了,其實在大數據排序中經常碰到,指針交替前進,最后刷完整個列表;
至此,map端的所有工作都已結束,最終生成的這個文件也存放在TaskTracker夠得著的某個本地目錄內。每個reduce task不斷地通過RPC從JobTracker那里獲取map task是否完成的信息,如果reduce task得到通知,獲知某臺TaskTracker上的map task執行完成,Shuffle的后半段過程開始啟動。
關于這個過程更詳細的請見?MapReduce shuffle過程詳解
Reduce端分析
- 默認情況下,當整個MapReduce作業的所有已執行完成的Map Task任務數超過Map Task總數的5%后,JobTracker便會開始調度執行Reduce Task任務。然后Reduce Task任務默認啟動mapred.reduce.parallel.copies(默認為5)個MapOutputCopier線程到已完成的Map Task任務節點上分別copy一份屬于自己的數據。 這些copy的數據會首先保存的內存緩沖區中,當內沖緩沖區的使用率達到一定閥值后,則寫到磁盤上。
- 有人可能會問:分區中的數據怎么知道它對應的reduce是哪個呢?其實map任務一直和其父TaskTracker保持聯系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整個集群中的宏觀信息。只要reduce任務向JobTracker獲取對應的map輸出位置就ok了。還有就是map端已經做完了partition,reduce根據partition標識符來拉自己需要的數據
總結
以上是生活随笔為你收集整理的总结:详细讲解MapReduce过程(整理补充)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 在Docker启动Cloudera并开始
- 下一篇: MapReduce简述、工作流程