我們基于Hadoop 1.2.1源碼分析MapReduce V1的處理流程。MapReduce V1實現(xiàn)中,主要存在3個主要的分布式進程(角色):JobClient、JobTracker和TaskTracker,我們主要是以這三個角色的實際處理活動為主線,并結(jié)合源碼,分析實際處理流程。上一篇我們分析了Job提交過程中JobClient端的處理流程(詳見文章?MapReduce V1:Job提交流程之JobClient端分析),這里我們繼續(xù)詳細分析Job提交在JobTracker端的具體流程。通過閱讀源碼可以發(fā)現(xiàn),這部分的處理邏輯還是有點復(fù)雜,經(jīng)過梳理,更加細化清晰的流程,如下圖所示:
上圖中主要分為兩大部分:一部分是JobClient基于RPC調(diào)用提交Job到JobTracker后,在JobTracker端觸發(fā)TaskScheduler所注冊的一系列Listener進行Job信息初始化;另一部分是JobTracker端監(jiān)聽Job隊列的線程,監(jiān)聽到Job狀態(tài)發(fā)生變更觸發(fā)一系列Listener更新狀態(tài)。我們從這兩個方面展開分析:
JobTracker接收Job提交
JobTracker接收到JobClient提交的Job,在JobTracker端具體執(zhí)行流程,描述如下:
JobClient基于JobSubmissionProtocol協(xié)議遠程調(diào)用JobTracker的submitJob方法提交JobJobTracker接收提交的Job,創(chuàng)建一個JobInProgress對象,將其放入內(nèi)部維護的Map<JobID, JobInProgress> jobs隊列中觸發(fā)JobQueueJobInProgressListener執(zhí)行JobQueueJobInProgressListener的jobAdded方法,創(chuàng)建JobSchedulingInfo對象,并放入到JobQueueJobInProgressListener內(nèi)部維護的Map<JobSchedulingInfo, JobInProgress> jobQueue隊列中觸發(fā)EagerTaskInitializationListener執(zhí)行EagerTaskInitializationListener的jobAdded方法,將JobInProgress對象加入到List<JobInProgress> jobInitQueue隊列中在JobTracker端使用TaskScheduler進行Job/Task的調(diào)度,可以通過mapred.jobtracker.taskScheduler配置所使用的TaskScheduler實現(xiàn)類,默認使用的實現(xiàn)類JobQueueTaskScheduler,如下所示:
| 2 | Class<??extends?TaskScheduler> schedulerClass |
| 3 | ??= conf.getClass("mapred.jobtracker.taskScheduler", JobQueueTaskScheduler.class, TaskScheduler.class); |
| 4 | taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf); |
如果想要使用其他的TaskScheduler實現(xiàn),可以在mapred-site.xml中配置mapred.jobtracker.taskScheduler的屬性值,覆蓋默認的調(diào)度策略即可。
在JobQueueTaskScheduler實現(xiàn)類中,注冊了2個JobInProgressListener,JobInProgressListener是用來監(jiān)聽由JobClient端提交后在JobTracker端Job(在JobTracker端維護的JobInProgress)生命周期變化,并觸發(fā)相應(yīng)事件(jobAdded/jobUpdated/jobRemoved)的,如下所示:
| 01 | protected?JobQueueJobInProgressListener jobQueueJobInProgressListener; |
| 02 | protected?EagerTaskInitializationListener eagerTaskInitializationListener; |
| 03 | private?float?padFraction; |
| 05 | public?JobQueueTaskScheduler() { |
| 06 | ??this.jobQueueJobInProgressListener =?new?JobQueueJobInProgressListener(); |
| 10 | public?synchronized?void?start()?throws?IOException { |
| 12 | ??taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);?// taskTrackerManager是JobTracker的引用 |
| 13 | ??eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager); |
| 14 | ??eagerTaskInitializationListener.start(); |
| 15 | ??taskTrackerManager.addJobInProgressListener(eagerTaskInitializationListener); |
JobTracker維護一個List<JobInProgressListener> jobInProgressListeners隊列,在TaskScheduler(默認JobQueueTaskScheduler )啟動的時候向JobTracker注冊。在JobClient提交Job后,在JobTracker段創(chuàng)建一個對應(yīng)的JobInProgress對象,并將其放入到j(luò)obs隊列后,觸發(fā)這一組JobInProgressListener的jobAdded方法。
JobTracker管理Job提交
JobTracker接收到提交的Job后,需要對提交的Job進行初始化操作,具體流程如下所示:
EagerTaskInitializationListener.JobInitManager線程監(jiān)控EagerTaskInitializationListener內(nèi)部的List<JobInProgress> jobInitQueue隊列加載一個EagerTaskInitializationListener.InitJob線程去初始化Job在EagerTaskInitializationListener.InitJob線程中,調(diào)用JobTracker的initJob方法初始化Job調(diào)用JobInProgress的initTasks方法初始化該Job對應(yīng)的Tasks從HDFS讀取該Job對應(yīng)的splits信息,創(chuàng)建MapTask和ReduceTask(在JobTracker端維護的Task實際上是TaskInProgress)Job狀態(tài)變更,觸發(fā)JobQueueJobInProgressListener如果Job優(yōu)先級(Priority)/開始時間發(fā)生變更,則對Map<JobSchedulingInfo, JobInProgress> jobQueue隊列進行重新排序;如果Job完成,則將Job從jobQueue隊列中移除Job狀態(tài)變更,觸發(fā)EagerTaskInitializationListener如果Job優(yōu)先級(Priority)/開始時間發(fā)生變更,則對List<JobInProgress> jobInitQueue隊列進行重新排序下面,我們分析的Job初始化,以及Task初始化,都是在JobTracker端執(zhí)行的工作,主要是為了管理Job和Task的運行,創(chuàng)建了對應(yīng)的數(shù)據(jù)結(jié)構(gòu),Job對應(yīng)JobInProgress,Task對應(yīng)TaskInProgress。我們分析說明如下:
JobTracker接收到JobClient提交的Job,在放到JobTracker的Map<JobID, JobInProgress> jobs隊列后,觸發(fā)2個JobInProgressListener執(zhí)行jobAdded方法,首先會放到EagerTaskInitializationListener的List<JobInProgress> jobInitQueue隊列中。在EagerTaskInitializationListener內(nèi)部,有一個內(nèi)部線程類JobInitManager在監(jiān)控jobInitQueue隊列,如果有新的JobInProgress對象加入到隊列,則取出并啟動一個新的初始化線程InitJob去初始化該Job,代碼如下所示:
| 01 | class?JobInitManager?implements?Runnable { |
| 04 | ????JobInProgress job =?null; |
| 07 | ????????synchronized?(jobInitQueue) { |
| 08 | ??????????while?(jobInitQueue.isEmpty()) { |
| 09 | ????????????jobInitQueue.wait(); |
| 11 | ??????????job = jobInitQueue.remove(0);?// 取出JobInProgress |
| 13 | ????????threadPool.execute(new?InitJob(job));?// 創(chuàng)建一個InitJob線程去初始化該JobInProgress |
| 14 | ??????}?catch?(InterruptedException t) { |
| 15 | ????????LOG.info("JobInitManagerThread interrupted."); |
| 19 | ????LOG.info("Shutting down thread pool"); |
| 20 | ????threadPool.shutdownNow(); |
然后,在InitJob線程中,調(diào)用JobTracker的initJob方法初始化Job,如下所示:
| 01 | class?InitJob?implements?Runnable { |
| 03 | ??private?JobInProgress job; |
| 05 | ??public?InitJob(JobInProgress job) { |
| 10 | ????ttm.initJob(job);?// TaskTrackerManager ttm,調(diào)用JobTracker的initJob方法初始化 |
JobTracker中的initJob方法的主要邏輯,如下所示:
| 01 | JobStatus prevStatus = (JobStatus)job.getStatus().clone(); |
| 02 | LOG.info("Initializing "?+ job.getJobID()); |
| 03 | job.initTasks();?// 調(diào)用JobInProgress的initTasks方法初始化Task |
| 04 | // Inform the listeners if the job state has changed |
| 05 | // Note : that the job will be in PREP state. |
| 06 | JobStatus newStatus = (JobStatus)job.getStatus().clone(); |
| 07 | if?(prevStatus.getRunState() != newStatus.getRunState()) { |
| 08 | ??JobStatusChangeEvent event = |
| 09 | ????new?JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, |
| 11 | ??synchronized?(JobTracker.this) { |
| 12 | ????updateJobInProgressListeners(event);?// 更新Job相關(guān)隊列的狀態(tài) |
實際上,在JobTracker中的initJob方法中最核心的邏輯,就是初始化組成該Job的MapTask和ReduceTask,它們在JobTracker端都抽象為TaskInProgress。
在JobClient提交Job的過程中,已經(jīng)將該Job所對應(yīng)的資源復(fù)制到HDFS,在JobTracker端需要讀取這些信息來創(chuàng)建MapTask和ReduceTask。我們回顧一下:默認情況下,split和對應(yīng)的元數(shù)據(jù)存儲路徑分別為/tmp/hadoop/mapred/staging/${user}/.staging/${jobid}/job.split和/tmp/hadoop/mapred/staging/${user}/.staging/${jobid}/job.splitmetainfo,在創(chuàng)建MapTask和ReduceTask只需要split的元數(shù)據(jù)信息即可,我們看一下job.splitmetainfo文件存儲的數(shù)據(jù)格式如下圖所示:
上圖中,META_SPLIT_FILE_HEADER的值為META-SPL,版本version的值為1,numSplits的值根據(jù)實際Job輸入split大小計算的到,SplitMetaInfo包括的信息為split所存放的節(jié)點位置個數(shù)、所有的節(jié)點位置信息、split在文件中的起始偏移量、split數(shù)據(jù)的長度。有了這些描述信息,JobTracker就可以知道一個Job需要創(chuàng)建幾個MapTask,實現(xiàn)代碼如下所示:
| 1 | ????TaskSplitMetaInfo[] splits = createSplits(jobId); |
| 3 | ????numMapTasks = splits.length; |
| 5 | ????maps =?new?TaskInProgress[numMapTasks];?// MapTask在JobTracker的表示為TaskInProgress |
| 6 | ????for(int?i=0; i < numMapTasks; ++i) { |
| 7 | ??????inputLength += splits[i].getInputDataLength(); |
| 8 | ??????maps[i] =?new?TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf,?this, i, numSlotsPerMap); |
而ReduceTask的個數(shù),根據(jù)用戶在配置Job時指定的Reduce的個數(shù),創(chuàng)建ReduceTask的代碼,如下所示:
| 4 | this.reduces =?new?TaskInProgress[numReduceTasks]; |
| 5 | for?(int?i =?0; i < numReduceTasks; i++) { |
| 6 | ??reduces[i] =?new?TaskInProgress(jobId, jobFile, numMapTasks, i, jobtracker, conf,this, numSlotsPerReduce); |
| 7 | ??nonRunningReduces.add(reduces[i]); |
除了創(chuàng)建MapTask和ReduceTask之外,還會創(chuàng)建setup和cleanup task,每個Job的MapTask和ReduceTask各對應(yīng)一個,即共計2個setup task和2個cleanup task。setup task用來初始化MapTask/ReduceTask,而cleanup task用來清理MapTask/ReduceTask。創(chuàng)建setup和cleanup task,代碼如下所示:
| 01 | // create cleanup two cleanup tips, one map and one reduce. |
| 02 | cleanup =?new?TaskInProgress[2];?// cleanup task,map對應(yīng)一個,reduce對應(yīng)一個 |
| 04 | // cleanup map tip. This map doesn't use any splits. Just assign an empty split. |
| 05 | TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT; |
| 06 | cleanup[0] =?new?TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf,?this, numMapTasks,?1); |
| 07 | cleanup[0].setJobCleanupTask(); |
| 10 | cleanup[1] =?new?TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf,?this,?1); |
| 11 | cleanup[1].setJobCleanupTask(); |
| 13 | // create two setup tips, one map and one reduce. |
| 14 | setup =?new?TaskInProgress[2];?// setup task,map對應(yīng)一個,reduce對應(yīng)一個 |
| 16 | // setup map tip. This map doesn't use any split. Just assign an empty split. |
| 17 | setup[0] =?new?TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf,?this, numMapTasks +?1,?1); |
| 18 | setup[0].setJobSetupTask(); |
| 21 | setup[1] =?new?TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks +?1, jobtracker, conf,?this,?1); |
| 22 | setup[1].setJobSetupTask(); |
一個Job在JobInProgress中進行初始化Task,這里初始化Task使得該Job滿足被調(diào)度的要求,比如,知道一個Job有哪些Task組成,每個Task對應(yīng)哪個split等等。在初始化完成后,置一個Task初始化完成標志,如下所示:
| 01 | synchronized(jobInitKillStatus){ |
| 02 | ??jobInitKillStatus.initDone =?true; |
| 04 | ??// set this before the throw to make sure cleanup works properly |
| 05 | ??tasksInited =?true;?// 置Task初始化完成標志 |
| 07 | ??if(jobInitKillStatus.killed) { |
| 08 | ????throw?new?KillInterruptedException("Job "?+ jobId +?" killed in init"); |
在置tasksInited = true;后,該JobInProgress就可以被TaskScheduler進行調(diào)度了,調(diào)度時,是以Task(MapTask/ReduceTask)為單位分派給TaskTracker。而對于哪些TaskTracker可以運行Task,需要通過TaskTracker向JobTracker周期性發(fā)送的心跳得到TaskTracker的健康狀況信息、節(jié)點資源信息等來確定,是否該TaskTracker可以運行一個Job的一個或多個Task。
總結(jié)
以上是生活随笔為你收集整理的MapReduce V1:Job提交流程之JobTracker端分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。