Understanding Spark 2.1 Core in Depth (4): Principles and Source Code Analysis of Result Handling and Fault Tolerance
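In this post we look at how Spark handles the results it receives, and how it deals with failures, once an executor has finished running a task.
The call stack is as follows: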
- TaskSchedulerImpl.statusUpdate
  - TaskResultGetter.enqueueSuccessfulTask
    - TaskSchedulerImpl.handleSuccessfulTask
      - TaskSetManager.handleSuccessfulTask
        - DAGScheduler.taskEnded
          - DAGSchedulerEventProcessLoop.doOnReceive
            - DAGScheduler.handleTaskCompletion
  - TaskResultGetter.enqueueFailedTask
    - TaskSchedulerImpl.handleFailedTask
      - TaskSetManager.handleFailedTask
        - DAGScheduler.taskEnded
          - DAGSchedulerEventProcessLoop.doOnReceive
            - DAGScheduler.handleTaskCompletion
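The two branches of the call stack above come from a single decision on the task state. The sketch below illustrates only that routing decision. The `State` values are simplified, hypothetical stand-ins for Spark's `TaskState` (for example, the launching state is left out), and `route` returns a method name as a string instead of calling anything.

```scala
object StatusDispatchSketch {
  // Hypothetical, simplified stand-ins for Spark's TaskState values.
  sealed trait State
  case object Running extends State
  case object Finished extends State
  case object Failed extends State
  case object Killed extends State
  case object Lost extends State

  // In this sketch every state other than Running is terminal.
  def isFinished(s: State): Boolean = s != Running

  // Name of the TaskResultGetter method the update would be routed to,
  // or None when the task is still running and nothing is enqueued.
  def route(s: State): Option[String] = s match {
    case Finished               => Some("enqueueSuccessfulTask")
    case Failed | Killed | Lost => Some("enqueueFailedTask")
    case Running                => None
  }
}
```

Only a finished task takes the success branch; every other terminal state takes the failure branch.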
TaskSchedulerImpl.statusUpdate
TaskRunner sends the task's execution result to DriverEndpoint, which forwards it to statusUpdate in TaskSchedulerImpl:
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          // This case only applies to the Mesos scheduling mode
          if (state == TaskState.LOST) {
            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
              "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
            if (executorIdToRunningTaskIds.contains(execId)) {
              reason = Some(
                SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
              removeExecutor(execId, reason.get)
              failedExecutor = Some(execId)
            }
          }
          // FINISHED, FAILED, KILLED and LOST all count as isFinished
          if (TaskState.isFinished(state)) {
            cleanupTaskState(tid)
            taskSet.removeRunningTask(tid)
            // FINISHED goes to taskResultGetter.enqueueSuccessfulTask,
            // anything else goes to taskResultGetter.enqueueFailedTask
            if (state == TaskState.FINISHED) {
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
              "likely the result of receiving duplicate task finished status updates) or its " +
              "executor has been marked as failed.")
              .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  if (failedExecutor.isDefined) {
    assert(reason.isDefined)
    dagScheduler.executorLost(failedExecutor.get, reason.get)
    backend.reviveOffers()
  }
}

Handling successful results
Let's first look at how a successful result is handled:
TaskResultGetter.enqueueSuccessfulTask
def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    serializedData: ByteBuffer): Unit = {
  // Fetch the result on a thread pool
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          // The result arrived directly with the status update
          case directResult: DirectTaskResult[_] =>
            // Check that the size is within the limit
            if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
              return
            }
            directResult.value(taskResultSerializer.get())
            (directResult, serializedData.limit())
          // The result cannot be read directly and has to be fetched remotely
          case IndirectTaskResult(blockId, size) =>
            if (!taskSetManager.canFetchMoreResults(size)) {
              // Check that the size is within the limit;
              // if it is not, delete the remote result
              sparkEnv.blockManager.master.removeBlock(blockId)
              return
            }
            logDebug("Fetching indirect task result for TID %s".format(tid))
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            // Fetch the result from the remote block manager
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            if (!serializedTaskResult.isDefined) {
              // If the machine failed between the task finishing and our fetch,
              // or the block manager had to flush the result,
              // we can no longer get the result
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              return
            }
            // Deserialize
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get.toByteBuffer)
            deserializedResult.value(taskResultSerializer.get())
            sparkEnv.blockManager.master.removeBlock(blockId)
            (deserializedResult, size)
        }
        result.accumUpdates = result.accumUpdates.map { a =>
          if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
            val acc = a.asInstanceOf[LongAccumulator]
            assert(acc.sum == 0L, "task result size should not have been set on the executors")
            acc.setValue(size.toLong)
            acc
          } else {
            a
          }
        }
        // Handle the fetched result
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        case NonFatal(ex) =>
          logError("Exception while getting task result", ex)
          taskSetManager.abort("Exception while getting task result: %s".format(ex))
      }
    }
  })
}
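The direct and indirect branches above may be easier to follow in isolation. The following is a minimal sketch, not Spark's implementation: `BlockStore` is a hypothetical in-memory stand-in for the block manager, and `remainingBudget` is an assumed simplification of the check made by `canFetchMoreResults`, which in Spark tracks the total size of fetched results against a driver-side limit.

```scala
object ResultFetchSketch {
  sealed trait TaskResult
  final case class Direct(bytes: Array[Byte]) extends TaskResult
  final case class Indirect(blockId: String, size: Long) extends TaskResult

  sealed trait Outcome
  final case class Fetched(bytes: Array[Byte], size: Long) extends Outcome
  case object TooLarge extends Outcome   // the size check failed
  case object ResultLost extends Outcome // the block vanished before the fetch

  // Hypothetical stand-in for the block manager: blockId -> serialized result.
  final class BlockStore(initial: Map[String, Array[Byte]]) {
    private var blocks = initial
    def get(id: String): Option[Array[Byte]] = blocks.get(id)
    def remove(id: String): Unit = blocks = blocks - id
    def contains(id: String): Boolean = blocks.contains(id)
  }

  def resolve(result: TaskResult, store: BlockStore, remainingBudget: Long): Outcome =
    result match {
      case Direct(bytes) =>
        if (bytes.length > remainingBudget) TooLarge
        else Fetched(bytes, bytes.length.toLong)
      case Indirect(blockId, size) =>
        if (size > remainingBudget) {
          store.remove(blockId) // will never be fetched, so drop the remote copy
          TooLarge
        } else {
          store.get(blockId) match {
            case Some(bytes) =>
              store.remove(blockId) // the driver has the result now, free the block
              Fetched(bytes, size)
            case None => ResultLost
          }
        }
    }
}
```

Note that in both indirect outcomes the remote block is removed, mirroring the two `removeBlock` calls in the listing.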
TaskSchedulerImpl.handleSuccessfulTask
This simply calls taskSetManager.handleSuccessfulTask:
def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]): Unit = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}
TaskSetManager.handleSuccessfulTask
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  info.markFinished(TaskState.FINISHED)
  // Remove the task from the running tasks
  removeRunningTask(tid)
  // Notify dagScheduler that the task has finished
  sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
  // Kill all other running attempts of the same task
  for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
    logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
      s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
      s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
    sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
  }
  if (!successful(index)) {
    // Count the success
    tasksSuccessful += 1
    logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
      s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
      s" ($tasksSuccessful/$numTasks)")
    // Mark the task as successful; once every task has succeeded,
    // mark the TaskSetManager as a zombie so that it stops
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  maybeFinishTaskSet()
}
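The bookkeeping at the end of this method can be reduced to a few lines. The sketch below is an illustration under assumptions: `TaskSetState` and `recordSuccess` are hypothetical names, and only the `successful`, `tasksSuccessful` and `isZombie` fields are modelled. It shows why a second successful attempt for the same partition (for example a speculative copy) is ignored.

```scala
object SuccessBookkeepingSketch {
  // Hypothetical reduction of the state TaskSetManager keeps per task set.
  final class TaskSetState(numTasks: Int) {
    private val successful = Array.fill(numTasks)(false)
    private var tasksSuccessful = 0
    var isZombie = false

    // Returns true only for the first successful attempt of a partition.
    def recordSuccess(index: Int): Boolean =
      if (successful(index)) {
        false // a duplicate attempt already succeeded, ignore this one
      } else {
        successful(index) = true
        tasksSuccessful += 1
        // Once every partition has succeeded, no more tasks should be launched.
        if (tasksSuccessful == numTasks) isZombie = true
        true
      }
  }
}
```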
DAGScheduler.taskEnded
Let's look more closely at how dagScheduler is notified that the task has finished:
def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Seq[AccumulatorV2[_, _]],
    taskInfo: TaskInfo): Unit = {
  // Post a CompletionEvent
  eventProcessLoop.post(CompletionEvent(task, reason, result, accumUpdates, taskInfo))
}
DAGSchedulerEventProcessLoop.doOnReceive
As covered in the previous post, doOnReceive in DAGSchedulerEventProcessLoop listens for events:
case completion: CompletionEvent =>
  dagScheduler.handleTaskCompletion(completion)
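The post-then-dispatch pattern used by taskEnded and doOnReceive can be sketched with a blocking queue and a single consumer thread. This is a minimal sketch and not Spark's event loop: the `Event` types, the `Loop` class and the `Stop` sentinel used for shutdown are all hypothetical and exist only for this illustration.

```scala
import java.util.concurrent.LinkedBlockingDeque

object EventLoopSketch {
  sealed trait Event
  final case class Completion(taskId: Long, succeeded: Boolean) extends Event
  case object Stop extends Event // hypothetical shutdown sentinel

  final class Loop(handle: Event => Unit) {
    private val queue = new LinkedBlockingDeque[Event]()
    private val thread = new Thread("event-loop-sketch") {
      override def run(): Unit = {
        var running = true
        while (running) {
          // take() blocks until a producer has posted an event
          queue.take() match {
            case Stop => running = false
            case e    => handle(e)
          }
        }
      }
    }
    thread.setDaemon(true)

    def start(): Unit = thread.start()
    // Producers only enqueue; all handling happens on the loop thread.
    def post(e: Event): Unit = queue.put(e)
    def stopAndJoin(): Unit = { post(Stop); thread.join() }
  }
}
```

Because events are handled one at a time on a single thread, the handler does not need its own locking, which is the main appeal of this design.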
DAGScheduler.handleTaskCompletion
Let's look at part of the core code of DAGScheduler.handleTaskCompletion:
// ...
// Look up the stage by stageId
val stage = stageIdToStage(task.stageId)
// event here is the CompletionEvent
event.reason match {
  // Only the success path is shown here
  case Success =>
    // Remove this task's partition from the stage's pending partitions
    stage.pendingPartitions -= task.partitionId
    task match {
      // A task of the final stage
      case rt: ResultTask[_, _] =>
        // Cast the stage to ResultStage
        val resultStage = stage.asInstanceOf[ResultStage]
        resultStage.activeJob match {
          // Get the job of this stage
          case Some(job) =>
            if (!job.finished(rt.outputId)) {
              updateAccumulators(event)
              // Mark the partition as finished
              job.finished(rt.outputId) = true
              // Count it
              job.numFinished += 1
              // If all partitions of the job are finished,
              // remove the job
              if (job.numFinished == job.numPartitions) {
                markStageAsFinished(resultStage)
                cleanupStateForJobAndIndependentStages(job)
                listenerBus.post(
                  SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
              }
              // Notify the JobWaiter that a task succeeded.
              // taskSucceeded runs user-defined code,
              // so it may throw an exception
              try {
                job.listener.taskSucceeded(rt.outputId, event.result)
              } catch {
                case e: Exception =>
                  // Mark the job as failed
                  job.listener.jobFailed(new SparkDriverExecutionException(e))
              }
            }
          case None =>
            logInfo("Ignoring result from " + rt + " because its job has finished")
        }
      // A task of a stage that is not the final stage
      case smt: ShuffleMapTask =>
        val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
        updateAccumulators(event)
        val status = event.result.asInstanceOf[MapStatus]
        val execId = status.location.executorId
        logDebug("ShuffleMapTask finished on " + execId)
        if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
          logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
        } else {
          // Append the task's partitionId and status
          // to the stage's outputLocs
          shuffleStage.addOutputLoc(smt.partitionId, status)
        }
        if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
          markStageAsFinished(shuffleStage)
          logInfo("looking for newly runnable stages")
          logInfo("running: " + runningStages)
          logInfo("waiting: " + waitingStages)
          logInfo("failed: " + failedStages)
          // Register the outputLoc information with mapOutputTracker.
          // As mentioned in the previous post:
          // the result of a ShuffleMapTask (really metadata such as the location
          // and size of the output data) is sent to the driver's mapOutputTracker.
          // So DAGScheduler.newOrUsedShuffleStage first checks whether the stage
          // has already been computed.
          // If it has, newOrUsedShuffleStage copies the results into the newly created stage;
          // if it has not, newOrUsedShuffleStage registers the stage with mapOutputTracker
          // to reserve a slot for the metadata
          mapOutputTracker.registerMapOutputs(
            shuffleStage.shuffleDep.shuffleId,
            shuffleStage.outputLocInMapOutputTrackerFormat(),
            changeEpoch = true)
          clearCacheLocs()
          if (!shuffleStage.isAvailable) {
            // If the stage is not available (some tasks failed), resubmit the stage
            logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
              ") because some of its tasks had failed: " +
              shuffleStage.findMissingPartitions().mkString(", "))
            submitStage(shuffleStage)
          } else {
            // All partitions of the stage are finished
            if (shuffleStage.mapStageJobs.nonEmpty) {
              val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
              // Mark each map-stage job as finished
              for (job <- shuffleStage.mapStageJobs) {
                markMapStageJobAsFinished(job, stats)
              }
            }
            // Submit the child stages that were waiting on this stage
            submitWaitingChildStages(shuffleStage)
          }
        }
    }
// ...
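The decision made for a finished ShuffleMapStage, resubmit it or release its waiting children, depends only on whether every map partition has a registered output. The sketch below illustrates that check under simplifying assumptions: `MapStage` is a hypothetical class that keeps at most one executor ID per partition, whereas Spark keeps richer status objects, and `nextStep` returns a method name as a string instead of calling the scheduler.

```scala
object ShuffleStageSketch {
  final class MapStage(numPartitions: Int) {
    // One slot per map partition; None means no output location is registered.
    private val outputLocs = Array.fill[Option[String]](numPartitions)(None)

    def addOutputLoc(partition: Int, executorId: String): Unit =
      outputLocs(partition) = Some(executorId)

    // Forget every output that lived on a lost executor.
    def removeOutputsOnExecutor(executorId: String): Unit =
      for (i <- outputLocs.indices if outputLocs(i).contains(executorId)) outputLocs(i) = None

    def findMissingPartitions(): Seq[Int] =
      outputLocs.indices.filter(i => outputLocs(i).isEmpty)

    def isAvailable: Boolean = findMissingPartitions().isEmpty
  }

  // What the scheduler would do once no partitions are pending.
  def nextStep(stage: MapStage): String =
    if (stage.isAvailable) "submitWaitingChildStages" else "submitStage"
}
```

A stage whose outputs were partly lost to an executor failure is therefore resubmitted for the missing partitions only after its pending set has drained.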
Handling failed results
TaskResultGetter.enqueueFailedTask
Now let's go back and look at how a failed result is handled.
def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
    serializedData: ByteBuffer) {
  var reason : TaskFailedReason = UnknownReason
  try {
    // Handle the result on a thread pool
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        val loader = Utils.getContextOrSparkClassLoader
        try {
          // If the serialized data (the TaskFailedReason) exists and is non-empty,
          // deserialize it
          if (serializedData != null && serializedData.limit() > 0) {
            reason = serializer.get().deserialize[TaskFailedReason](
              serializedData, loader)
          }
        } catch {
          // On ClassNotFoundException,
          // log an error
          case cnd: ClassNotFoundException =>
            logError(
              "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
          // On any other exception,
          // do nothing
          case ex: Exception =>
        }
        // Handle the failed task
        scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
      }
    })
  } catch {
    case e: RejectedExecutionException if sparkEnv.isStopped =>
  }
}

TaskSchedulerImpl.handleFailedTask
def handleFailedTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskState: TaskState,
    reason: TaskFailedReason): Unit = synchronized {
  // Handle the failed task
  taskSetManager.handleFailedTask(tid, taskState, reason)
  if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
    // handleFailedTask puts the failed task back into the pending queue
    // to wait for the next scheduling round,
    // so a new round of scheduling is started here
    backend.reviveOffers()
  }
}

TaskSetManager.handleFailedTask
Let's look at the core code of handleFailedTask:
// ...
// Have dagScheduler handle the failed task
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
if (successful(index)) {
  logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
    "but another instance of the task has already succeeded, " +
    "so not re-queuing the task to be re-executed.")
} else {
  // Put the task back into the pending queue
  addPendingTask(index)
}
if (!isZombie && reason.countTowardsTaskFailures) {
  taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
    info.host, info.executorId, index))
  assert (null != failureReason)
  // Count the failures of this task
  numFailures(index) += 1
  // If the count reaches the maximum number of failures (4 by default),
  // abort the task set
  if (numFailures(index) >= maxTaskFailures) {
    logError("Task %d in stage %s failed %d times; aborting job".format(
      index, taskSet.id, maxTaskFailures))
    abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
      .format(index, taskSet.id, maxTaskFailures, failureReason), failureException)
    return
  }
}
maybeFinishTaskSet()
}
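The retry limit above comes down to a per-task failure counter. The following is a minimal sketch of that counter, assuming a default limit of 4 as stated in the listing's comment. `FailureTracker`, `Decision`, `Retry` and `Abort` are hypothetical names, and the `countsTowardsFailures` flag stands in for `reason.countTowardsTaskFailures`.

```scala
object FailureCountingSketch {
  sealed trait Decision
  case object Retry extends Decision
  final case class Abort(message: String) extends Decision

  // Hypothetical reduction of the per-task failure counting in TaskSetManager.
  final class FailureTracker(numTasks: Int, maxTaskFailures: Int = 4) {
    private val numFailures = Array.fill(numTasks)(0)

    def recordFailure(index: Int, countsTowardsFailures: Boolean = true): Decision =
      if (!countsTowardsFailures) {
        Retry // some failure reasons do not count against the limit
      } else {
        numFailures(index) += 1
        if (numFailures(index) >= maxTaskFailures) {
          Abort(s"Task $index failed $maxTaskFailures times; aborting job")
        } else {
          Retry
        }
      }
  }
}
```

With the default limit, the first three counted failures of a task lead to a retry and the fourth leads to an abort.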
DAGScheduler.handleTaskCompletion
As with a successful result, DAGScheduler.taskEnded is called next. doOnReceive in DAGSchedulerEventProcessLoop receives the CompletionEvent and calls dagScheduler.handleTaskCompletion(completion).
Let's look at the core of the part of DAGScheduler.handleTaskCompletion that handles failed tasks:
// The task was resubmitted
case Resubmitted =>
  logInfo("Resubmitted " + task + ", so marking it as still running")
  // Put the partition back into the stage's pending partitions
  stage.pendingPartitions += task.partitionId

// Fetching shuffle data failed
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
  val failedStage = stageIdToStage(task.stageId)
  val mapStage = shuffleIdToMapStage(shuffleId)
  // If the failed task's stage attempt ID is not the stage's latest attempt ID,
  // ignore the failure
  if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
    logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
      s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
      s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
  } else {
    // If the failed stage is still in the running set,
    // mark the stage as finished
    if (runningStages.contains(failedStage)) {
      logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
        s"due to a fetch failure from $mapStage (${mapStage.name})")
      markStageAsFinished(failedStage, Some(failureMessage))
    } else {
      logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
        s"longer running")
    }
    if (disallowStageRetryForTest) {
      // Retries are not allowed: abort the stage
      abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
        None)
    } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
      // The maximum number of retries has been reached: abort the stage
      abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
        s"has failed the maximum allowable number of " +
        s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
        s"Most recent failure reason: ${failureMessage}", None)
    } else {
      if (failedStages.isEmpty) {
        // Schedule the ResubmitFailedStages event only when failedStages is empty;
        // if it is not empty, the event has already been scheduled.
        // Task-level resubmission is done in TaskSetManager.handleFailedTask
        logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
          s"$failedStage (${failedStage.name}) due to fetch failure")
        messageScheduler.schedule(new Runnable {
          override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
        }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
      }
      failedStages += failedStage
      failedStages += mapStage
    }
    // Remove the data from outputLoc
    // and unregister it from mapOutputTracker
    if (mapId != -1) {
      mapStage.removeOutputLoc(mapId, bmAddress)
      mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    }
    // Mark the executor that served the failed fetch as lost
    if (bmAddress != null) {
      handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
    }
  }

// The commit was denied
case commitDenied: TaskCommitDenied =>
  // Do nothing;
  // let the TaskScheduler decide how to handle it

// Exception
case exceptionFailure: ExceptionFailure =>
  // Update the accumulators
  updateAccumulators(event)

// The task result was lost
case TaskResultLost =>
  // Do nothing;
  // let the TaskScheduler handle these failures and resubmit the task

// Executor lost, task killed, or unknown reason
case _: ExecutorLostFailure | TaskKilled | UnknownReason =>
  // Do nothing;
  // if the task keeps failing,
  // the TaskScheduler will abort the job
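In the FetchFailed branch of DAGScheduler.handleTaskCompletion, the delayed ResubmitFailedStages event is scheduled only when failedStages is empty, so several fetch failures arriving close together produce a single resubmission. The sketch below illustrates that batching with hypothetical names: stages are plain strings, `resubmitEventsScheduled` counts how often the delayed event would have been scheduled, and `resubmitFailedStages` stands in for the handler of that event.

```scala
import scala.collection.mutable

object FetchFailureSketch {
  final class Scheduler {
    // Stages waiting to be resubmitted, in insertion order.
    val failedStages = mutable.LinkedHashSet.empty[String]
    var resubmitEventsScheduled = 0

    def onFetchFailed(failedStage: String, mapStage: String): Unit = {
      // Only the first failure of a batch schedules the delayed event;
      // later failures just add their stages to the same batch.
      if (failedStages.isEmpty) resubmitEventsScheduled += 1
      failedStages += failedStage
      failedStages += mapStage
    }

    // Drains the batch and returns the stages that should be resubmitted.
    def resubmitFailedStages(): List[String] = {
      val stages = failedStages.toList
      failedStages.clear()
      stages
    }
  }
}
```

Both the stage that failed to fetch and the map stage that produced the missing output are queued, since the map output has to be regenerated before the reducer can run again.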