Spark on YARN cluster client 模式作业运行全过程分析
一、Spark:Yarn-Cluster 與 Yarn-Client 的區別與聯系
我們都知道Spark支持在yarn上運行,但是Spark on yarn有分為兩種模式yarn-cluster和yarn-client,它們究竟有什么區別與聯系?閱讀完本文,你將了解。
Spark支持可插拔的集群管理模式(Standalone、Mesos以及YARN ),集群管理負責啟動executor進程,編寫Spark application 的人根本不需要知道Spark用的是什么集群管理。Spark支持的三種集群模式,這三種集群模式都由兩個組件組成:master和slave。Master服務(YARN ResourceManager,Mesos master和Spark standalone master)決定哪些application可以運行,什么時候運行以及哪里去運行。而slave服務( YARN NodeManager, Mesos slave和Spark standalone slave)實際上運行executor進程。
當在YARN上運行Spark作業,每個Spark executor作為一個YARN容器(container)運行。Spark可以使得多個Tasks在同一個容器(container)里面運行。這是個很大的優點。
注意這里和Hadoop的MapReduce作業不一樣,MapReduce作業為每個Task開啟不同的JVM來運行。雖然說MapReduce可以通過參數來配置。詳見 mapreduce.job.jvm.numtasks。關于這個參數的介紹已經超過本篇文章的介紹。
從廣義上講,yarn-cluster適用于生產環境;而yarn-client適用于交互和調試,也就是希望快速地看到application的輸出。
在我們介紹yarn-cluster和yarn-client的深層次的區別之前,我們先明白一個概念:Application Master。在YARN中,每個Application實例都有一個Application Master進程,它是Application啟動的第一個容器。它負責和ResourceManager打交道,并請求資源。獲取資源之后告訴NodeManager為其啟動container。
從深層次的含義講,yarn-cluster和yarn-client模式的區別其實就是Application Master進程的區別,yarn-cluster模式下,driver運行在AM(Application Master)中,它負責向YARN申請資源,并監督作業的運行狀況。當用戶提交了作業之后,就可以關掉Client,作業會繼續在YARN上運行。然而yarn-cluster模式不適合運行交互類型的作業。而yarn-client模式下,Application Master僅僅向YARN請求executor,client會和請求的container通信來調度他們工作,也就是說Client不能離開。看下下面的兩幅圖應該會明白(上圖是yarn-cluster模式,下圖是yarn-client模式):
圖一:yarn cluste
圖二:yarn client
二、Spark on YARN cluster 模式作業運行全過程分析
下面是分析Spark on YARN的Cluster模式,從用戶提交作業到作業運行結束整個運行期間的過程分析。
客戶端進行操作
1、根據yarnConf來初始化yarnClient,并啟動yarnClient
2、創建客戶端Application,并獲取Application的ID,進一步判斷集群中的資源是否滿足executor和ApplicationMaster申請的資源,如果不滿足則拋出IllegalArgumentException;
3、設置資源、環境變量:其中包括了設置Application的Staging目錄、準備本地資源(jar文件、log4j.properties)、設置Application其中的環境變量、創建Container啟動的Context等;
4、設置Application提交的Context,包括設置應用的名字、隊列、AM的申請的Container、標記該作業的類型為Spark;
5、申請Memory,并最終通過yarnClient.submitApplication向ResourceManager提交該Application。
當作業提交到YARN上之后,客戶端就沒事了,甚至在終端關掉那個進程也沒事,因為整個作業運行在YARN集群上進行,運行的結果將會保存到HDFS或者日志中。
提交到YARN集群,YARN操作
1、運行ApplicationMaster的run方法;
2、設置好相關的環境變量。
3、創建amClient,并啟動;
4、在Spark UI啟動之前設置Spark UI的AmIpFilter;
5、在startUserClass函數專門啟動了一個線程(名稱為Driver的線程)來啟動用戶提交的Application,也就是啟動了Driver。在Driver中將會初始化SparkContext;
6、等待SparkContext初始化完成,最多等待spark.yarn.applicationMaster.waitTries次數(默認為10),如果等待了的次數超過了配置的,程序將會退出;否則用SparkContext初始化yarnAllocator;
怎么知道SparkContext初始化完成?
其實在5步驟中啟動Application的過程中會初始化SparkContext,在初始化SparkContext的時候將會創建YarnClusterScheduler,在SparkContext初始化完成的時候,會調用YarnClusterScheduler類中的postStartHook方法,而該方法會通知ApplicationMaster已經初始化好了SparkContext
7、當SparkContext、Driver初始化完成的時候,通過amClient向ResourceManager注冊ApplicationMaster
8、分配并啟動Executeors。在啟動Executeors之前,先要通過yarnAllocator獲取到numExecutors個Container,然后在Container中啟動Executeors。如果在啟動Executeors的過程中失敗的次數達到了maxNumExecutorFailures的次數,maxNumExecutorFailures的計算規則如下:
那么這個Application將失敗,將Application Status標明為FAILED,并將關閉SparkContext。其實,啟動Executeors是通過ExecutorRunnable實現的,而ExecutorRunnable內部是啟動CoarseGrainedExecutorBackend的。
9、最后,Task將在CoarseGrainedExecutorBackend里面運行,然后運行狀況會通過Akka通知CoarseGrainedScheduler,直到作業運行完成。
三、Spark on YARN client 模式作業運行全過程分析
在前篇文章中我介紹了Spark on YARN集群模式(yarn-cluster)作業從提交到運行整個過程的情況(詳情見《Spark on YARN集群模式作業運行全過程分析》),我們知道Spark on yarn有兩種模式:yarn-cluster和yarn-client。這兩種模式作業雖然都是在yarn上面運行,但是其中的運行方式很不一樣,今天我就來談談Spark on YARN yarn-client模式作業從提交到運行的過程剖析。
和yarn-cluster模式一樣,整個程序也是通過spark-submit腳本提交的。但是yarn-client作業程序的運行不需要通過Client類來封裝啟動,而是直接通過反射機制調用作業的main函數。下面就來分析:
1、通過SparkSubmit類的launch的函數直接調用作業的main函數(通過反射機制實現),如果是集群模式就會調用Client的main函數。
2、而應用程序的main函數一定都有個SparkContent,并對其進行初始化;
3、在SparkContent初始化中將會依次做如下的事情:設置相關的配置、注冊MapOutputTracker、BlockManagerMaster、BlockManager,創建taskScheduler和dagScheduler;其中比較重要的是創建taskScheduler和dagScheduler。在創建taskScheduler的時候會根據我們傳進來的master來選擇Scheduler和SchedulerBackend。由于我們選擇的是yarn-client模式,程序會選擇YarnClientClusterScheduler和YarnClientSchedulerBackend,并將YarnClientSchedulerBackend的實例初始化YarnClientClusterScheduler,上面兩個實例的獲取都是通過反射機制實現的,YarnClientSchedulerBackend類是CoarseGrainedSchedulerBackend類的子類,YarnClientClusterScheduler是TaskSchedulerImpl的子類,僅僅重寫了TaskSchedulerImpl中的getRackForHost方法。
4、初始化完taskScheduler后,將創建dagScheduler,然后通過taskScheduler.start()啟動taskScheduler,而在taskScheduler啟動的過程中也會調用SchedulerBackend的start方法。在SchedulerBackend啟動的過程中將會初始化一些參數,封裝在ClientArguments中,并將封裝好的ClientArguments傳進Client類中,并client.runApp()方法獲取Application ID。
5、client.runApp里面的做是和前面客戶端進行操作那節類似,不同的是在里面啟動是ExecutorLauncher(yarn-cluster模式啟動的是ApplicationMaster)。
6、在ExecutorLauncher里面會初始化并啟動amClient,然后向ApplicationMaster注冊該Application。注冊完之后將會等待driver的啟動,當driver啟動完之后,會創建一個MonitorActor對象用于和CoarseGrainedSchedulerBackend進行通信(只有事件AddWebUIFilter他們之間才通信,Task的運行狀況不是通過它和CoarseGrainedSchedulerBackend通信的)。然后就是設置addAmIpFilter,當作業完成的時候,ExecutorLauncher將通過amClient設置Application的狀態為FinalApplicationStatus.SUCCEEDED。
7、分配Executors,這里面的分配邏輯和yarn-cluster里面類似,就不再說了。
8、最后,Task將在CoarseGrainedExecutorBackend里面運行,然后運行狀況會通過Akka通知CoarseGrainedScheduler,直到作業運行完成。
9、在作業運行的時候,YarnClientSchedulerBackend會每隔1秒通過client獲取到作業的運行狀況,并打印出相應的運行信息,當Application的狀態是FINISHED、FAILED和KILLED中的一種,那么程序將退出等待。
10、最后有個線程會再次確認Application的狀態,當Application的狀態是FINISHED、FAILED和KILLED中的一種,程序就運行完成,并停止SparkContext。整個過程就結束了。
總結
以上是生活随笔為你收集整理的Spark on YARN cluster client 模式作业运行全过程分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: IDEA 2021.1.2中scala生
- 下一篇: spark mapreduce术语梳理