提高spark任务稳定性1 - Blacklist 机制
文章目錄
- 背景
- 介紹
- 相關參數
- 實現細節
- TaskSetBlacklist
- BlacklistTracker
- 黑名單判斷的時機
- 如果所有的節點都被加入了黑名單?
- 結語
背景
一個 spark 應用的產生過程: 獲取需求 -> 編寫spark代碼 -> 測試通過 -> 扔上平臺調度。
往往應用會正常運行一段時間,突然有一天運行失敗,或是失敗了一次才運行成功。
從開發者的角度看,我的代碼沒問題,測試也通過了,之前一段都運行好好的,怎么突然就失敗了呢?為什么我重新調度又能正常運行了,是不是你們平臺不穩定?是什么導致了上述問題?
分布式集群中,特別是高負載的情況下,就會引發很多意想不到的問題,例如:
為什么 task 失敗后還會被 schedular 重新調度在原來的 node 或是 executor上?
數據本地性(spark會優先把task調度在有相應數據的節點上)導致。
是否只能聽天由命,每次失敗后重新調度? 如果任務有SLA的限制怎么辦?
介紹
spark 2.1 中增加了 blacklist 機制,當前(2.3.0)還是試驗性質的功能,黑名單機制允許你設置 task 在 executor / node 上失敗次數的閾值, 從而避免了一路走到黑的情況出現。 ?
相關參數
| spark.blacklist.enabled | false | 是否開啟黑名單機制 |
| spark.blacklist.timeout | 1h | 對于被加入 application 黑名單的 executor/節點 ,多長時間后無條件的移出黑名單以運行新任務 |
| spark.blacklist.task.maxTaskAttemptsPerExecutor | 1 | 對于同一個 task 在某個 executor 中的失敗重試閾值。達到閾值后,在執行這個 task 時,該 executor 將被加入黑名單 |
| spark.blacklist.task.maxTaskAttemptsPerNode | 2 | 對于同一個 task 在某個節點上的失敗重試閾值。達到閾值后,在執行這個 task 時,該節點將被加入黑名單 |
| spark.blacklist.stage.maxFailedTasksPerExecutor | 2 | 一個 stage 中,不同的 task 在同一個 executor 的失敗閾值。達到閾值后,在執行這個 stage 時該 executor 將會被加入黑名單 |
| spark.blacklist.stage.maxFailedExecutorsPerNode | 2 | 一個 stage 中,不同的 executor 加入黑名單的閾值。達到閾值后,在執行這個 stage 時該節點將會被加入黑名單 |
| spark.blacklist.application.maxFailedTasksPerExecutor | 2 | 在同一個 executor 中,不同的 task的失敗閾值 。達到閾值后,在整個 appliction 運行期間,該 executor 都會被加入黑名單,加入時間超過 spark.blacklist.timeout 后,自動從黑名單中移除。值得注意的是,如果開啟了 dynamic allocation,這些 executor 可能會由于空閑時間過長被回收。 |
| spark.blacklist.application.maxFailedExecutorsPerNode | 2 | 在一個節點中,不同 executor 加入 application 黑名單的閾值。達到這個閾值后,該節點會進入 application 黑名單,加入時間超過 spark.blacklist.timeout 后,自動從黑名單中移除。值得注意的是,如果開啟了 dynamic allocation,該節點上的 executor 可能會由于空閑時間過長被回收。 |
| spark.blacklist.killBlacklistedExecutors | false | 如果開啟該配置,spark 會自動關閉并重啟加入黑名單的 executor,如果整個節點都加入了黑名單,則該節點上的所有 executor 都會被關閉。 |
| spark.blacklist.application.fetchFailure.enabled | false | 如果開啟該配置,當發生 fetch failure時,立即將該 executor 加入到黑名單。要是開啟了 external shuffle service,整個節點都會被加入黑名單。 |
實現細節
因為是實驗性質的功能,所以代碼可能會隨時變動。
只貼出部分核心代碼。
TaskSetBlacklist
黑名單賬本:
//k:executor v:該executor上每個 task 的失敗情況(task失敗的次數和最近一次失敗時間) val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()//k:節點,v:該節點上有失敗任務的 executor private val nodeToExecsWithFailures = new HashMap[String, HashSet[String]]() //k:節點, v:該節點上加入黑名單的 taskId private val nodeToBlacklistedTaskIndexes = new HashMap[String, HashSet[Int]]()//加入黑名單的 executor private val blacklistedExecs = new HashSet[String]() //加入黑名單的 node private val blacklistedNodes = new HashSet[String]() // 判斷 executor 是否加入了給定 task 的黑名單 def isExecutorBlacklistedForTask(executorId: String, index: Int): Boolean = {execToFailures.get(executorId).exists { execFailures =>execFailures.getNumTaskFailures(index) >= MAX_TASK_ATTEMPTS_PER_EXECUTOR} }//判斷 node 是否加入了給定 task 的黑名單 def isNodeBlacklistedForTask(node: String, index: Int): Boolean = {nodeToBlacklistedTaskIndexes.get(node).exists(_.contains(index)) }當有task失敗時,TaskSetManager 會調用更新黑名單的操作:
閾值參數:
- MAX_TASK_ATTEMPTS_PER_EXECUTOR:每個 executor 上最大的任務重試次數
- MAX_TASK_ATTEMPTS_PER_NODE:每個 node 上最大的任務重試次數
- MAX_FAILURES_PER_EXEC_STAGE:一個 stage 中,每個executor 上最多任務失敗次數
- MAX_FAILED_EXEC_PER_NODE_STAGE:一個 stage 中,每個節點上 executor 的最多失敗次數
BlacklistTracker
實現原理和TaskSetBlacklist,下文就不再貼出黑名單判斷,黑名單對象等代碼。
與 TaskSetBlacklist 不同的是,在一個 taskSet 完全成功之前,BlacklistTracker 無法獲取到任務失敗的情況。
核心代碼:
當一個 taskSet 執行成功時會調用以下代碼,流程如下:
- 將 executor 及其對應的到期時間加入到 application 的黑名單中,從executor失敗列表中移除該 executor,并更新 nextExpiryTime,用于下次啟動任務的時候判斷黑名單是否已到期
- 根據 spark.blacklist.killBlacklistedExecutors 判斷是否要殺死 executor
- 更新 node 上的 executor 失敗次數
- 如果一個節點上的 executor 的失敗次數大于等于閾值并且不在黑名單中
- 將 node 及其對應的到期時間加入到 application 的黑名單中
- 如果開啟了 spark.blacklist.killBlacklistedExecutors,則將此 node 上的所有 executor 殺死
- BLACKLIST_TIMEOUT_MILLIS:加入黑名單后的過期時間
- MAX_FAILURES_PER_EXEC:每個executor上最多的task失敗次數
- MAX_FAILED_EXEC_PER_NODE: 每個節點上加入黑名單的executor的最大數量
黑名單判斷的時機
一個 stage 提交的調用鏈:
TaskSchedulerImpl.submitTasks ->
CoarseGrainedSchedulerBackend.reviveOffers ->
CoarseGrainedSchedulerBackend.makeOffers ->
TaskSchedulerImpl.resourceOffers ->
TaskSchedulerImpl.resourceOfferSingleTaskSet ->
CoarseGrainedSchedulerBackend.launchTasks
appliaction 級別的黑名單在 TaskSchedulerImpl.resourceOffers 中完成判斷,stage/task 級別的黑名單在 TaskSchedulerImpl.resourceOfferSingleTaskSet 中完成判斷。
如果所有的節點都被加入了黑名單?
如果將task的重試次數設置的比較高,有可能會出現這個問題,這個時候。將會中斷這個 stage 的執行
TaskSchedulerImpl.resourceOffers
if (!launchedAnyTask) {taskSet.abortIfCompletelyBlacklisted(hostToExecutors) }結語
簡單的來說,對于一個 application ,提供了三種級別的黑名單可以用于 executor/node: task blacklist -> stage blacklist -> application blacklist
通過這些黑名單的設置可以避免由于 task 反復調度在有問題的 executor/node (壞盤,磁盤滿了,shuffle fetch 失敗,環境錯誤等)上,進而導致整個 Application 運行失敗的情況。
tip: BlacklistTracker.updateBlacklistForFetchFailure 在當前版本(2.3.0)存在BUG SPARK-24021,將在 2.3.1 進行修復。如果打開了 spark.blacklist.application.fetchFailure.enabled 配置將會受到影響。
總結
以上是生活随笔為你收集整理的提高spark任务稳定性1 - Blacklist 机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 120亿光年外发现大量水:为地球储量14
- 下一篇: 基带信号与载波信号