MapReduce Job Local Submission: Source Code Trace and Analysis
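A MapReduce job can be submitted in local mode or in cluster mode. The two modes correspond to the classes org.apache.hadoop.mapred.LocalJobRunner and org.apache.hadoop.mapred.YARNRunner. This article dissects the local submission path. The trace relies on remote JVM debugging; for the procedure, see the article "Remote debugging a JVM in Eclipse (using the startup of the NameNode process as an example)".
The MapReduce program I wrote, the debugging steps, and the other material covered in this article have been packaged and uploaded: download link.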
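Outline of the local job submission process
The submission flow is summarized in the outline below first, to make the key code analysis that follows easier to read.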
job.submit()
  --> JobSubmitter.submitJobInternal()
  --> LocalJobRunner.submitJob(..)
  --> convert the Job into a LocalJobRunner.Job object (a thread, started immediately)
        new LocalJobRunner.Job(...) { ... this.start(); // start the job thread }
  --> LocalJobRunner.Job.run()
        1. create the mapRunnables list       // the number of maps depends on the number of splits
        2. runTasks(mapRunnables)
        3. create the reduceRunnables list    // the number of reduces has to be set manually
        4. runTasks(reduceRunnables)
  --> runTasks(...)
        for (Runnable r : runnables) { service.submit(r); }
  --> MapTaskRunnable.run() \ ReduceTaskRunnable.run()
        { MapTask task = new MapTask(); \ ReduceTask task = new ReduceTask(); task.run(); }
  --> MapTask.run()    { MyMaxTempMapper.run()  { setup(); while (...) { map(...); }    cleanup(); } }
      ReduceTask.run() { MyMaxTempReducer.run() { setup(); while (...) { reduce(...); } cleanup(); } }

Source code analysis
The org.apache.hadoop.mapreduce.Job class
The waitForCompletion() method
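Stripped of Hadoop specifics, waitForCompletion() amounts to "submit if not yet submitted, then poll until done". The block below is a minimal JDK-only sketch of that shape, not Hadoop code. PollingSketch, its State enum, and the 50 ms stand-in workload are all hypothetical, and unlike the real method the sketch stops polling when interrupted instead of ignoring the interrupt.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** JDK-only imitation of the submit-then-poll shape; every name here is hypothetical. */
class PollingSketch {
    enum State { DEFINE, RUNNING }

    private State state = State.DEFINE;
    private final AtomicBoolean complete = new AtomicBoolean(false);

    /** Starts the work in the background and returns immediately. */
    void submit() {
        if (state != State.DEFINE) {
            throw new IllegalStateException("job was already submitted");
        }
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(50); // stand-in for the real job
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            complete.set(true);
        });
        worker.start();
        state = State.RUNNING;
    }

    /** Submits if necessary, then polls until the work reports completion. */
    boolean waitForCompletion(long pollIntervalMillis) {
        if (state == State.DEFINE) {
            submit();
        }
        while (!complete.get()) {
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                // design choice of this sketch: give up instead of swallowing the interrupt
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        PollingSketch job = new PollingSketch();
        System.out.println("finished: " + job.waitForCompletion(10));
    }
}
```

The Hadoop source of the method follows.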
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  // check the job state
  if (state == JobState.DEFINE) {
    // call this class's own submit() method
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}

The submit() method
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  // make sure the job is still in the DEFINE state
  ensureState(JobState.DEFINE);
  // switch to the new API
  setUseNewAPI();
  // connect to the cluster
  connect();
  // create a job submitter from the cluster's file system and client
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  // anonymous inner class: new PrivilegedExceptionAction<JobStatus>()
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run()
        throws IOException, InterruptedException, ClassNotFoundException {
      // the most important step: call JobSubmitter.submitJobInternal()
      // (the internal submission method)
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}

The connect() method
// Uses an anonymous inner class, new PrivilegedExceptionAction<Cluster>().
// Its only purpose is to create a Cluster object and assign it to cluster
// (cluster is a member variable of the Job class).
private synchronized void connect()
    throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
      public Cluster run()
          throws IOException, InterruptedException, ClassNotFoundException {
        // pass the configuration to the cluster via getConfiguration()
        return new Cluster(getConfiguration());
      }
    });
  }
}

The org.apache.hadoop.mapreduce.JobSubmitter class
The submitJobInternal() method in JobSubmitter
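Two steps of the method shown next are easy to try in isolation: deriving the per-job submit directory from the staging area and the job ID, and generating a shuffle secret with KeyGenerator. What follows is a rough JDK-only sketch under stated assumptions. SubmitDirSketch and its methods are hypothetical, the job-ID pattern is inferred from the value observed while debugging (job_local635285396_0001), and the algorithm name HmacSHA1 and the 64-bit length are assumptions standing in for SHUFFLE_KEYGEN_ALGORITHM and SHUFFLE_KEY_LENGTH, whose values the excerpt does not show.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.util.Locale;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

/** Hypothetical helper class; it only imitates the naming and key-generation steps. */
class SubmitDirSketch {

    /** Formats an ID such as job_local635285396_0001 (pattern inferred from the debug output). */
    static String jobId(String identifier, int sequence) {
        return String.format(Locale.ROOT, "job_%s_%04d", identifier, sequence);
    }

    /** The submit directory is a child of the staging area, named after the job ID. */
    static Path submitDir(Path stagingArea, String jobId) {
        return stagingArea.resolve(jobId);
    }

    /** Generates a random secret; the caller supplies the (assumed) algorithm and length. */
    static byte[] shuffleSecret(String algorithm, int bits) {
        try {
            KeyGenerator keyGen = KeyGenerator.getInstance(algorithm);
            keyGen.init(bits);
            SecretKey key = keyGen.generateKey();
            return key.getEncoded();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("Error generating shuffle secret key", e);
        }
    }

    public static void main(String[] args) {
        String id = jobId("local635285396", 1);
        // "staging/.staging" is a made-up relative path standing in for the real staging area
        Path dir = submitDir(Paths.get("staging", ".staging"), id);
        System.out.println(dir);
        System.out.println(shuffleSecret("HmacSHA1", 64).length + " bytes");
    }
}
```

In the real method the directory is a Hadoop Path on the job's file system rather than a java.nio.file.Path; the sketch only mirrors how the name is put together.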
JobSubmitter {
  ...
  JobStatus submitJobInternal(Job job, Cluster cluster)
      throws ClassNotFoundException, InterruptedException, IOException {
    // validate the jobs output specs; an exception is thrown if the output directory already exists
    checkSpecs(job);
    // get the job's configuration
    Configuration conf = job.getConfiguration();
    // add the MapReduce framework to the distributed cache (can be ignored for now)
    addMRFrameworkToDistributedCache(conf);
    // the job staging area (a temporary directory on the job's file system)
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    // IP address of the submitting client, i.e. the machine the job is submitted from
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      // address of the submitting host
      submitHostAddress = ip.getHostAddress();
      // name of the submitting host
      submitHostName = ip.getHostName();
      // record submitHostName and submitHostAddress in the configuration
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    // create a new job ID (every job has a unique ID)
    JobID jobId = submitClient.getNewJobID();
    // store the ID in the job
    job.setJobID(jobId);
    // build a new path from the staging area and the job ID
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    // job status
    JobStatus status = null;
    try {
      // set the user name
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      // set the HTTP filter initializer
      conf.set("hadoop.http.filter.initializers",
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      // set the job directory
      // value of submitJobDir observed while debugging:
      //   file:/tmp/hadoop-陶/mapred/staging/ì635285396/.staging/job_local635285396_0001
      // so a per-job subdirectory is created under ./.staging/ to hold the job
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      ...
      // the following long block handles security and authentication
      // (Hadoop can be extended with authentication plug-ins)
      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
            "data spill is enabled");
      }
      // copy and configure the job files
      copyAndConfigureFiles(job, submitJobDir);
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      // Create the splits for the job (the number of map tasks depends on the number of splits)
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);
      // after writeSplits() (which creates the splits) has run, four files exist
      // in ./.staging/job_local635285396_0001/:
      //   .job.split.crc, .job.splitmetainfo.crc   checksum files
      //   job.split                                 the split file
      //   job.splitmetainfo                         the split meta-information file
      // set the number of map tasks
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      // log the number of splits
      LOG.info("number of splits:" + maps);
      // the job is submitted to a job queue, which manages it
      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
      ...
      // write the job file into the submit directory (job.xml)
      // job.xml contains every parameter set in the four configuration files
      writeConf(conf, submitJobFile);
      ...
      // here submitClient is a LocalJobRunner
      // LocalJobRunner.submitJob() submits the job through the runner
      status = submitClient.submitJob(jobId, submitJobDir.toString(),
          job.getCredentials());
    }
  }
  ...
}

The copyAndConfigureFiles() method in JobSubmitter
JobSubmitter {
  ...
  // set up the conf information from the command-line options
  private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
      throws IOException {
    JobResourceUploader rUploader = new JobResourceUploader(jtFs);
    rUploader.uploadFiles(job, jobSubmitDir);
    // Get the working directory. If not set, sets it to filesystem working dir
    // This code has been added so that working directory reset before running
    // the job. This is necessary for backward compatibility as other systems
    // might use the public API JobConf#setWorkingDirectory to reset the working
    // directory.
    job.getWorkingDirectory();
  }
  ...
}

The org.apache.hadoop.mapred.LocalJobRunner class
The submitJob() method in LocalJobRunner
LocalJobRunner {
  ...
  public org.apache.hadoop.mapreduce.JobStatus submitJob(
      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
      Credentials credentials) throws IOException {
    // create an instance of the inner class LocalJobRunner.Job
    Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
    job.job.setCredentials(credentials);
    return job.status;
  }
  ...
}

The inner class Job of LocalJobRunner
Within the Job class, the parts of interest are run(), runTasks(), and Job's own inner classes MapTaskRunnable and ReduceTaskRunnable.
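As the outline at the top of the article indicates, LocalJobRunner.Job is a thread that starts itself on construction and then pushes two batches of runnables through thread pools, maps first and reduces second. The sketch below imitates that arrangement with JDK classes only. All names are hypothetical, the pool sizes are arbitrary, and the shutdown-and-wait step in runTasks is an assumption about the part of the method that the excerpt elides.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LocalRunnerSketch {

    /** Hypothetical stand-in for LocalJobRunner.Job: a thread started by its own constructor. */
    static class Job extends Thread {
        private final int numMaps;
        private final int numReduces;
        /** Records the order in which the tasks ran. */
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());

        Job(int numMaps, int numReduces) {
            this.numMaps = numMaps;
            this.numReduces = numReduces;
            this.start(); // start the job thread
        }

        @Override
        public void run() {
            try {
                runTasks(tasks("map", numMaps), Executors.newFixedThreadPool(2));
                if (numReduces > 0) { // the reduce phase only runs if reduces were requested
                    runTasks(tasks("reduce", numReduces), Executors.newFixedThreadPool(1));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /** Builds one runnable per task; each just records its own name. */
        private List<Runnable> tasks(String type, int count) {
            List<Runnable> runnables = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                final String name = type + "-" + i;
                runnables.add(() -> events.add(name));
            }
            return runnables;
        }

        /** Submits every runnable, then waits for the pool to drain (assumed behavior). */
        private void runTasks(List<Runnable> runnables, ExecutorService service)
                throws InterruptedException {
            for (Runnable r : runnables) {
                service.submit(r);
            }
            service.shutdown();
            service.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

    /** Runs a job to completion and returns the recorded task order. */
    static List<String> runJob(int maps, int reduces) {
        Job job = new Job(maps, reduces);
        try {
            job.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(job.events);
    }

    public static void main(String[] args) {
        System.out.println(runJob(3, 2));
    }
}
```

The order of the map entries can vary between runs because two pool threads execute them, but every map entry precedes every reduce entry, since the reduce batch is only submitted after the map pool has drained.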
The run() method in Job
@Override
public void run() {
  // get the job ID
  JobID jobId = profile.getJobID();
  // build the job context
  JobContext jContext = new JobContextImpl(job, jobId);
  org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
  try {
    outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
  } catch (Exception e) {
    LOG.info("Failed to createOutputCommitter", e);
    return;
  }
  try {
    // read the split meta-information for the tasks
    TaskSplitMetaInfo[] taskSplitMetaInfos =
        SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
    // number of reduce tasks
    int numReduceTasks = job.getNumReduceTasks();
    // set up the job
    outputCommitter.setupJob(jContext);
    // record the setup progress
    status.setSetupProgress(1.0f);
    // map output files
    Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
        Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
    // build the runnables for the map tasks
    // (type: org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable)
    List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
        taskSplitMetaInfos, jobId, mapOutputFiles);
    // initialize the counters
    initCounters(mapRunnables.size(), numReduceTasks);
    // create the thread-pool executor for the map tasks
    ExecutorService mapService = createMapExecutor();
    // run the map tasks
    // note: MapReduce uses a thread pool here (tasks are queued and executed
    // at some later point)
    runTasks(mapRunnables, mapService, "map");
    try {
      // the reduce phase only runs if the number of reduces is greater than 0
      if (numReduceTasks > 0) {
        // build the runnables for the reduce tasks
        List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
            jobId, mapOutputFiles);
        ExecutorService reduceService = createReduceExecutor();
        // run the reduce tasks
        runTasks(reduceRunnables, reduceService, "reduce");
      }
    } finally {
      for (MapOutputFile output : mapOutputFiles.values()) {
        output.removeAll();
      }
    }
    // delete the temporary directory in output directory
    outputCommitter.commitJob(jContext);
    status.setCleanupProgress(1.0f);
    if (killed) {
      this.status.setRunState(JobStatus.KILLED);
    } else {
      this.status.setRunState(JobStatus.SUCCEEDED);
    }
    JobEndNotifier.localRunnerNotification(job, status);
  } catch (Throwable t) {
    try {
      outputCommitter.abortJob(jContext,
          org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
    } catch (IOException ioe) {
      LOG.info("Error cleaning up job:" + id);
    }
    status.setCleanupProgress(1.0f);
    if (killed) {
      this.status.setRunState(JobStatus.KILLED);
    } else {
      this.status.setRunState(JobStatus.FAILED);
    }
    LOG.warn(id, t);
    JobEndNotifier.localRunnerNotification(job, status);
  } finally {
    try {
      fs.delete(systemJobFile.getParent(), true);  // delete submit dir
      localFs.delete(localJobFile, true);          // delete local copy
      // Cleanup distributed cache
      localDistributedCacheManager.close();
    } catch (IOException e) {
      LOG.warn("Error cleaning up "+id+": "+e);
    }
  }
}

The runTasks() method in Job
...
/** Run a set of tasks and waits for them to complete. */
private void runTasks(List<RunnableWithThrowable> runnables,
    ExecutorService service, String taskType) throws Exception {
  // Start populating the executor with work units.
  // They may begin running immediately (in other threads).
  for (Runnable r : runnables) {
    // submit to the thread pool, which executes the map and reduce tasks
    service.submit(r);
  }
  ...
}

Job's inner class MapTaskRunnable
MapTaskRunnable {
  ...
  public void run() {
    try {
      // generate the ID of this map task attempt
      TaskAttemptID mapId = new TaskAttemptID(new TaskID(
          jobId, TaskType.MAP, taskId), 0);
      LOG.info("Starting task: " + mapId);
      // add the ID to the mapIds collection
      mapIds.add(mapId);
      // build a MapTask from the job file (in effect job.xml, as can be seen
      // in the debugger's watch view), the attempt ID, the task ID, and the
      // split information
      MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
          info.getSplitIndex(), 1);
      map.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
      // set up the local directories
      // map is a MapTask; the value observed in this debugging session was
      //   attempt_local335618588_0001_m_000000_0
      // localConf holds the loaded configuration
      setupChildMapredLocalDirs(map, localConf);
      // create a map output file
      MapOutputFile mapOutput = new MROutputFiles();
      // pass the configuration to it
      mapOutput.setConf(localConf);
      mapOutputFiles.put(mapId, mapOutput);
      // value of localJobFile.toString():
      //   file:/tmp/hadoop-陶/mapred/local/localRunner/ì?/job_local335618588_0001/job_local335618588_0001.xml
      // i.e. in effect the file job_local335618588_0001.xml
      map.setJobFile(localJobFile.toString());
      localConf.setUser(map.getUser());
      map.localizeConfiguration(localConf);
      map.setConf(localConf);
      try {
        map_tasks.getAndIncrement();
        // launchMap() marks the start of the map task
        myMetrics.launchMap(mapId);
        map.run(localConf, Job.this);
        myMetrics.completeMap(mapId);
      } finally {
        map_tasks.getAndDecrement();
      }
      LOG.info("Finishing task: " + mapId);
    } catch (Throwable e) {
      this.storedException = e;
    }
  }
  ...
}

Job's inner class ReduceTaskRunnable
ReduceTaskRunnable {
  ...
  public void run() {
    try {
      TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
          jobId, TaskType.REDUCE, taskId), 0);
      LOG.info("Starting task: " + reduceId);
      ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
          reduceId, taskId, mapIds.size(), 1);
      reduce.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
      setupChildMapredLocalDirs(reduce, localConf);
      reduce.setLocalMapFiles(mapOutputFiles);
      if (!Job.this.isInterrupted()) {
        reduce.setJobFile(localJobFile.toString());
        localConf.setUser(reduce.getUser());
        reduce.localizeConfiguration(localConf);
        reduce.setConf(localConf);
        try {
          reduce_tasks.getAndIncrement();
          // mark the start of the reduce task
          myMetrics.launchReduce(reduce.getTaskID());
          // run the reduce task
          reduce.run(localConf, Job.this);
          myMetrics.completeReduce(reduce.getTaskID());
        } finally {
          reduce_tasks.getAndDecrement();
        }
        LOG.info("Finishing task: " + reduceId);
        ...
      }
    }
  }
  ...
}

org.apache.hadoop.mapred.MapTask
MapTask {
  ...
  @Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
      throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;
    // is this a map task?
    if (isMapTask()) {
      // If there are no reducers then there won't be any sort. Hence the map
      // phase will govern the entire attempt's progress.
      if (conf.getNumReduceTasks() == 0) {
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be
        // split between the map phase (67%) and the sort phase (33%).
        mapPhase = getProgress().addPhase("map", 0.667f);
        // the sort phase
        sortPhase = getProgress().addPhase("sort", 0.333f);
      }
    }
    // start the progress reporter
    TaskReporter reporter = startReporter(umbilical);
    boolean useNewApi = job.getUseNewMapper();
    // initialize the task
    initialize(job, getJobID(), reporter, useNewApi);
    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }
    // is the new API in use?
    if (useNewApi) {
      // invoke the user-written mapper class
      // arguments: job configuration, split meta-information, umbilical, reporter
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      // invoke the user-written mapper class (old API)
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }
  ...
  @SuppressWarnings("unchecked")
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
            getTaskID(), reporter);
    // make a mapper
    // the mapper is instantiated by reflection; in this debugging session
    // taskContext.getMapperClass() is class com.zhaotao.hadoop.mr.MyMaxTempMapper
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
        (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format (also instantiated by reflection)
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
        (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
            ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
        new NewTrackingRecordReader<INKEY,INVALUE>
            (split, inputFormat, reporter, taskContext);
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    // get an output object
    // are there any reduce tasks?
    if (job.getNumReduceTasks() == 0) {
      output =
          new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      // create an output collector; this is what sits behind the Context
      // object used in the overridden map() method
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }
    // put the output collector into a MapContextImpl to obtain the MapContext
    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
        mapContext =
            new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
                input, output,
                committer,
                reporter, split);
    // wrap mapContext to obtain mapperContext, of the Context type that
    // Mapper defines
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
        mapperContext =
            new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
                mapContext);
    try {
      // initialize the input first
      input.initialize(split, mapperContext);
      // for details, see the run() method of the Mapper class
      mapper.run(mapperContext);
      mapPhase.complete();
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
  }
  ...
}

org.apache.hadoop.mapreduce.Mapper
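Mapper.run(), shown next, follows the template-method pattern: the framework fixes the order of setup, repeated map calls, and cleanup, while user code only overrides the hooks. The block below is a minimal JDK-only sketch of that pattern. MiniMapper and UpperCaseMapper are hypothetical, a plain Iterator stands in for the Context, and marking run() as final is a choice made for the sketch rather than something taken from Hadoop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

class TemplateRunSketch {

    /** Hypothetical miniature of Mapper: run() fixes the order, subclasses fill in the hooks. */
    static abstract class MiniMapper {
        protected void setup(List<String> out) { }
        protected abstract void map(String record, List<String> out);
        protected void cleanup(List<String> out) { }

        public final void run(Iterator<String> records, List<String> out) {
            setup(out);
            try {
                while (records.hasNext()) {   // plays the role of context.nextKeyValue()
                    map(records.next(), out); // callback into the user's override
                }
            } finally {
                cleanup(out);                 // runs even if map() throws
            }
        }
    }

    /** The "user code": only the hooks are written, never the loop. */
    static class UpperCaseMapper extends MiniMapper {
        @Override protected void setup(List<String> out) { out.add("setup"); }
        @Override protected void map(String record, List<String> out) {
            out.add(record.toUpperCase(Locale.ROOT));
        }
        @Override protected void cleanup(List<String> out) { out.add("cleanup"); }
    }

    /** Runs the sample mapper over the given records and returns everything it emitted. */
    static List<String> demo(List<String> records) {
        List<String> out = new ArrayList<>();
        new UpperCaseMapper().run(records.iterator(), out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo(Arrays.asList("a", "b")));
    }
}
```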
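Mapper {
  ...
  // a map task runs in three phases:
  //   a. setup
  //   b. repeated calls to map()  (the only phase the user defines; this is a
  //      typical Java callback: the framework calls back into the overridden method)
  //   c. cleanup
  // the argument is of type Context
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      // loop, calling map() each time (in effect the user's overridden map(),
      // which processes one line of text per call)
      // check whether another key-value pair exists
      while (context.nextKeyValue()) {
        // if so, fetch the key and the value and pass them on together with the context
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      // finally, clean up
      cleanup(context);
    }
  }
  ...
}

org.apache.hadoop.mapreduce.lib.map.WrappedMapper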
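WrappedMapper {
  ...
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    // delegate to the wrapped mapContext: advance to the next key-value pair
    // and report whether one exists
    return mapContext.nextKeyValue();
  }
  @Override
  public Counter getCounter(Enum<?> counterName) {
    // delegate to the wrapped mapContext: return the named counter
    return mapContext.getCounter(counterName);
  }
  ...
}

org.apache.hadoop.mapred.ReduceTask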
ReduceTask {
  ...
  @Override
  @SuppressWarnings("unchecked")
  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
      throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    // is this a map or reduce task?
    if (isMapOrReduce()) {
      // add the copy phase
      copyPhase = getProgress().addPhase("copy");
      // add the sort phase
      sortPhase = getProgress().addPhase("sort");
      // add the reduce phase
      reducePhase = getProgress().addPhase("reduce");
    }
    // start thread that will handle communication with parent
    TaskReporter reporter = startReporter(umbilical);
    // is the new Reducer API in use?
    boolean useNewApi = job.getUseNewReducer();
    // initialize the task with the values above
    initialize(job, getJobID(), reporter, useNewApi);
    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }
    // Initialize the codec
    // (checks whether the map output is compressed)
    codec = initCodec();
    // an iterator over the raw key-value pairs
    RawKeyValueIterator rIter = null;
    // the shuffle plug-in
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
    Class combinerClass = conf.getCombinerClass();
    CombineOutputCollector combineCollector =
        (null != combinerClass) ?
            new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
    Class<? extends ShuffleConsumerPlugin> clazz =
        job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
    // build the context for the shuffle that precedes the reduce
    ShuffleConsumerPlugin.Context shuffleContext =
        new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
            super.lDirAlloc, reporter, codec,
            combinerClass, combineCollector,
            spilledRecordsCounter, reduceCombineInputCounter,
            shuffledMapsCounter,
            reduceShuffleBytes, failedShuffleCounter,
            mergedMapOutputsCounter,
            taskStatus, copyPhase, sortPhase, this,
            mapOutputFile, localMapFiles);
    // initialize the shuffle plug-in
    shuffleConsumerPlugin.init(shuffleContext);
    // run the shuffle; rIter is a MergeQueue
    rIter = shuffleConsumerPlugin.run();
    // free up the data structures
    mapOutputFilesOnDisk.clear();
    sortPhase.complete();                         // sort is complete
    setPhase(TaskStatus.Phase.REDUCE);
    statusUpdate(umbilical);
    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();
    RawComparator comparator = job.getOutputValueGroupingComparator();
    // is the new reduce API in use?
    if (useNewApi) {
      // run the reduce task
      runNewReducer(job, umbilical, reporter, rIter, comparator,
          keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator,
          keyClass, valueClass);
    }
    shuffleConsumerPlugin.close();
    done(umbilical, reporter);
  }
  ...
  @SuppressWarnings("unchecked")
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) throws IOException, InterruptedException,
                              ClassNotFoundException {
    // wrap value iterator to report progress.
    final RawKeyValueIterator rawIter = rIter;
    rIter = new RawKeyValueIterator() {
      public void close() throws IOException {
        rawIter.close();
      }
      public DataInputBuffer getKey() throws IOException {
        return rawIter.getKey();
      }
      public Progress getProgress() {
        return rawIter.getProgress();
      }
      public DataInputBuffer getValue() throws IOException {
        return rawIter.getValue();
      }
      public boolean next() throws IOException {
        boolean ret = rawIter.next();
        reporter.setProgress(rawIter.getProgress().getProgress());
        return ret;
      }
    };
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
            getTaskID(), reporter);
    // make a reducer
    // the reducer is instantiated by reflection; in this debugging session
    // taskContext.getReducerClass() is com.zhaotao.hadoop.mr.MyMaxTempReducer
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
        (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
        new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
    job.setBoolean("mapred.skip.on", isSkipping());
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.Reducer.Context
        reducerContext = createReduceContext(reducer, job, getTaskID(),
            rIter, reduceInputKeyCounter,
            reduceInputValueCounter,
            trackedRW,
            committer,
            reporter, comparator, keyClass,
            valueClass);
    try {
      // enter the reducer and start running it
      reducer.run(reducerContext);
    } finally {
      trackedRW.close(reducerContext);
    }
  }
  ...
}

org.apache.hadoop.mapreduce.Reducer
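Reducer.run(), shown next, calls reduce() once per distinct key and hands it all values that belong to that key. The sketch below imitates this grouping over an already sorted list of pairs, using only the JDK. Every name in it is hypothetical, the sample data is made up, and taking the maximum per key is only a guess at what a class named MyMaxTempReducer does, since its source is not part of this article.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupedReduceSketch {

    /** User-level reduce: called once per key with all of that key's values. */
    static int reduce(String key, List<Integer> values) {
        int max = Integer.MIN_VALUE;
        for (int v : values) {
            max = Math.max(max, v);
        }
        return max;
    }

    /** Framework-level loop: groups consecutive equal keys, then calls reduce() per group. */
    static Map<String, Integer> run(List<Map.Entry<String, Integer>> sortedPairs) {
        Map<String, Integer> output = new LinkedHashMap<>();
        int i = 0;
        while (i < sortedPairs.size()) {            // plays the role of context.nextKey()
            String key = sortedPairs.get(i).getKey();
            List<Integer> values = new ArrayList<>();
            while (i < sortedPairs.size() && sortedPairs.get(i).getKey().equals(key)) {
                values.add(sortedPairs.get(i).getValue());
                i++;
            }
            output.put(key, reduce(key, values));
        }
        return output;
    }

    private static Map.Entry<String, Integer> pair(String key, int value) {
        return new SimpleEntry<>(key, value);
    }

    /** Made-up sample input: (year, temperature) pairs, already sorted by year. */
    static Map<String, Integer> demo() {
        return run(Arrays.asList(
            pair("1949", 111), pair("1949", 78), pair("1950", 0), pair("1950", 22)));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {1949=111, 1950=22}
    }
}
```

The grouping only works because the input arrives sorted by key, which is what the shuffle and sort phases in ReduceTask.run() provide before the reducer starts.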
Reducer {
  ...
  // runs in three phases, just like the run() method of Mapper
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
        }
      }
    } finally {
      cleanup(context);
    }
  }
  ...
}

[Figure: flow analysis diagram of the local job submission process]