如何编写Hadoop调度器
在Hadoop中,調度器是一個可插拔的模塊,用戶可以根據自己的實際應用要求設計調度器,然后在配置文件中指定相應的調度器,這樣,當Hadoop集群啟動時,便會加載該調度器。當前Hadoop自帶了幾種調度器,分別是FIFO(默認調度器),Capacity Scheduler和FairScheduler,通常境況下,這些調度器很難滿足公司復雜的應用需求,因而往往需要開發自己的調度器。本文介紹了Hadoop調度器的基本編寫方法。
2. Hadoop調度框架
Hadoop的調度器是在JobTracker中加載和調用的,用戶可以在配置文件mapred-site.xml中的mapred.jobtracker.taskScheduler屬性中指定調度器。本節分析了Hadoop調度器的調度框架,實際上分析了兩個重要類:TaskScheduler和JobTracker的關系。
(1) TaskScheduler
如果用戶要編寫自己的調度器,需要繼承抽象類TaskScheduler,該類的接口如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | abstract class TaskScheduler implements Configurable { protected Configuration conf; //配置文件 protected TaskTrackerManager taskTrackerManager; //一般會設為JobTracker public Configuration getConf() { ??return conf; } public void setConf(Configuration conf) { ??this.conf = conf; } public synchronized void setTaskTrackerManager( TaskTrackerManager taskTrackerManager) { ??this.taskTrackerManager = taskTrackerManager; } public void start() throws IOException { //初始化函數,如加載配置文件等 ??// do nothing } public void terminate() throws IOException { //結束函數 // do nothing } //最重要的函數,為該taskTracker分配合適的task public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException; ??//根據隊列名字獲job列表 public abstract Collection<JobInProgress> getJobs(String queueName); } |
(2) JobTracker
JobTracker是Hadoop最核心的組件,它監控整個集群中的作業運行情況并對資源進行管理和調度。
每個TaskTracker每個3s(默認值,可配置)通過heartbeat向JobTracker匯報自己管理的機器的一些基本信息,包括內存使用量,內存剩余量,正在運行的task,空閑的slot數目等,一旦JobTracker發現該TaskTracker出現了空閑的slot,便會調用調度器中的AssignTasks方法為該TaskTracker分配task。
下面分析JobTracker調用TaskScheduler的具體流程:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 | …… private final TaskScheduler taskScheduler; //聲明調度器對象 …… public static JobTracker startTracker(JobConf conf, String identifier) { ??……. ??result = new JobTracker(conf, identifier); ??result.taskScheduler.setTaskTrackerManager(result); //設置調度器的manager ??…… } //創建調度器 JobTracker(JobConf conf, String identifier) { ??…… ??// Create the scheduler ??Class<? extends TaskScheduler> schedulerClass ??= conf.getClass("mapred.jobtracker.taskScheduler", ????JobQueueTaskScheduler.class, TaskScheduler.class); ??taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf); ??….. } //run forever public void offerService() { ??…… ??taskScheduler.start(); //啟動調度器 ??…… } 。。。。。 HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ??……. ??// Check for new tasks to be executed on the tasktracker ??if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) { ????…… ????//使用調度器,為該taskTracker分配作業 ????tasks = taskScheduler.assignTasks(taskTrackerStatus); ????…… ??} } |
從上面的分析可以知道,Scheduler和JobTracker之間會相互包含(實際上是組合模式),Scheduler中要包含JobTracker(實際上就是TaskTrackerManager)對象,以便獲取整個Hadoop集群的一些信息,如slot總數,QueueManager對象,添加JobInProgressListener以便增加或刪除job時,通知Scheduler;JobTracker中要包含Scheduler對象,以便可以對每個TaskTracker分配task。
3. 編寫Hadoop調度器
假設我們要編寫一個新的調度器,為MyHadoopScheduler,需要進行以下工作:
(1) 用戶需要自己實現的類
@ MyHadoopSchedulerConf:配置文件管理類,讀取你自己的配置文件,并保存到合適的數據結構中,一般而言,這個類應該支持動態加載配置文件。
@ MyHadoopSchedulerListener:編寫自己的JobInProgressListener,并調用JobTracker的addJobInProgressListener(),將之加到系統的Listener隊列中,以便系統中添加或刪除job后,JobTracker可立刻告訴調度器。
@ MyHadoopScheduler:調度器的核心實現算法
(2) 用戶要用到的系統類
@?JobTracker:JobTracker在startTracker函數中,會將MyHadoopScheduler的taskTrackerManager賦值為JobTracker對象,這樣,在MyHadoopScheduler中,可調用Jobracker中的所有public方法和成員變量,常用的有:
$ getClusterStatus():獲取集群的狀態,如tasktracker列表,map slot總數,reduce slot總數,當前正在運行的map/reduce task總數等
$ getQueueManager():如果MyHadoopScheduler支持多隊列,那么需要使用該方法獲取QueueManager對象,通過該對象,會用可以獲取系統的所有隊列名稱,每個隊列的ACL(Access Control List),具體參考:http://hadoop.apache.org/common/docs/current/service_level_auth.html
$ killJob:可以調用該函數殺死某個job
$ killTask:如果調度器支持資源搶占,可調用該函數 殺死某個task以便進行資源搶占。
@?JobInprogress:用戶向Hadoop中提交一個job后,Hadoop會為該job創建一個叫JobInProgress的對象,該對象中包含了job相關的基本信息,且它會伴隨某個job的一生(與job共存亡)。該對象中包含的job信息有:該job包含的所有task的信息(如:正在運行的task列表,已經完成的task列表,尚未運行的task列表等),作業的優先級,作業的提交時間,開始運行時間,運行結束時間等信息。
在JobInprogress的task列表中,每個task以對象TaskInProgress的形式保存,該對象中包含了每個task的基本信息,包括:task要處理的數據split,task創建時間,task開始執行時間,task結束時間等信息。這些信息肯定會在調度器中使用。
@?JobConf
每個作業的運行參數和配置選項被保存到一個JobConf對象中,該對象包含了配置文件mapred-site.xml,core-site.xml和hdfs-site.xml設置的選項和該作業的特有屬性(用戶名,InputFormat,Mapper等),一般是以key/value的形式保存,比如:想獲取當前用戶名,可以這樣:
| 1 2 3 4 5 | JobConf conf; ……. String username = conf.get("user.name"); |
用戶也可以通過該對象傳遞一些自己定義的全局屬性,如用戶自己定義了一個屬性叫mapred.job.deadline(作業的deadline時間),用戶可以在提交作業時設定該值:
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt \
-D mapred.job.deadline=100000 \
input output
然后在調度器中這樣獲取該屬性的值:
| 1 2 3 4 5 | JobConf conf; ……. int deadline=conf.getInt("mapred.job.deadline", -1); //獲取mapred.job.deadline屬性,如果沒有設置,則返回-1 |
4. 總結
調度器是Hadoop的中樞,其重要性可想而知。用戶如果要設計Hadoop調度器,需要對Hadoop的整個框架有比較深入的理解,同時需閱讀一些很重要的類(如JobTracker和JobInprogress等)的源碼,以便利用這些類完成你的調度算法。
Hadoop目前自帶了三個比較常用的調度器,分別為JobQueueTaskScheduler (FIFO,但隊列調度器),Capacity Scheduler(多隊列多用戶調度器)和Fair Scheduler(多隊列多用戶調度器),它們是你學習Hadoop調度器的最好資料。
5. 參考資料
(1) Hadoop-0.20.2源代碼
原創文章,轉載請注明:?轉載自董的博客
本文鏈接地址:?http://dongxicheng.org/mapreduce/how-to-write-hadoop-schedulers/
總結
以上是生活随笔為你收集整理的如何编写Hadoop调度器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 梯度、梯度下降,随机梯度下降
- 下一篇: 《Effective STL》学习笔记(