YARN Source Code Analysis (1): ApplicationMaster
Preface
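Over the previous two weeks I mainly studied some of the modules in HDFS, many of which offer something worth learning from. Now I turn to another piece: YARN, also known as MRv2. YARN addresses the single-point JobTracker problem of MR by splitting that role into ResourceManager and NodeManager, and each application additionally gets an ApplicationMaster that manages its whole lifecycle. YARN does contain many good designs, and today I want to share the set of services around the ApplicationMaster, which are internal services of the ResourceManager. Once you understand how the AM is started, you will have a better understanding of how YARN launches tasks.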
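Classes involved in ApplicationMaster management
ApplicationMaster management involves four main classes: ApplicationMasterLauncher, AMLivelinessMonitor, ApplicationMasterService, and the ApplicationMaster class itself. Their purposes are described below; in YARN, each class has a clearly separated responsibility.
1. ApplicationMasterLauncher: call it the AM launch/cleanup event handler. It is both a service and an event handler, and it handles only two kinds of events, launch and cleanup, which correspond to starting and shutting down an application's AM.
2. AMLivelinessMonitor: as the name suggests, this is a monitoring class, and what it monitors is whether each AM is still alive. As in HDFS, detection is heartbeat-based: if an entry's heartbeat expires, an expire event is triggered.
3. ApplicationMasterService: the class that serves AM requests. AMS lives inside the ResourceManager and serves the ApplicationMasters running on the cluster nodes. It receives each AM's registration request, updates heartbeat information, and so on.
4. ApplicationMaster: the application management class. Put simply, the ApplicationMaster manages the whole lifecycle of an application.
With the AM-related classes briefly described, the following sections walk through several flows at the source level.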
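AM startup
An AM is started when a user submits a new Application. A launch event then reaches ApplicationMasterLauncher, which communicates with the corresponding NodeManager and asks it to prepare the Container for the new AM. This is where the ApplicationMasterLauncher class comes in. As mentioned above, it handles only two kinds of events, Launch and Cleanup. First, a look at the class's basic fields: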
    // Application event handler
    public class ApplicationMasterLauncher extends AbstractService implements
        EventHandler<AMLauncherEvent> {
      private static final Log LOG = LogFactory.getLog(
          ApplicationMasterLauncher.class);
      private final ThreadPoolExecutor launcherPool;
      private LauncherThread launcherHandlingThread;

      // event queue
      private final BlockingQueue<Runnable> masterEvents
          = new LinkedBlockingQueue<Runnable>();

      // ResourceManager context
      protected final RMContext context;

      public ApplicationMasterLauncher(RMContext context) {
        super(ApplicationMasterLauncher.class.getName());
        this.context = context;
        // initialize the thread pool
        this.launcherPool = new ThreadPoolExecutor(10, 10, 1,
            TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
        // create the handling thread
        this.launcherHandlingThread = new LauncherThread();
      }
      ...

This is fairly simple: there is a masterEvents event queue, a handling thread, and the thread pool that the thread feeds. Almost all RM services extend the abstract service class AbstractService. ApplicationMasterLauncher handles two kinds of events, as its handle method shows:
    @Override
    public synchronized void handle(AMLauncherEvent appEvent) {
      AMLauncherEventType event = appEvent.getType();
      RMAppAttempt application = appEvent.getAppAttempt();
      // dispatch the AM launcher event: either a launch or a cleanup
      switch (event) {
      case LAUNCH:
        launch(application);
        break;
      case CLEANUP:
        cleanup(application);
      default:
        break;
      }
    }

It then calls the concrete method. Taking the launch event as an example:

    // enqueue an application launch event
    private void launch(RMAppAttempt application) {
      Runnable launcher = createRunnableLauncher(application,
          AMLauncherEventType.LAUNCH);
      // add the launch event to the event queue
      masterEvents.add(launcher);
    }

How are these events processed once they are in the queue? The queue works as a message queue: a dedicated thread takes the events off one by one and hands them to the thread pool.

    // the handling thread
    private class LauncherThread extends Thread {

      public LauncherThread() {
        super("ApplicationMaster Launcher");
      }

      @Override
      public void run() {
        while (!this.isInterrupted()) {
          Runnable toLaunch;
          try {
            // take events off the queue one at a time
            toLaunch = masterEvents.take();
            // hand each one to the thread pool for execution
            launcherPool.execute(toLaunch);
          } catch (InterruptedException e) {
            LOG.warn(this.getClass().getName() + " interrupted. Returning.");
            return;
          }
        }
      }
    }

How an event is actually executed depends on AMLauncher, which is itself a Runnable.

    /**
     * The launch of the AM itself.
     * Executes application launcher events.
     */
    public class AMLauncher implements Runnable {

      private static final Log LOG = LogFactory.getLog(AMLauncher.class);

      private ContainerManagementProtocol containerMgrProxy;

      private final RMAppAttempt application;
      private final Configuration conf;
      private final AMLauncherEventType eventType;
      private final RMContext rmContext;
      private final Container masterContainer;

Its main method, run, branches on the event type:

    @SuppressWarnings("unchecked")
    public void run() {
      // AMLauncher handles the two event types separately
      switch (eventType) {
      case LAUNCH:
        try {
          LOG.info("Launching master" + application.getAppAttemptId());
          // call the launch method
          launch();
          handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
              RMAppAttemptEventType.LAUNCHED));
        ...
        break;
      case CLEANUP:
        try {
          LOG.info("Cleaning master " + application.getAppAttemptId());
          // call the cleanup method
          cleanup();
        ...
        break;
      default:
        LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
        break;
      }
    }

The launch operation then makes an RPC call to the remote NodeManager to start the Container. After that, control reaches the ApplicationMaster's run() method, which, among other things, registers the application:

    @SuppressWarnings({ "unchecked" })
    public boolean run() throws YarnException, IOException {
      LOG.info("Starting ApplicationMaster");

      Credentials credentials =
          UserGroupInformation.getCurrentUser().getCredentials();
      DataOutputBuffer dob = new DataOutputBuffer();
      credentials.writeTokenStorageToStream(dob);
      // Now remove the AM->RM token so that containers cannot access it.
      Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
      while (iter.hasNext()) {
        Token<?> token = iter.next();
        if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
          iter.remove();
        }
      }
      allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

      // talk to the ResourceManager, periodically sending heartbeats
      // that carry the application's latest state
      AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
      amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
      amRMClient.init(conf);
      amRMClient.start();
      .....

      // Register self with ResourceManager
      // This will start heartbeating to the RM
      // register the AM once it has started
      appMasterHostname = NetUtils.getHostname();
      RegisterApplicationMasterResponse response = amRMClient
          .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
              appMasterTrackingUrl);
      // Dump out information about cluster capability as seen by the
      // resource manager
      int maxMem = response.getMaximumResourceCapability().getMemory();
      LOG.info("Max mem capabililty of resources in this cluster " + maxMem);

      // A resource ask cannot exceed the max.
      if (containerMemory > maxMem) {
        LOG.info("Container memory specified above max threshold of cluster."
            + " Using max value." + ", specified=" + containerMemory + ", max="
            + maxMem);
        containerMemory = maxMem;
      }

During this step the AM is registered with AMLivelinessMonitor, and heartbeat monitoring begins from this point.
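The hand-off used by ApplicationMasterLauncher in this section (handle() only enqueues a Runnable, a single thread drains the queue, and a pool does the real work) can be sketched without any Hadoop dependency. This is a minimal illustration, not the Hadoop implementation: the class name LauncherSketch, the pool size of 2, the string attempt ids, and the in-memory log are all assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the launcher pattern; not Hadoop code. */
public class LauncherSketch {
    public enum EventType { LAUNCH, CLEANUP }

    // Events wait here until the drainer thread picks them up.
    private final BlockingQueue<Runnable> events = new LinkedBlockingQueue<>();
    // Pool size 2 is an arbitrary choice for the sketch.
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    // Records what ran, so the effect is observable.
    private final List<String> log = new CopyOnWriteArrayList<>();
    private final Thread drainer;

    public LauncherSketch() {
        drainer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Block until an event arrives, then hand it to the pool.
                    pool.execute(events.take());
                } catch (InterruptedException e) {
                    return; // stop() was called
                }
            }
        }, "launcher-sketch");
    }

    public void start() { drainer.start(); }

    /** Cheap for the caller: wraps the work in a Runnable and enqueues it. */
    public void handle(EventType type, String attemptId, CountDownLatch done) {
        events.add(() -> {
            log.add(type + ":" + attemptId);
            done.countDown();
        });
    }

    public void stop() throws InterruptedException {
        drainer.interrupt();
        drainer.join();
        pool.shutdown();
    }

    public List<String> log() { return log; }

    public static void main(String[] args) throws InterruptedException {
        LauncherSketch launcher = new LauncherSketch();
        launcher.start();
        CountDownLatch done = new CountDownLatch(2);
        launcher.handle(EventType.LAUNCH, "attempt_1", done);
        launcher.handle(EventType.CLEANUP, "attempt_1", done);
        done.await(5, TimeUnit.SECONDS);
        launcher.stop();
        // Order may vary, since two pool threads can run the tasks concurrently.
        System.out.println(launcher.log());
    }
}
```

The point of the indirection is that the caller of handle() never waits on a slow remote call: enqueueing returns immediately, while the potentially slow work runs on pool threads.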
AMLivelinessMonitor monitoring
Here the focus moves from ApplicationMaster to AMLivelinessMonitor. It is a liveness-monitoring thread, and all monitors of this kind share a common parent class:
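    // AM liveness monitor
    public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {

AbstractLivelinessMonitor defines the traits and methods shared by this family of monitors:

    // liveness monitor base class
    public abstract class AbstractLivelinessMonitor<O> extends AbstractService {

      private static final Log LOG = LogFactory.getLog(AbstractLivelinessMonitor.class);

      //thread which runs periodically to see the last time since a heartbeat is
      //received.
      // the checker thread
      private Thread checkerThread;
      private volatile boolean stopped;
      // default expiry time: 5 minutes
      public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
      // expiry time
      private int expireInterval = DEFAULT_EXPIRE;
      // check interval: 1/3 of the expiry time
      private int monitorInterval = expireInterval/3;

      private final Clock clock;
      // last heartbeat time recorded for each monitored object
      private Map<O, Long> running = new HashMap<O, Long>();

The heartbeat check itself is very simple: look up the record and update its timestamp. An object joins or leaves monitoring through these two methods:

    // register a new object for heartbeat monitoring
    public synchronized void register(O ob) {
      running.put(ob, clock.getTime());
    }

    // remove an object from heartbeat monitoring
    public synchronized void unregister(O ob) {
      running.remove(ob);
    }

Each heartbeat calls the following method:

    // update the last heartbeat time
    public synchronized void receivedPing(O ob) {
      //only put for the registered objects
      if (running.containsKey(ob)) {
        running.put(ob, clock.getTime());
      }
    }

A very simple update. What the O ob object is depends on the scenario; for AM monitoring it is the ApplicationAttemptId, as the class declaration above shows. This will appear again in the interaction between AMS and AM. Once a new application attempt has been added to AMLivelinessMonitor, what remains is mainly the interaction between AMS and AM.
AM and AMS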
Once the ApplicationMaster is running, it periodically sends heartbeats to ApplicationMasterService. Each heartbeat carries a good deal of resource description information.
    // ApplicationMaster heartbeat update
    @Override
    public AllocateResponse allocate(AllocateRequest request)
        throws YarnException, IOException {

      ApplicationAttemptId appAttemptId = authorizeRequest();
      // update the heartbeat timestamp
      this.amLivelinessMonitor.receivedPing(appAttemptId);
      ....

Every time a heartbeat arrives, the last-seen time is updated. AMS also has a corresponding method for registering an application:

    // the ApplicationMaster registers itself with ApplicationMasterService
    @Override
    public RegisterApplicationMasterResponse registerApplicationMaster(
        RegisterApplicationMasterRequest request) throws YarnException,
        IOException {

      ApplicationAttemptId applicationAttemptId = authorizeRequest();
      ApplicationId appID = applicationAttemptId.getApplicationId();
      .....
      // record a heartbeat with the liveness monitor and update the check
      // time; the key is the application attempt ID
      this.amLivelinessMonitor.receivedPing(applicationAttemptId);
      RMApp app = this.rmContext.getRMApps().get(appID);

      // Setting the response id to 0 to identify if the
      // application master is register for the respective attemptid
      lastResponse.setResponseId(0);
      responseMap.put(applicationAttemptId, lastResponse);
      LOG.info("AM registration " + applicationAttemptId);
      this.rmContext...

If a heartbeat expires, an expire event is triggered. In AMLivelinessMonitor this work is done by the checker thread, which runs a PingChecker:

    // liveness monitor base class
    public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
      ...
      //thread which runs periodically to see the last time since a heartbeat is
      //received.
      // the checker thread
      private Thread checkerThread;
      ....
      // default expiry time: 5 minutes
      public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
      // expiry time
      private int expireInterval = DEFAULT_EXPIRE;
      // check interval: 1/3 of the expiry time
      private int monitorInterval = expireInterval/3;
      ....
      // last heartbeat time recorded for each monitored object
      private Map<O, Long> running = new HashMap<O, Long>();
      ...
      private class PingChecker implements Runnable {

        @Override
        public void run() {
          while (!stopped && !Thread.currentThread().isInterrupted()) {
            synchronized (AbstractLivelinessMonitor.this) {
              Iterator<Map.Entry<O, Long>> iterator =
                  running.entrySet().iterator();

              //avoid calculating current time everytime in loop
              long currentTime = clock.getTime();

              while (iterator.hasNext()) {
                Map.Entry<O, Long> entry = iterator.next();
                // check for expiry
                if (currentTime > entry.getValue() + expireInterval) {
                  iterator.remove();
                  // call the expiry hook, which hands the event to the dispatcher
                  expire(entry.getKey());
                  LOG.info("Expired:" + entry.getKey().toString() +
                      " Timed out after " + expireInterval/1000 + " secs");
                }
              }
            }
            ...

The checker thread iterates over the last heartbeat time of every monitored object and decides from the elapsed time whether it has expired. If so, it calls expire, which is implemented by the subclass:

    // AM liveness monitor
    public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {
      // central dispatcher
      private EventHandler dispatcher;
      ...
      @Override
      protected void expire(ApplicationAttemptId id) {
        // once an attempt expires, hand an EXPIRE event to the dispatcher
        dispatcher.handle(
            new RMAppAttemptEvent(id, RMAppAttemptEventType.EXPIRE));
      }
    }

This produces an expiry event for the application attempt and sends it to the central dispatcher. The reason for this approach is that every module in the RM is designed to work in an event-driven way, which keeps the modules as decoupled as possible. Different events move a module into different states, so the whole thing can be understood as state machine transitions. Finally, a screenshot from the book gives a simple picture of the call flow among the AM-related modules. For the full code analysis, see https://github.com/linyiqun/hadoop-yarn. Later posts will continue with other parts of the YARN code.
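To make the expiry rule concrete, here is a minimal sketch of the register / receivedPing / expiry-check bookkeeping with an injectable clock, so the timing can be stepped through by hand. It is an illustration under assumptions rather than the Hadoop class: LivelinessSketch, checkOnce, and the LongSupplier clock are names invented for the example, and the check is a plain method call instead of a background thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical, single-pass version of the liveness check; not Hadoop code. */
public class LivelinessSketch<K> {
    private final Map<K, Long> running = new HashMap<>();
    private final long expireIntervalMs;
    private final LongSupplier clock; // injected so the example can control time

    public LivelinessSketch(long expireIntervalMs, LongSupplier clock) {
        this.expireIntervalMs = expireIntervalMs;
        this.clock = clock;
    }

    public synchronized void register(K key) {
        running.put(key, clock.getAsLong());
    }

    /** Only refreshes keys that are already registered. */
    public synchronized void receivedPing(K key) {
        if (running.containsKey(key)) {
            running.put(key, clock.getAsLong());
        }
    }

    /** One pass of the checker: removes and returns every expired key. */
    public synchronized List<K> checkOnce() {
        long now = clock.getAsLong();
        List<K> expired = new ArrayList<>();
        Iterator<Map.Entry<K, Long>> it = running.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> entry = it.next();
            if (now > entry.getValue() + expireIntervalMs) {
                K key = entry.getKey(); // read the key before removing the entry
                it.remove();
                expired.add(key);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake clock, in milliseconds
        LivelinessSketch<String> monitor =
            new LivelinessSketch<>(5 * 60 * 1000L, () -> now[0]);

        monitor.register("attempt_1");
        monitor.register("attempt_2");

        now[0] = 200_000L;
        monitor.receivedPing("attempt_1"); // only attempt_1 heartbeats

        now[0] = 300_001L;
        System.out.println(monitor.checkOnce()); // prints [attempt_2]

        now[0] = 500_001L;
        System.out.println(monitor.checkOnce()); // prints [attempt_1]
    }
}
```

With the defaults quoted above, the expiry time is 300 s and the check interval is 300 / 3 = 100 s. Assuming the checker sleeps for the check interval between passes (the excerpt is cut off before that point), a silent AM would be noticed somewhere between 300 s and roughly 400 s after its last heartbeat.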
References
Cai Bin et al., 《Hadoop技術內部–HDFS結構設計與實現原理》