TaskTracker获取并执行map或reduce任务的过程(一)
我們知道TaskTracker在默認情況下,每個3秒就行JobTracker發送一個心跳包,也就是在這個心跳包中包含對任務的請求。JobTracker返回給TaskTracker的心跳包中包含有各種action(任務),如果有滿足在此TaskTracker上執行的任務的話,該任務也就包含在心跳包的響應中。在TaskTracker端有線程專門等待map或reduce任務,并從隊列中取出執行。
1.?TaskTracker發送心跳包
TaskTracker是作為一個單獨的JVM運行的,它啟動以后一直處于offerService()函數中,每隔3秒就執行一次transmitHeartBeat函數,如下所示:
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);該函數具體代碼為:
HeartbeatResponse transmitHeartBeat(long now) throws IOException {......
if (status == null) {synchronized (this) {status = new TaskTrackerStatus(taskTrackerName, localHostname, httpPort, cloneAndResetRunningTaskStatuses(sendCounters), failures, maxMapSlots,maxReduceSlots); }} //// 檢查是否可以接受新的任務// boolean askForNewTask;long localMinSpaceStart;synchronized (this) {askForNewTask = ((status.countOccupiedMapSlots() < maxMapSlots || status.countOccupiedReduceSlots() < maxReduceSlots) && acceptNewTasks); localMinSpaceStart = minSpaceStart;}
......
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted,justInited,askForNewTask, heartbeatResponseId); ......return heartbeatResponse;}
我們從中可以看出,TaskTracker首先創建一個TaskTrackerStatus對象,其中包含有TaskTracker的各種信息,比如,map slot的數目,reducer slot槽的數目,TaskTracker所在的主機名等信息。然后,對TaskTracker的空閑的slot以及磁盤空間進行檢查,如果滿足相應的條件時,最終就會通過JobClient(為JobTracker的代理)將心跳信息發送給JobTracker,并得到JobTracker的響應HeartbeatResponse。如下所示,JobClient是InterTrackerProtocol的一個實例,而JobTracker實現了InterTrackerProtocol這個接口。
this.jobClient = (InterTrackerProtocol) UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Object>() {public Object run() throws IOException {return RPC.waitForProxy(InterTrackerProtocol.class,InterTrackerProtocol.versionID,jobTrackAddr, fConf);}});那么,TaskTracker怎樣通過JobTracker的代理與JobTracker進行通信呢?它是通過RPC調用JobTracker的heartbeat(......)方法而實現的。
2.?TaskTracker端獲取任務
TaskTracker接收到任務后,會將它們放入到相應的LinkedList中,LinkedList實現了List和Queue接口,它是基于鏈表實現的FIFO的隊列。
heartbeatInterval = heartbeatResponse.getHeartbeatInterval();if (actions != null){ for(TaskTrackerAction action: actions) {if (action instanceof LaunchTaskAction) {addToTaskQueue((LaunchTaskAction)action);......}}......
private void addToTaskQueue(LaunchTaskAction action) {
if (action.getTask().isMapTask()) {
mapLauncher.addToTaskQueue(action);
} else {
reduceLauncher.addToTaskQueue(action);
}
}
TaskTracker啟動的時候,創建了兩個線程:mapLauncher和reduceLauncher,它們分別處理map任務和reduce任務,map任務有mapLauncher負責將其放入到LinkedList中,reduce任務有reducerLauncher負責將其放入到它維護的LinkedList中。
public void addToTaskQueue(LaunchTaskAction action) {synchronized (tasksToLaunch) {TaskInProgress tip = registerTask(action, this);tasksToLaunch.add(tip);tasksToLaunch.notifyAll();}}mapLauncher或者是reducerLauncher根據接收到的action,創建對應的TaskTracker.TaskInProgress對象,并放入到隊列中,喚醒等待的線程進行處理。?如下所示,該線程負責從taskToLaunch中獲取task,當有空間的slot時,執行這個task。
synchronized (tasksToLaunch) {while (tasksToLaunch.isEmpty()) {tasksToLaunch.wait();}//get the TIPtip = tasksToLaunch.remove(0);task = tip.getTask();LOG.info("Trying to launch : " + tip.getTask().getTaskID() + " which needs " + task.getNumSlotsRequired() + " slots");} .....//得到空閑的slot后,啟動這個taskstartNewTask(tip);這樣,TaskTracker就得到了待處理的任務,具體如何執行請參考下一篇博客。
轉載于:https://www.cnblogs.com/yueliming/p/3278196.html
總結
以上是生活随笔為你收集整理的TaskTracker获取并执行map或reduce任务的过程(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: winphone8更换帐号问题的几点说明
- 下一篇: 转:背包问题的解法