Spark 任务调度机制详解
Spark 任務調度機制
在工廠環境下,Spark 集群的部署方式一般為 YARN-Cluster 模式,之后的內核分析內容中我們默認集群的部署方式為 YARN-Cluster 模式。
4.1 Spark 任務提交流程
在上一章中我們講解了 Spark YARN-Cluster 模式下的任務提交流程,如下圖所示:
下面的時序圖清晰地說明了一個 Spark 應用程序從提交到運行的完整流程:
提交一個 Spark 應用程序,首先通過 Client 向 ResourceManager 請求啟動一個Application,同時檢查是否有足夠的資源滿足 Application 的需求,如果資源條件滿足,則準備 ApplicationMaster 的啟動上下文,交給 ResourceManager,并循環監控Application 狀態。
當提交的資源隊列中有資源時,ResourceManager 會在某個 NodeManager 上啟動 ApplicationMaster 進程,ApplicationMaster 會單獨啟動 Driver 后臺線程,當 Driver啟動后,ApplicationMaster 會通過本地的 RPC 連接 Driver,并開始向 ResourceManager申請 Container 資源運行 Executor 進程(一個 Executor 對應與一個 Container),當ResourceManager 返回Container 資源,ApplicationMaster 則在對應的 Container 上啟動 Executor。
Driver 線程主要是初始化 SparkContext 對象,準備運行所需的上下文,然后一方面保持與 ApplicationMaster 的 RPC 連接,通過 ApplicationMaster 申請資源,另一方面根據用戶業務邏輯開始調度任務,將任務下發到已有的空閑 Executor 上。
當 ResourceManager 向 ApplicationMaster 返 回 Container 資源時,ApplicationMaster 就嘗試在對應的 Container 上啟動 Executor 進程,Executor 進程起來后,會向 Driver 反向注冊,注冊成功后保持與 Driver 的心跳,同時等待 Driver分發任務,當分發的任務執行完畢后,將任務狀態上報給 Driver。
從上述時序圖可知,Client 只負責提交 Application 并監控 Application 的狀態。對于 Spark 的任務調度主要是集中在兩個方面: 資源申請和任務分發,其主要是通過 ApplicationMaster、Driver 以及 Executor 之間來完成。
4.2 Spark 任務調度概述
當 Driver 起來后,Driver 則會根據用戶程序邏輯準備任務,并根據 Executor 資源情況逐步分發任務。在詳細闡述任務調度前,首先說明下 Spark 里的幾個概念。
一個 Spark 應用程序包括 Job、Stage 以及 Task 三個概念:
-
Job 是以 Action 方法為界,遇到一個 Action 方法則觸發一個 Job;
-
Stage 是 Job 的子集,以 RDD 寬依賴(即 Shuffle)為界,遇到 Shuffle 做一次劃分;
-
Task 是 Stage 的子集,以并行度(分區數)來衡量,分區數是多少,則有多少個 task。
Spark 的任務調度總體來說分兩路進行,一路是 Stage 級的調度,一路是 Task級的調度,總體調度流程如下圖所示:
Spark RDD 通過其 Transactions 操作,形成了 RDD 血緣關系圖,即 DAG,最后通過 Action 的調用,觸發 Job 并調度執行。DAGScheduler 負責 Stage 級的調度,主要是將 DAG 切分成若干 Stages,并將每個 Stage 打包成 TaskSet 交給 TaskScheduler調度。TaskScheduler 負責 Task 級的調度,將 DAGScheduler 給過來的 TaskSet 按照指定的調度策略分發到 Executor 上執行,調度過程中 SchedulerBackend 負責提供可用資源,其中 SchedulerBackend 有多種實現,分別對接不同的資源管理系統。有了上述感性的認識后,下面這張圖描述了 Spark-On-Yarn 模式下在任務調度期間,ApplicationMaster、Driver 以及 Executor 內部模塊的交互過程:
Driver 初始化 SparkContext 過 程 中 , 會 分 別 初 始 化 DAGScheduler 、TaskScheduler、SchedulerBackend 以及 HeartbeatReceiver,并啟動 SchedulerBackend以及HeartbeatReceiver。SchedulerBackend 通過 ApplicationMaster 申請資源,并不斷從 TaskScheduler 中拿到合適的 Task 分發到 Executor 執行。HeartbeatReceiver 負責接收 Executor 的心跳信息,監控 Executor 的存活狀況,并通知到 TaskScheduler。
4.3 Spark Stage 級調度
Spark 的任務調度是從 DAG 切割開始,主要是由 DAGScheduler 來完成。當遇到一個 Action 操作后就會觸發一個 Job 的計算,并交給 DAGScheduler 來提交,下圖是涉及到 Job 提交的相關方法調用流程圖。
Job 由 最 終 的 RDD 和 Action 方 法 封 裝 而 成 , SparkContext 將 Job 交 給DAGScheduler 提交,它會根據 RDD 的血緣關系構成的 DAG 進行切分,將一個 Job劃分為若干 Stages,具體劃分策略是,由最終的 RDD 不斷通過依賴回溯判斷父依賴是否是寬依賴,即以 Shuffle 為界,劃分 Stage,窄依賴的 RDD 之間被劃分到同一個Stage 中,可以進行 pipeline 式的計算,如上圖紫色流程部分。劃分的 Stages 分兩類,一類叫做 ResultStage,為 DAG 最下游的 Stage,由 Action 方法決定,另一類叫做ShuffleMapStage,為下游 Stage 準備數據,下面看一個簡單的例子 WordCount。
Job 由 saveAsTextFile 觸發,該 Job 由 RDD-3 和 saveAsTextFile 方法組成,根據RDD 之間的依賴關系從 RDD-3 開始回溯搜索,直到沒有依賴的 RDD-0,在回溯搜索過程中,RDD-3 依賴 RDD-2,并且是寬依賴,所以在 RDD-2 和 RDD-3 之間劃分Stage,RDD-3 被劃到最后一個 Stage,即 ResultStage 中,RDD-2 依賴 RDD-1,RDD-1依賴 RDD-0,這些依賴都是窄依賴,所以將 RDD-0、RDD-1 和 RDD-2 劃分到同一個 Stage,即 ShuffleMapStage 中,實際執行的時候,數據記錄會一氣呵成地執行RDD-0 到 RDD-2 的轉化。不難看出,其本質上是一個深度優先搜索算法。
一個 Stage 是否被提交,需要判斷它的父 Stage 是否執行,只有在父 Stage 執行完畢才能提交當前 Stage,如果一個 Stage 沒有父 Stage,那么從該 Stage 開始提交。Stage 提交時會將 Task 信息(分區信息以及方法等)序列化并被打包成 TaskSet 交給TaskScheduler,一個 Partition 對應一個 Task,另一方面 TaskScheduler 會監控 Stage的運行狀態,只有 Executor 丟失或者 Task 由于 Fetch 失敗才需要重新提交失敗的Stage 以調度運行失敗的任務,其他類型的 Task 失敗會在 TaskScheduler 的調度過程中重試。
相對來說 DAGScheduler 做的事情較為簡單,僅僅是在 Stage 層面上劃分 DAG,提交 Stage 并監控相關狀態信息。TaskScheduler 則相對較為復雜,下面詳細闡述其細節。
4.4 Spark Task 級調度
Spark Task 的調度是由 TaskScheduler 來完成,由前文可知,DAGScheduler 將Stage 打 包 到 TaskSet 交 給 TaskScheduler, TaskScheduler 會 將 TaskSet 封裝為TaskSetManager 加入到調度隊列中,TaskSetManager 結構如下圖所示。
TaskSetManager 負責監控管理同一個 Stage 中的 Tasks,TaskScheduler 就是以TaskSetManager 為單元來調度任務。前面也提到,TaskScheduler 初始化后會啟動 SchedulerBackend,它負責跟外界打交道,接收 Executor 的注冊信息,并維護 Executor 的狀態,所以說 SchedulerBackend是管“糧食”的,同時它在啟動后會定期地去“詢問”TaskScheduler 有沒有任務要運行,也就是說, 它會定期地 “ 問 ”TaskScheduler“ 我有這么余量,你 要不要啊 ” ,TaskScheduler 在 SchedulerBackend“問”它的時候,會從調度隊列中按照指定的調度策略選擇 TaskSetManager 去調度運行,大致方法調用流程如下圖所示:
圖 3-7 中,將 TaskSetManager 加入 rootPool 調度池中之后,調用 SchedulerBackend的 riviveOffers 方法給 driverEndpoint 發送 ReviveOffer 消息;driverEndpoint 收到ReviveOffer 消息后調用 makeOffers 方法,過濾出活躍狀態的 Executor(這些 Executor都是任務啟動時反向注冊到 Driver 的 Executor),然后將 Executor 封裝成 WorkerOffer對 象 ; 準 備 好 計 算 資 源 (WorkerOffer ) 后 , taskScheduler 基 于 這 些 資 源 調 用resourceOffer 在 Executor 上分配 task。
4.4.1 調度策略
前 面 講 到 , TaskScheduler 會 先 把 DAGScheduler 給 過 來 的 TaskSet 封裝成TaskSetManager 扔到任務隊列里,然后再從任務隊列里按照一定的規則把它們取出來在 SchedulerBackend 給過來的 Executor 上運行。這個調度過程實際上還是比較粗粒度的,是面向 TaskSetManager 的。
TaskScheduler 是以樹的方式來管理任務隊列,樹中的節點類型為 Schdulable,葉子節點為 TaskSetManager,非葉子節點為 Pool,下圖是它們之間的繼承關系。
TaskScheduler 支持兩種調度策略,一種是 FIFO,也是默認的調度策略,另一種是 FAIR。在 TaskScheduler 初始化過程中會實例化 rootPool,表示樹的根節點,是Pool 類型。
1. FIFO 調度策略
如果是采用 FIFO 調度策略,則直接簡單地將 TaskSetManager 按照先來先到的方式入隊,出隊時直接拿出最先進隊的 TaskSetManager,其樹結構如下圖所示,TaskSetManager 保存在一個 FIFO 隊列中。
2. FAIR 調度策略
FAIR 調度策略的樹結構如下圖所示:
FAIR 模式中有一個 rootPool 和多個子 Pool,各個子 Pool 中存儲著所有待分配的 TaskSetMagager。
在 FAIR 模 式 中 , 需 要 先 對 子 Pool 進 行 排 序 , 再 對 子 Pool 里 面 的TaskSetMagager 進行排序,因為 Pool 和 TaskSetMagager 都繼承了 Schedulable 特質,因此使用相同的排序算法。
排序過程的比較是基于 Fair-share 來比較的,每個要排序的對象包含三個屬性:runningTasks 值(正在運行的 Task 數)、minShare 值、weight 值,比較時會綜合考量 runningTasks 值,minShare 值以及 weight 值。
注意,minShare、weight 的值均在公平調度配置文件 fairscheduler.xml 中被指定,調度池在構建階段會讀取此文件的相關配置。
1) 如果 A 對象的 runningTasks 大于它的 minShare,B 對象的 runningTasks 小于它的 minShare,那么 B 排在 A 前面;(runningTasks 比 minShare 小的先執行)
2) 如果 A、B 對象的 runningTasks 都小于它們的 minShare,那么就比較runningTasks 與 minShare 的比值(minShare 使用率),誰小誰排前面;(minShare使用率低的先執行)
3) 如果 A、B 對象的 runningTasks 都大于它們的 minShare,那么就比較runningTasks 與 weight 的比值(權重使用率),誰小誰排前面。(權重使用率低的先執行)
4) 如果上述比較均相等,則比較名字。整體上來說就是通過 minShare 和 weight 這兩個參數控制比較過程,可以做到讓 minShare 使用率和權重使用率少(實際運行 task 比例較少)的先運行。
FAIR 模式排序完成后,所有的 TaskSetManager 被放入一個 ArrayBuffer 里,之后依次被取出并發送給 Executor 執行。
從調度隊列中拿到 TaskSetManager 后,由于 TaskSetManager 封裝了一個 Stage的所有 Task,并負責管理調度這些 Task,那么接下來的工作就是 TaskSetManager按照一定的規則一個個取出 Task 給 TaskScheduler , TaskScheduler 再交給SchedulerBackend 去發到 Executor 上執行。
4.4.2 本地化調度
DAGScheduler 切割 Job,劃分 Stage, 通過調用 submitStage 來提交一個 Stage對應的 tasks,submitStage 會調用 submitMissingTasks,submitMissingTasks 確定每個需要計算的 task 的 preferredLocations,通過調用 getPreferrdeLocations()得到partition 的優先位置,由于一個 partition 對應一個 task,此 partition 的優先位置就是 task 的優先位置,對于要提交到TaskScheduler 的 TaskSet 中的每一個 task,該 task優先位置與其對應的 partition 對應的優先位置一致。
從調度隊列中拿到 TaskSetManager 后,那么接下來的工作就是 TaskSetManager 按照一定的規則一個個取出 task 給 TaskScheduler,TaskScheduler 再交給 SchedulerBackend 去發到Executor 上執行。前面也提到,TaskSetManager 封裝了一個 Stage 的所有 task,并負責管理調度這些 task。
根據每個 task 的優先位置,確定 task 的 Locality 級別,Locality 一共有五種,優先級由高到低順序:
在調度執行時,Spark 調度總是會盡量讓每個 task 以最高的本地性級別來啟動,當一個 task 以 X 本地性級別啟動,但是該本地性級別對應的所有節點都沒有空閑資源而啟動失敗,此時并不會馬上降低本地性級別啟動而是在某個時間長度內再次以X 本地性級別來啟動該 task,若超過限時時間則降級啟動,去嘗試下一個本地性級別,依次類推。
可以通過調大每個類別的最大容忍延遲時間,在等待階段對應的 Executor 可能就會有相應的資源去執行此 task,這就在在一定程度上提到了運行性能。
4.4.3 失敗重試與黑名單機制
除了選擇合適的 Task 調度運行外,還需要監控 Task 的執行狀態,前面也提到,與外部打交道的是 SchedulerBackend,Task 被提交到 Executor 啟動執行后,Executor會將執行狀態上報給 SchedulerBackend,SchedulerBackend 則告訴 TaskScheduler,TaskScheduler 找到該 Task 對應的 TaskSetManager,并通知到該 TaskSetManager,這樣 TaskSetManager 就知道 Task 的失敗與成功狀態,對于失敗的 Task,會記錄它失敗的次數,如果失敗次數還沒有超過最大重試次數,那么就把它放回待調度的 Task池子中,否則整個 Application 失敗。
在記錄 Task 失敗次數過程中,會記錄它上一次失敗所在的 Executor Id 和 Host,這樣下次再調度這個 Task 時,會使用黑名單機制,避免它被調度到上一次失敗的節點上,起到一定的容錯作用。黑名單記錄 Task 上一次失敗所在的 Executor Id 和 Host,以及其對應的“拉黑”時間,“拉黑”時間是指這段時間內不要再往這個節點上調度這個 Task 了。
參考鏈接:https://www.cnblogs.com/LXL616/p/11165826.html
總結
以上是生活随笔為你收集整理的Spark 任务调度机制详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: VMware vCenter Conve
- 下一篇: 【收藏】mydockfinder下载地址