The MapReduce Multi-User Task Scheduler: Principles and Source Code of the Capacity Scheduler
Preface: To support my research I studied the principles and source code of the Capacity Scheduler and the Fair Scheduler, recording the results in two articles. Corrections are welcome if I have misunderstood anything.
The Capacity Scheduler is a multi-user scheduler developed at Yahoo!. Multi-user schedulers have many use cases: according to reference [1], as the user base of a Hadoop cluster grows, different users submit applications with different quality-of-service (QoS) requirements:
1. Batch jobs: long-running, with no strict completion deadline, e.g. data mining and machine learning.
2. Interactive jobs: expect timely results, e.g. Hive queries.
3. Production jobs: require a guaranteed amount of resources, e.g. statistics computation and spam-data analysis.
The traditional FIFO scheduler cannot satisfy such diverse response-time and resource requirements, so multi-user, multi-queue schedulers emerged. The Capacity Scheduler is one of the most widely used among them.
I. Basic Idea
The Capacity Scheduler partitions resources by queue: each queue has a lower and an upper bound on resource usage, and each user can also be given a usage cap. Idle resources of one queue can be shared with other queues and reclaimed later. Administrators can constrain the resource usage of individual queues, users, or jobs. The scheduler supports resource-intensive jobs by letting a single task occupy multiple slots (a distinctive feature), and it supports job priorities but not resource preemption.
Let us clarify the relationship between users, queues, and jobs. Hadoop manages resources at queue granularity: each queue is allocated a share of the resources, and a user may only submit jobs to one or more specific queues. Queue management has two aspects:

1. User permission management. Hadoop's user-management module is built on a mapping to operating-system users and groups: an OS user or group can be mapped to one or more queues, and each queue can be given administrator users. Queue definitions live in mapred-site.xml, including the queue names and whether ACLs are enabled; this file does not support dynamic reloading. Queue permissions live in mapred-queue-acls.xml, which grants a user or group particular rights on particular queues; the rights are job submission and job administration.
2. System resource management. Administrators configure the resources available to each queue and each user, which the scheduler uses as the basis for its decisions. These settings live in the scheduler's own configuration file (capacity-scheduler.xml for the Capacity Scheduler).

See the appendix for the typical contents of each configuration file.
II. Overall Architecture
Overall, the Capacity Scheduler's workflow has five steps:
1. A user submits a job to the JobTracker.
2. The JobTracker hands the job to the Capacity Scheduler's listener, JobQueuesManager, which adds it to the waiting list; the JobInitializationPoller thread later initializes it.
3. A TaskTracker asks the JobTracker for tasks through its heartbeat.
4. The JobTracker calls the Capacity Scheduler's assignTasks method to select tasks for it.
5. The JobTracker returns the selected tasks to the TaskTracker.
Next, we walk through these steps in the source code.
III. Implementation Details

1. Scheduler Startup

Recall that the JobTracker starts a scheduler by calling the scheduler's start method, so we begin there:
```java
// initialize our queues from the config settings
if (null == schedConf) {
  schedConf = new CapacitySchedulerConf();
}
```

The first step builds the configuration object. The Capacity Scheduler defines its own configuration class; its constructor loads the scheduler's own configuration file as a resource and initializes the default options:
```java
public CapacitySchedulerConf() {
  rmConf = new Configuration(false);
  rmConf.addResource(SCHEDULER_CONF_FILE);
  initializeDefaults();
}

private void initializeDefaults() {
  defaultUlimitMinimum = rmConf.getInt(
      "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
  defaultUserLimitFactor = rmConf.getFloat(
      "mapred.capacity-scheduler.default-user-limit-factor", 1.0f);
  defaultSupportPriority = rmConf.getBoolean(
      "mapred.capacity-scheduler.default-supports-priority", false);
  defaultMaxActiveTasksPerQueueToInitialize = rmConf.getInt(
      "mapred.capacity-scheduler.default-maximum-active-tasks-per-queue", 200000);
  defaultMaxActiveTasksPerUserToInitialize = rmConf.getInt(
      "mapred.capacity-scheduler.default-maximum-active-tasks-per-user", 100000);
  defaultInitToAcceptJobsFactor = rmConf.getInt(
      "mapred.capacity-scheduler.default-init-accept-jobs-factor", 10);
}
```

For example, the first default sets each user's minimum resource guarantee, 100% by default, and the third controls whether job priorities are honored, disabled by default. The remaining options are explained in reference [1]. Next, the queue information is initialized. It is obtained from a QueueManager object, which is constructed as follows:
```java
public QueueManager(Configuration conf) {
  checkDeprecation(conf);
  conf.addResource(QUEUE_ACLS_FILE_NAME);
  // Get configured ACLs and state for each queue
  aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
  queues.putAll(parseQueues(conf));
}

synchronized private Map<String, Queue> parseQueues(Configuration conf) {
  Map<String, Queue> queues = new HashMap<String, Queue>();
  // First get the queue names
  String[] queueNameValues = conf.getStrings("mapred.queue.names",
      new String[]{JobConf.DEFAULT_QUEUE_NAME});
  for (String name : queueNameValues) {
    Map queueACLs = getQueueAcls(name, conf);
    if (queueACLs == null) {
      LOG.error("The queue, " + name + " does not have a configured ACL list");
    }
    queues.put(name, new Queue(name, getQueueAcls(name, conf),
        getQueueState(name, conf), QueueMetrics.create(name, conf)));
  }
  return queues;
}
```

The constructor first loads the ACL configuration file mapred-queue-acls.xml, then parses the queue list from the settings in mapred-site.xml: for each queue name, it looks up the queue's ACLs and builds a Queue object from the name, ACLs, queue state, and a queue-metrics object, adding it to the result map. Before the queues are initialized, a CapacitySchedulerQueue object is also constructed for each of them:
```java
Map<String, CapacitySchedulerQueue> parseQueues(
    Collection<String> queueNames, CapacitySchedulerConf schedConf) throws IOException {
  Map<String, CapacitySchedulerQueue> queueInfoMap =
      new HashMap<String, CapacitySchedulerQueue>();
  // Sanity check: there should be at least one queue.
  if (0 == queueNames.size()) {
    throw new IllegalStateException("System has no queue configured");
  }
  float totalCapacityPercent = 0.0f;
  for (String queueName : queueNames) {
    float capacityPercent = schedConf.getCapacity(queueName);
    if (capacityPercent == -1.0) {
      throw new IOException("Queue '" + queueName + "' doesn't have configured capacity!");
    }
    totalCapacityPercent += capacityPercent;
    // create our Queue and add to our hashmap
    CapacitySchedulerQueue queue = new CapacitySchedulerQueue(queueName, schedConf);
    queueInfoMap.put(queueName, queue);
  }
  if (Math.floor(totalCapacityPercent) != 100) {
    throw new IllegalArgumentException("Sum of queue capacities not 100% at "
        + totalCapacityPercent);
  }
  return queueInfoMap;
}
```

The CapacitySchedulerQueue objects are returned in a map keyed by queue name and used for the initialization that follows. Note the sanity checks: every queue must have a configured capacity, and the capacities must sum to 100%. The queues are then initialized by the initialize method:
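The capacity-sum check above can be illustrated in isolation. Here is a minimal standalone sketch (hypothetical class and method names, not Hadoop code) of the same validation rule: every queue must have a configured capacity, and the floored sum must be exactly 100:

```java
// Hypothetical sketch of the queue-capacity validation performed at startup.
public class CapacityCheck {
    // Returns true when the per-queue percentages cover exactly 100% of the
    // cluster. Flooring tolerates small floating-point drift, as in the
    // Math.floor(totalCapacityPercent) != 100 check in the real code.
    public static boolean capacitiesSumTo100(float[] capacityPercents) {
        float total = 0.0f;
        for (float c : capacityPercents) {
            if (c < 0) {  // -1.0 means "not configured" in the real code
                throw new IllegalStateException("queue without configured capacity");
            }
            total += c;
        }
        return Math.floor(total) == 100;
    }

    public static void main(String[] args) {
        System.out.println(capacitiesSumTo100(new float[]{30f, 30f, 40f})); // true
        System.out.println(capacitiesSumTo100(new float[]{50f, 30f}));      // false
    }
}
```

The flooring matters because adding many float percentages (e.g. three queues at 33.3%) rarely yields exactly 100.0.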
```java
void initialize(QueueManager queueManager,
                Map<String, CapacitySchedulerQueue> newQueues,
                Configuration conf, CapacitySchedulerConf schedConf) {
  // Memory related configs
  initializeMemoryRelatedConf(conf);

  // Setup queues
  for (Map.Entry<String, CapacitySchedulerQueue> e : newQueues.entrySet()) {
    String newQueueName = e.getKey();
    CapacitySchedulerQueue newQueue = e.getValue();
    CapacitySchedulerQueue currentQueue = queueInfoMap.get(newQueueName);
    if (currentQueue != null) {
      currentQueue.initializeQueue(newQueue);
      LOG.info("Updated queue configs for " + newQueueName);
    } else {
      queueInfoMap.put(newQueueName, newQueue);
      LOG.info("Added new queue: " + newQueueName);
    }
  }

  // Set SchedulingDisplayInfo
  for (String queueName : queueInfoMap.keySet()) {
    SchedulingDisplayInfo schedulingInfo = new SchedulingDisplayInfo(queueName, this);
    queueManager.setSchedulerInfo(queueName, schedulingInfo);
  }

  // Inform the queue manager
  jobQueuesManager.setQueues(queueInfoMap);

  // let our mgr objects know about the queues
  mapScheduler.initialize(queueInfoMap);
  reduceScheduler.initialize(queueInfoMap);

  // scheduling tunables
  maxTasksPerHeartbeat = schedConf.getMaxTasksPerHeartbeat();
  maxTasksToAssignAfterOffSwitch = schedConf.getMaxTasksToAssignAfterOffSwitch();
}
```

The steps are: first, initialize the memory-related variables from the configuration object. Then, for each queue, check whether it is already present in queueInfoMap (a structure that gives fast access to a queue by name): if so, update its configuration; if not, add it. Next, attach a scheduling-info object to each queue, used for display and logging. Then hand the queue map to the listener JobQueuesManager, and also to the map and reduce sub-schedulers; each sub-scheduler maintains a list of queues from which tasks may be taken, used for queue selection at scheduling time. Finally, set the limits for batch task assignment.
The mapScheduler and reduceScheduler used for the two task types deserve a closer look. The queues are inserted into a priority list inside each sub-scheduler:
```java
queuesForAssigningTasks.clear();
queuesForAssigningTasks.addAll(queues.values());
Collections.sort(queuesForAssigningTasks, queueComparator);
```

The queue ordering is defined by queueComparator. The map and reduce comparators are essentially identical; only the task type differs:
```java
public int compare(CapacitySchedulerQueue q1, CapacitySchedulerQueue q2) {
  // look at how much capacity they've filled. Treat a queue with
  // capacity=0 equivalent to a queue running at capacity
  TaskType taskType = getTaskType();
  double r1 = (0 == q1.getCapacity(taskType)) ? 1.0f
      : (double) q1.getNumSlotsOccupied(taskType) / (double) q1.getCapacity(taskType);
  double r2 = (0 == q2.getCapacity(taskType)) ? 1.0f
      : (double) q2.getNumSlotsOccupied(taskType) / (double) q2.getCapacity(taskType);
  if (r1 < r2) return -1;
  else if (r1 > r2) return 1;
  else return 0;
}
```

The compare method shows that the higher a queue's resource utilization, the later it appears in the list, i.e. the lower its priority. In other words, the Capacity Scheduler always prefers the queue with the lowest resource utilization. This completes the analysis of queue initialization.
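To make the resulting order concrete, here is a minimal standalone sketch (hypothetical class and method names, not Hadoop code) that applies the same ratio rule to a few queues:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: queues sorted by occupied/capacity ratio, least
// loaded first; a zero-capacity queue is treated as fully loaded (1.0),
// exactly as in compare() above.
public class QueueOrdering {
    static double ratio(int occupied, int capacity) {
        return capacity == 0 ? 1.0 : (double) occupied / capacity;
    }

    // Returns queue names ordered least-loaded first (the order in which
    // the scheduler would consider them when assigning tasks).
    public static List<String> order(String[] names, int[] capacity, int[] occupied) {
        Integer[] idx = new Integer[names.length];
        for (int i = 0; i < idx.length; i++) idx[i] = i;
        Arrays.sort(idx, (a, b) ->
            Double.compare(ratio(occupied[a], capacity[a]), ratio(occupied[b], capacity[b])));
        List<String> result = new ArrayList<>();
        for (int i : idx) result.add(names[i]);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(order(
            new String[]{"prod", "dev", "empty"},
            new int[]{100, 50, 0},      // configured slot capacities
            new int[]{80, 10, 0}));     // occupied slots
        // -> [dev, prod, empty]: dev is 20% used, prod 80%, empty counts as 100%
    }
}
```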
Next, the scheduler registers its listener with the JobTracker:

```java
// listen to job changes
taskTrackerManager.addJobInProgressListener(jobQueuesManager);
```

and then starts the initialization thread:
```java
// Start thread for initialization
if (initializationPoller == null) {
  this.initializationPoller = new JobInitializationPoller(
      jobQueuesManager, schedConf, queueNames, taskTrackerManager);
}
initializationPoller.init(queueNames.size(), schedConf);
initializationPoller.setDaemon(true);
initializationPoller.start();
```

The initializationPoller is a daemon thread. Its init method assigns an initialization worker thread to each queue, keeping the total number of worker threads at or below the number of queues, and then starts each worker.
Finally, the servlet that displays scheduler information is set up:

```java
if (taskTrackerManager instanceof JobTracker) {
  JobTracker jobTracker = (JobTracker) taskTrackerManager;
  HttpServer infoServer = jobTracker.infoServer;
  infoServer.setAttribute("scheduler", this);
  infoServer.addServlet("scheduler", "/scheduler",
      CapacitySchedulerServlet.class);
}
```

With that, scheduler startup is complete.
2. Job Initialization

Initialized jobs that cannot yet be scheduled consume a lot of memory, so the Capacity Scheduler initializes jobs lazily under two policies: 1. initialize first the jobs most likely to be scheduled; 2. limit the number of jobs each user can have initialized. In detail: after a job is submitted to the JobTracker, the scheduler listener's jobAdded method is invoked:
```java
// add job to the right queue
CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
```

This looks up the queue the job belongs to; the job is then added to it via the queue's addWaitingJob method:
```java
synchronized void addWaitingJob(JobInProgress job) throws IOException {
  JobSchedulingInfo jobSchedInfo = new JobSchedulingInfo(job);
  String user = job.getProfile().getUser();

  // Check acceptance limits
  checkJobSubmissionLimits(job, user);

  waitingJobs.put(jobSchedInfo, job);

  // Update user stats
  UserInfo userInfo = users.get(user);
  if (userInfo == null) {
    userInfo = new UserInfo(comparator);
    users.put(user, userInfo);
  }
  userInfo.jobAdded(jobSchedInfo, job);
}
```

The method first builds the scheduling-info object (the same object used by the default FIFO scheduler), then checks three constraints:
1. The job's number of tasks must not exceed the per-user maximum:

```java
if (job.desiredTasks() > maxActiveTasksPerUser) {
  throw new IOException("Job '" + job.getJobID() + "' from user '" + user +
      "' rejected since it has " + job.desiredTasks() + " tasks which" +
      " exceeds the limit of " + maxActiveTasksPerUser +
      " tasks per-user which can be initialized for queue '" + queueName + "'");
}
```

2. The queue's total of waiting, initializing, and running jobs must not exceed the acceptance limit:

```java
// Across all jobs in queue
int queueWaitingJobs = getNumWaitingJobs();
int queueInitializingJobs = getNumInitializingJobs();
int queueRunningJobs = getNumRunningJobs();
if ((queueWaitingJobs + queueInitializingJobs + queueRunningJobs) >= maxJobsToAccept) {
  throw new IOException("Job '" + job.getJobID() + "' from user '" + user +
      "' rejected since queue '" + queueName + "' already has " +
      queueWaitingJobs + " waiting jobs, " + queueInitializingJobs +
      " initializing jobs and " + queueRunningJobs +
      " running jobs - Exceeds limit of " + maxJobsToAccept + " jobs to accept");
}
```

3. The user's total of waiting, initializing, and running jobs must not exceed the acceptance limit:

```java
// Across all jobs of the user
int userWaitingJobs = getNumWaitingJobsByUser(user);
int userInitializingJobs = getNumInitializingJobsByUser(user);
int userRunningJobs = getNumRunningJobsByUser(user);
if ((userWaitingJobs + userInitializingJobs + userRunningJobs) >= maxJobsPerUserToAccept) {
  throw new IOException("Job '" + job.getJobID() + "' rejected since user '" + user +
      "' already has " + userWaitingJobs + " waiting jobs, " +
      userInitializingJobs + " initializing jobs and " +
      userRunningJobs + " running jobs - " +
      " Exceeds limit of " + maxJobsPerUserToAccept + " jobs to accept" +
      " in queue '" + queueName + "' per user");
}
```
If any constraint is violated, an exception is thrown; otherwise the job is put on the waiting list. Finally, the scheduler is notified through its own jobAdded method:
```java
// called when a job is added
synchronized void jobAdded(JobInProgress job) throws IOException {
  CapacitySchedulerQueue queue = queueInfoMap.get(job.getProfile().getQueueName());

  // Inform the queue
  queue.jobAdded(job);

  // setup scheduler specific job information
  preInitializeJob(job);
}
```

This fetches the queue, informs it that a job has arrived (which also updates the submitting user's job count in the queue), and finally computes from the configuration how many slots each of the job's tasks needs (the Capacity Scheduler supports high-memory jobs).
The entry point of job initialization is the run method of JobInitializationPoller (the poller from here on):

```java
public void run() {
  while (running) {
    try {
      cleanUpInitializedJobsList();
      selectJobsToInitialize();
      if (!this.isInterrupted()) {
        Thread.sleep(sleepInterval);
      }
    } catch (InterruptedException e) {
      LOG.error("Job Initialization poller interrupted"
          + StringUtils.stringifyException(e));
    }
  }
}
```
Each iteration first removes from the initializedJobs structure those jobs that are running and have been scheduled, or that have completed. It then calls selectJobsToInitialize to pick jobs awaiting initialization. Concretely, for each queue it selects jobs from that queue's waitingJobs list:
```java
ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
```

The selection rules are: first, skip jobs that are already initialized; second, check that the queue's job total (running plus initializing) and its active-task total do not exceed their limits; third, check that the submitting user does not already have too many jobs (running plus initializing) or active tasks; finally, check that the job is still in the PREP state (i.e. it has not been killed). A job that passes all checks is added to the result list, and its queue is told to move it into the initializingJobs list. See the implementation of getJobsToInitialize for the details, which are not repeated here.
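The queue-level and user-level checks can be sketched as follows (hypothetical names and simplified limits; the real getJobsToInitialize also checks active-task counts and the PREP state):

```java
// Hypothetical sketch of the selection filter used when picking waiting
// jobs for initialization.
public class InitFilter {
    // "Active" here means running + initializing, as in the text above.
    public static boolean canInitialize(int queueActiveJobs, int maxJobsPerQueue,
                                        int userActiveJobs, int maxJobsPerUser) {
        if (queueActiveJobs >= maxJobsPerQueue) return false; // queue saturated
        if (userActiveJobs >= maxJobsPerUser) return false;   // user at their limit
        return true; // the real code additionally requires the job to be in PREP
    }

    public static void main(String[] args) {
        System.out.println(canInitialize(3, 10, 1, 2)); // true
        System.out.println(canInitialize(3, 10, 2, 2)); // false: user at limit
    }
}
```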
Next, the worker thread assigned to the queue is looked up, and the selected jobs are added to that thread's per-queue scheduling list:

```java
JobInitializationThread t = threadsToQueueMap.get(queue);
for (JobInProgress job : jobsToInitialize) {
  t.addJobsToQueue(queue, job);
}
```

Each worker thread maintains a map (jobsPerQueue) from queue name to the list of jobs that the thread is to initialize for that queue.
Finally, consider the run method of the worker thread JobInitializationThread, which repeatedly calls initializeJobs:

```java
void initializeJobs() {
  // while there are more jobs to initialize...
  while (currentJobCount.get() > 0) {
    Set<String> queues = jobsPerQueue.keySet();
    for (String queue : queues) {
      JobInProgress job = getFirstJobInQueue(queue);
      if (job == null) {
        continue;
      }
      LOG.info("Initializing job : " + job.getJobID() + " in Queue "
          + job.getProfile().getQueueName() + " For user : "
          + job.getProfile().getUser());
      if (startIniting) {
        setInitializingJob(job);
        ttm.initJob(job);
        setInitializingJob(null);
      } else {
        break;
      }
    }
  }
}
```

As the code shows, the thread takes the first job in each queue and hands it to the JobTracker's initJob for the actual initialization, which was covered in earlier articles in this series. This completes the Capacity Scheduler's job-initialization path.
To close this section, note the data structures each queue maintains:

```java
this.waitingJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
this.initializingJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
this.runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
```

As their names suggest, the three maps hold the jobs waiting for initialization, the jobs being initialized, and the running jobs. They share a Comparator object that defines the ordering of jobs within each list.
The comparator is initialized as follows:

```java
if (supportsPriorities) {
  // use the default priority-aware comparator
  comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
} else {
  comparator = STARTTIME_JOB_COMPARATOR;
}
```

If the scheduler supports priorities, the comparator is the FIFO scheduler's priority-aware comparator, which compares priority first, then start time, then job ID. Otherwise it is the start-time comparator, i.e. first come, first served.
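The effect of the priority-aware ordering can be sketched as follows (a hypothetical standalone example: priorities are modeled as plain ints, larger meaning higher, whereas the real code compares JobPriority values inside JobSchedulingInfo objects):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of "priority first, then start time, then job id".
public class JobOrdering {
    public static List<String> order(String[] ids, int[] priority, long[] startTime) {
        Integer[] idx = new Integer[ids.length];
        for (int i = 0; i < idx.length; i++) idx[i] = i;
        Arrays.sort(idx, (a, b) -> {
            if (priority[a] != priority[b]) return priority[b] - priority[a]; // higher first
            if (startTime[a] != startTime[b])
                return Long.compare(startTime[a], startTime[b]);              // earlier first
            return ids[a].compareTo(ids[b]);                                  // finally by id
        });
        List<String> result = new ArrayList<>();
        for (int i : idx) result.add(ids[i]);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(order(
            new String[]{"job_003", "job_002", "job_001"},
            new int[]{1, 2, 1},
            new long[]{100L, 200L, 100L}));
        // -> [job_002, job_001, job_003]: job_002 wins on priority despite its
        //    later start; job_001 and job_003 tie and fall back to job-id order.
    }
}
```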
The initialization threads pick jobs from the waitingJobs list; selected jobs move to initializingJobs, and once initialized and scheduled they move to runningJobs. Scheduling itself is the subject of the next section.
3. Task Scheduling

The Capacity Scheduler uses a three-level scheduling model: first pick a queue, then pick a job within it, and finally pick one of the job's tasks. Task selection is performed by the scheduler's assignTasks method, described in detail below.
First, the resource-usage information of every queue is refreshed:

```java
updateAllQueues(mapClusterCapacity, reduceClusterCapacity);
```

which, for each queue, calls the queue's updateAll method.
updateAll first recomputes the queue's current map and reduce capacities:

```java
// Compute new capacities for maps and reduces
mapSlots.updateCapacities(capacityPercent, maxCapacityPercent, mapClusterCapacity);
reduceSlots.updateCapacities(capacityPercent, maxCapacityPercent, reduceClusterCapacity);
```

It then refreshes each job's scheduling-info object:
```java
j.setSchedulingInfo(CapacityTaskScheduler.getJobQueueSchedInfo(
    numMapsRunningForThisJob, numRunningMapSlots, numReservedMapSlotsForThisJob,
    numReducesRunningForThisJob, numRunningReduceSlots, numReservedReduceSlotsForThisJob));
```

recording the job's running map and reduce task counts, the map and reduce slots it occupies, and the map and reduce slots reserved for it.
Next, each job's resource usage is aggregated into its queue's statistics:

```java
update(TaskType.MAP, j, j.getProfile().getUser(),
    numMapsRunningForThisJob, numMapSlotsForThisJob);
update(TaskType.REDUCE, j, j.getProfile().getUser(),
    numReducesRunningForThisJob, numReduceSlotsForThisJob);
```

including the queue's running task count, its occupied slots, and per-user slot usage.
After the update, tasks are assigned through the addMapTasks and addReduceTask methods:

```java
// schedule tasks
List<Task> result = new ArrayList<Task>();
addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots);
addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
```

Queue and job ordering were covered above, so here we focus on how tasks are picked. addMapTasks calls the map sub-scheduler's assignTasks method:
```java
JobInProgress job = taskTracker.getJobForFallowSlot(type);
if (job != null) {
  if (availableSlots >= job.getNumSlotsPerTask(type)) {
    // Unreserve
    taskTracker.unreserveSlots(type, job);

    // We found a suitable job. Get task from it.
    if (type == TaskType.MAP) {
      // Don't care about locality!
      job.overrideSchedulingOpportunities();
    }
    return obtainNewTask(taskTrackerStatus, job, true);
  } else {
    // Re-reserve the current tasktracker
    taskTracker.reserveSlots(type, job, availableSlots);
    return TaskLookupResult.getMemFailedResult();
  }
}
```

First, it checks whether the TaskTracker is holding slots reserved for some job. (Such a job is memory-intensive: one of its tasks may need several slots, and since there were not enough free slots at the previous scheduling round, the slots were reserved for the job until a later round. This is the Capacity Scheduler's high-memory scheduling mechanism.) If there is a reservation and the available slots now suffice, the reservation is released and obtainNewTask assigns the slots to that job; if they still do not suffice, the slots stay reserved and a memory-failure result is returned.
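The reservation decision itself reduces to a single comparison. A minimal sketch under hypothetical names (not the Hadoop API):

```java
// Hypothetical sketch of the high-memory reservation decision above: slots
// are handed over only when enough are free for one of the job's
// (possibly multi-slot) tasks; otherwise the reservation is kept for a
// later heartbeat.
public class SlotReservation {
    enum Outcome { ASSIGN_TASK, KEEP_RESERVED }

    public static Outcome decide(int availableSlots, int slotsPerTask) {
        return availableSlots >= slotsPerTask ? Outcome.ASSIGN_TASK
                                              : Outcome.KEEP_RESERVED;
    }

    public static void main(String[] args) {
        // A high-RAM job needing 2 slots per task:
        System.out.println(decide(3, 2)); // ASSIGN_TASK
        System.out.println(decide(1, 2)); // KEEP_RESERVED
    }
}
```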
If the TaskTracker holds no reservation, the sub-scheduler walks the ordered queue list, selects a job from each queue in turn, and calls obtainNewTask to get a task from it. When the available slots cannot satisfy a task, slots are reserved here as well. Note that every lookup returns a status:
```java
for (CapacitySchedulerQueue queue : queuesForAssigningTasks) {
  // This call is for optimization: if we are already over the
  // maximum-capacity we avoid traversing the queues.
  if (!queue.assignSlotsToQueue(type, 1)) {
    continue;
  }

  TaskLookupResult tlr = getTaskFromQueue(taskTracker, availableSlots, queue, assignOffSwitch);
  TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();

  if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
    continue; // Look in other queues.
  }

  // if we find a task, return
  if (lookUpStatus == TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND ||
      lookUpStatus == TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND) {
    return tlr;
  }
  // if there was a memory mismatch, return
  else if (lookUpStatus == TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT) {
    return tlr;
  }
}
```

Finally, let us analyze obtainNewTask, the core task-fetching method:
```java
TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker,
    JobInProgress job, boolean assignOffSwitch) throws IOException {
  ClusterStatus clusterStatus = scheduler.taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();
  int numUniqueHosts = scheduler.taskTrackerManager.getNumberOfUniqueHosts();

  // Inform the job it is about to get a scheduling opportunity
  job.schedulingOpportunity();

  // First, try to get a 'local' task
  Task t = job.obtainNewNodeOrRackLocalMapTask(taskTracker,
      numTaskTrackers, numUniqueHosts);
  if (t != null) {
    return TaskLookupResult.getTaskFoundResult(t, job);
  }

  // Next, try to get an 'off-switch' task if appropriate
  // Do not bother as much about locality for High-RAM jobs
  if (job.getNumSlotsPerMap() > 1 ||
      (assignOffSwitch && job.scheduleOffSwitch(numTaskTrackers))) {
    t = job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);
  }

  return (t != null) ? TaskLookupResult.getOffSwitchTaskFoundResult(t, job)
                     : TaskLookupResult.getNoTaskFoundResult();
}
```

As in the FIFO scheduler, it first tries to find a task with data locality. If none is found, it may assign a high-memory or off-switch task; the detailed assignment logic was analyzed in the earlier article on FIFO task scheduling (MapReduce task scheduling and execution: task scheduling). If still nothing is found, a no-task-found result is returned.
Once the number of assigned tasks reaches the per-heartbeat maximum, assignment stops:

```java
if (tasks.size() >= maxTasksPerHeartbeat) {
  return;
}
```
To maximize data locality, the Capacity Scheduler uses delay scheduling: if a job has no task that is data-local to the current node, the job skips a certain number of scheduling opportunities, until either a data-local task is found or the skip limit is reached.
```java
if (job.getNumSlotsPerMap() > 1 ||
    (assignOffSwitch && job.scheduleOffSwitch(numTaskTrackers))) {
  t = job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);
}
```

assignOffSwitch being true means no non-data-local task has been assigned yet; scheduleOffSwitch decides whether the job has reached its skip limit:
```java
public boolean scheduleOffSwitch(int numTaskTrackers) {
  long missedTaskTrackers = getNumSchedulingOpportunities();
  long requiredSlots = Math.min((desiredMaps() - finishedMaps()), numTaskTrackers);
  return (requiredSlots * localityWaitFactor) < missedTaskTrackers;
}
```

localityWaitFactor is the fraction of cluster nodes holding the job's input data, and requiredSlots is the number of slots the job still needs; their product is the skip limit, and missedTaskTrackers is the number of opportunities skipped so far. missedTaskTrackers is incremented on every scheduling opportunity; it is reset to 0 when a data-local task is assigned, and also after a non-local task is finally assigned.
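Plugging numbers into the threshold makes the mechanism concrete. Below is a standalone sketch of the same formula (hypothetical class name; pendingMaps stands in for desiredMaps() - finishedMaps()):

```java
// Hypothetical sketch of the scheduleOffSwitch() threshold: an off-switch
// (non-local) map may be assigned only once the job has been skipped more
// than requiredSlots * localityWaitFactor times.
public class DelayScheduling {
    public static boolean scheduleOffSwitch(long missedTaskTrackers,
                                            int pendingMaps,
                                            int numTaskTrackers,
                                            float localityWaitFactor) {
        long requiredSlots = Math.min(pendingMaps, numTaskTrackers);
        return (requiredSlots * localityWaitFactor) < missedTaskTrackers;
    }

    public static void main(String[] args) {
        // 10 pending maps on a 100-node cluster, input spread over 25% of nodes:
        // the skip limit is 10 * 0.25 = 2.5 missed opportunities.
        System.out.println(scheduleOffSwitch(2, 10, 100, 0.25f)); // false: keep waiting
        System.out.println(scheduleOffSwitch(3, 10, 100, 0.25f)); // true: give up on locality
    }
}
```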
Reduce assignment is comparatively simple: it uses only the high-memory scheduling policy, returns as soon as a suitable reduce task is found, and applies no delay scheduling. This concludes the analysis of task scheduling in the Capacity Scheduler.
The next article will study the Fair Scheduler. Corrections and questions are welcome.
References

[1] Dong Xicheng, 《Hadoop技術(shù)內(nèi)幕: 深入解析MapReduce架構(gòu)設(計與實現(xì)原理》 (Inside Hadoop: An In-Depth Analysis of MapReduce Architecture Design and Implementation)
[2] Hadoop 1.0.0 source code

October 7, 2013