A Deep Dive into Spark 2.1 Core (Part 3): How the Task Scheduler Works, with Source Code Analysis
Submitting Tasks
The call stack is as follows:
- TaskSchedulerImpl.submitTasks
  - CoarseGrainedSchedulerBackend.reviveOffers
    - CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers
      - TaskSchedulerImpl.resourceOffers
        - TaskSchedulerImpl.resourceOfferSingleTaskSet
      - CoarseGrainedSchedulerBackend.DriverEndpoint.launchTasks
TaskSchedulerImpl.submitTasks
TaskSchedulerImpl implements TaskScheduler and overrides submitTasks:
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create a TaskSetManager
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // Put the manager into the scheduler's pool tree
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // Ask the backend to allocate resources
  backend.reviveOffers()
}
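The stage-level bookkeeping in submitTasks can be modeled on its own. The sketch below is not Spark code: `Manager` is a hypothetical stand-in for TaskSetManager that carries only a TaskSet id and an isZombie flag, and `SubmitSketch.register` mirrors how the new manager is stored under (stageId, stageAttemptId) and then rejected if another non-zombie TaskSet is already active for the same stage.

```scala
import scala.collection.mutable

// Hypothetical stand-in for TaskSetManager: only the fields the check needs.
final case class Manager(taskSetId: String, isZombie: Boolean)

object SubmitSketch {
  // stageId -> (stageAttemptId -> manager), mirroring taskSetsByStageIdAndAttempt
  val byStageAndAttempt = mutable.HashMap.empty[Int, mutable.HashMap[Int, Manager]]

  def register(stageId: Int, attemptId: Int, manager: Manager): Unit = {
    val stageTaskSets =
      byStageAndAttempt.getOrElseUpdate(stageId, mutable.HashMap.empty[Int, Manager])
    // Record the new attempt first, as submitTasks does.
    stageTaskSets(attemptId) = manager
    // Any other attempt of this stage that is not a zombie is a conflict.
    val conflicting = stageTaskSets.exists { case (_, m) =>
      m.taskSetId != manager.taskSetId && !m.isZombie
    }
    if (conflicting) {
      throw new IllegalStateException(
        s"more than one active taskSet for stage $stageId: " +
          stageTaskSets.values.map(_.taskSetId).mkString(","))
    }
  }
}
```

As in the original, a zombie attempt does not block a new attempt of the same stage, while two live attempts trigger the exception.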
CoarseGrainedSchedulerBackend.reviveOffers
Next, let's look at the last line of the code above:
backend.reviveOffers()
First, let's step back and see how TaskScheduler is started:
override def start() {
  backend.start()

  // With spark.speculation enabled (and not in local mode),
  // periodically check for tasks that could be run speculatively
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
We can see that TaskScheduler.start calls backend.start().
backend is a SchedulerBackend, a trait that is implemented by the CoarseGrainedSchedulerBackend class. Let's look at CoarseGrainedSchedulerBackend.start:
override def start() {
  val properties = new ArrayBuffer[(String, String)]
  // Collect every "spark.*" setting from the SparkConf
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }

  // Register the driver endpoint and keep a reference to it
  driverEndpoint = createDriverEndpointRef(properties)
}
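We can see that CoarseGrainedSchedulerBackend.start creates driverEndpoint, a reference (RpcEndpointRef) to an RPC endpoint. The endpoint itself is DriverEndpoint, an inner class of CoarseGrainedSchedulerBackend that implements ThreadSafeRpcEndpoint, which in turn extends the RpcEndpoint trait.
CoarseGrainedSchedulerBackend.reviveOffers simply sends a ReviveOffers message to this endpoint: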
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers
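There are two functions for sending messages to DriverEndpoint: send, which expects no reply from the receiver, and ask, which expects one.
Correspondingly, there are two functions for receiving messages. One is receive, which handles a message without replying to the sender.
The other is receiveAndReply, which replies to the sender: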
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

  case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
    if (executorDataMap.contains(executorId)) {
      executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
      context.reply(true)
    } else {
      val executorAddress = if (executorRef.address != null) {
          executorRef.address
        } else {
          context.senderAddress
        }
      logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
      addressToExecutorId(executorAddress) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val data = new ExecutorData(executorRef, executorRef.address, hostname,
        cores, cores, logUrls)
      CoarseGrainedSchedulerBackend.this.synchronized {
        executorDataMap.put(executorId, data)
        if (currentExecutorIdCounter < executorId.toInt) {
          currentExecutorIdCounter = executorId.toInt
        }
        if (numPendingExecutors > 0) {
          numPendingExecutors -= 1
          logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
        }
      }
      executorRef.send(RegisteredExecutor)
      context.reply(true)
      listenerBus.post(
        SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      makeOffers()
    }

  case StopDriver =>
    context.reply(true)
    stop()

  case StopExecutors =>
    logInfo("Asking each executor to shut down")
    for ((_, executorData) <- executorDataMap) {
      executorData.executorEndpoint.send(StopExecutor)
    }
    context.reply(true)

  case RemoveExecutor(executorId, reason) =>
    executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
    removeExecutor(executorId, reason)
    context.reply(true)

  case RetrieveSparkAppConfig =>
    val reply = SparkAppConfig(sparkProperties,
      SparkEnv.get.securityManager.getIOEncryptionKey())
    context.reply(reply)
}

private def makeOffers() {
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
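The send/ask and receive/receiveAndReply pairing can be illustrated with a toy endpoint. This is a hypothetical sketch, not Spark's RpcEndpoint API: messages are delivered by direct method calls instead of over RPC, `reply` is a plain callback standing in for RpcCallContext.reply, and the message types `Revive` and `RegisterExec` are invented for the example.

```scala
sealed trait Msg
case object Revive extends Msg
final case class RegisterExec(id: String, cores: Int) extends Msg

// Hypothetical toy endpoint, loosely mirroring the receive/receiveAndReply split.
class ToyEndpoint {
  var offersMade = 0
  var registered = Map.empty[String, Int]

  // One-way messages (what `send` delivers): the handler never replies.
  def receive: PartialFunction[Msg, Unit] = {
    case Revive => makeOffers()
  }

  // Request/reply messages (what `ask` delivers): the handler must answer.
  def receiveAndReply(reply: Any => Unit): PartialFunction[Msg, Unit] = {
    case RegisterExec(id, cores) =>
      if (registered.contains(id)) {
        reply(false) // simplified: reject a duplicate executor id
      } else {
        registered += id -> cores
        reply(true)
        makeOffers() // a new executor means new resources to offer
      }
  }

  private def makeOffers(): Unit = offersMade += 1

  // `send`: fire-and-forget; unknown messages are ignored.
  def send(m: Msg): Unit = receive.applyOrElse(m, (_: Msg) => ())

  // `ask`: capture the handler's reply and hand it back to the caller.
  def ask(m: Msg): Option[Any] = {
    var answer: Option[Any] = None
    receiveAndReply(a => answer = Some(a)).applyOrElse(m, (_: Msg) => ())
    answer
  }
}
```

Modeling both handlers as PartialFunctions matches the shape of the Spark code above, where each `case` handles one message type.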
We can see that the ReviveOffers message sent by CoarseGrainedSchedulerBackend.reviveOffers is received in receive, which then calls makeOffers:
case ReviveOffers =>
  makeOffers()
makeOffers does the following:
private def makeOffers() {
  // Filter out executors that have been killed
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // Build workOffers from activeExecutors,
  // i.e. the resources each executor can offer
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  // scheduler.resourceOffers allocates the resources,
  // and launchTasks sends out the tasks
  launchTasks(scheduler.resourceOffers(workOffers))
}
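Stripped of Spark types, makeOffers is a filter followed by a map. A minimal sketch, assuming hypothetical `ExecInfo` and `Offer` records in place of ExecutorData and WorkerOffer, and a plain set of dead executor ids in place of the executorIsAlive check:

```scala
// Hypothetical stand-ins for ExecutorData and WorkerOffer.
final case class ExecInfo(host: String, freeCores: Int)
final case class Offer(executorId: String, host: String, cores: Int)

object MakeOffersSketch {
  def makeOffers(executors: Map[String, ExecInfo], dead: Set[String]): IndexedSeq[Offer] =
    executors
      .filter { case (id, _) => !dead.contains(id) }          // drop killed executors
      .map { case (id, e) => Offer(id, e.host, e.freeCores) } // what each one can offer
      .toIndexedSeq
}
```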
At its core, launchTasks sends a LaunchTask message to the executor:
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
TaskSchedulerImpl.resourceOffers
Next, let's dig into scheduler.resourceOffers from the previous section, the function that allocates resources:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each live node and record its hostname,
  // and track whether any new executor has been added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Shuffle the offers randomly so tasks are not concentrated on a few machines
  val shuffledOffers = Random.shuffle(offers)
  // Build an array of TaskDescriptions for each worker
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  // Record the available CPUs of each worker
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // Get the TaskSetManagers sorted according to the scheduling policy
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    // If a new executor has been added,
    // the TaskSetManager's locality levels must be recomputed
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in scheduling order,
  // then offer resources to it in increasing order of locality level
  // Locality priority order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    // Allocate closest locality first
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
      do {
        // resourceOfferSingleTaskSet allocates resources to a single TaskSet;
        // it returns false when no more tasks can be placed
        // at this locality level
        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
          taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
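The two nested loops of resourceOffers (locality levels outside, repeated passes over the offers inside) can be sketched in a self-contained way. This is a simplified model under stated assumptions: only two hypothetical locality levels (`NodeLocal`, `AnyLevel`), each task has at most one preferred host, every task needs one CPU, and the delay-scheduling logic inside TaskSetManager.resourceOffer is omitted. The `while (keepGoing)` loop plays the role of the do/while over launchedTaskAtCurrentMaxLocality.

```scala
import scala.collection.mutable
import scala.util.Random

// Simplified locality levels (Spark also has PROCESS_LOCAL, NO_PREF and RACK_LOCAL).
sealed trait Level
case object NodeLocal extends Level
case object AnyLevel extends Level

// Hypothetical stand-ins for a pending task and a WorkerOffer.
final case class PendingTask(id: Int, preferredHost: Option[String])
final case class WOffer(execId: String, host: String, cores: Int)

object ResourceOffersSketch {
  val CpusPerTask = 1

  // May `task` run on `host` when locality up to `maxLevel` is accepted?
  private def allowed(task: PendingTask, host: String, maxLevel: Level): Boolean =
    maxLevel match {
      case NodeLocal => task.preferredHost.contains(host)
      case AnyLevel  => true
    }

  def resourceOffers(offers: Seq[WOffer], tasks: Seq[PendingTask],
                     levels: Seq[Level], seed: Long): Map[String, List[Int]] = {
    val pending = mutable.ArrayBuffer.empty[PendingTask]
    pending ++= tasks
    // Shuffle the offers so tasks are not always packed onto the same executors.
    val shuffled = new Random(seed).shuffle(offers).toIndexedSeq
    val availableCpus = shuffled.map(_.cores).toArray
    val assigned = shuffled.map(_ => mutable.ArrayBuffer.empty[Int])

    for (maxLevel <- levels) { // closest level first
      var keepGoing = true
      while (keepGoing) { // repeat passes until this level places nothing more
        keepGoing = false
        // One pass over all offers: at most one task per offer per pass.
        for (i <- shuffled.indices if availableCpus(i) >= CpusPerTask) {
          val idx = pending.indexWhere(t => allowed(t, shuffled(i).host, maxLevel))
          if (idx >= 0) {
            assigned(i) += pending.remove(idx).id
            availableCpus(i) -= CpusPerTask
            keepGoing = true
          }
        }
      }
    }
    shuffled.indices.map(i => shuffled(i).execId -> assigned(i).toList).toMap
  }
}
```

In the assertions, task 2 prefers hostA, but hostA's two cores are already taken at the NodeLocal level, so it is only placed on hostB once the loop relaxes to AnyLevel.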
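Two orderings are involved here. First, the scheduler sorts the TaskSets:
val sortedTaskSets = rootPool.getSortedTaskSetQueue
Then, for each TaskSet taken from that queue, resources are allocated to its tasks by Locality Level, from closest to farthest.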
TaskSchedulerImpl.resourceOfferSingleTaskSet
Next, let's look at how resources are allocated to a single TaskSet:
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  // Iterate over the executors
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // Get a task from taskSet that this execId/host may run
        // within the maximum locality distance maxLocality;
        // TaskSchedulerImpl.resourceOffers iterates maxLocality from closest to farthest
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          return launchedTask
      }
    }
  }
  return launchedTask
}
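The CPU accounting in resourceOfferSingleTaskSet is easiest to check when each task needs more than one core (spark.task.cpus greater than 1). The sketch below makes one pass over the offers, like the original; `SimpleTaskSet` is a hypothetical task set that hands out task ids in order and ignores locality, and `cpusPerTask` stands in for CPUS_PER_TASK.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a WorkerOffer (free cores are tracked in availableCpus).
final case class ExecOffer(execId: String, host: String)

// Hypothetical task set that hands out task ids in order and ignores locality.
class SimpleTaskSet(ids: Seq[Long]) {
  private val queue = mutable.Queue.empty[Long]
  queue ++= ids
  def resourceOffer(execId: String, host: String): Option[Long] =
    if (queue.isEmpty) None else Some(queue.dequeue())
}

object SingleTaskSetSketch {
  val taskIdToExecutorId = mutable.HashMap.empty[Long, String]
  val executorIdToRunningTaskIds = mutable.HashMap.empty[String, mutable.HashSet[Long]]

  /** One pass over the offers; returns true if at least one task was launched. */
  def offerOnce(ts: SimpleTaskSet, offers: IndexedSeq[ExecOffer],
                availableCpus: Array[Int], cpusPerTask: Int): Boolean = {
    var launchedTask = false
    for (i <- offers.indices) {
      val execId = offers(i).execId
      if (availableCpus(i) >= cpusPerTask) {
        // At most one task per offer per pass, as with taskSet.resourceOffer.
        for (tid <- ts.resourceOffer(execId, offers(i).host)) {
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds.getOrElseUpdate(execId, mutable.HashSet.empty[Long]) += tid
          availableCpus(i) -= cpusPerTask
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      }
    }
    launchedTask
  }
}
```

With cpusPerTask = 2, an executor with 5 free cores accepts two tasks (one per pass) and keeps 1 core idle, and an executor with 1 free core accepts none; the third pass returns false, which is what ends the do/while in resourceOffers.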
CoarseGrainedSchedulerBackend.DriverEndpoint.launchTasks
Back in CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers, let's look at the last step, launchTasks, the function that sends out the tasks:
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    // If the serialized task reaches the RPC message size limit,
    // abort instead of sending it
    if (serializedTask.limit >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      // Decrease the free cores recorded for this task's executor
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")

      // Send the LaunchTask message to executorEndpoint
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
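The size check in launchTasks amounts to partitioning the serialized tasks. A minimal sketch, assuming a hypothetical `SerializedTask` record that carries only its size in bytes; unlike Spark, which aborts the whole TaskSetManager, it just collects the ids of oversized tasks and charges `cpusPerTask` cores for each task that would be sent.

```scala
import scala.collection.mutable

// Hypothetical: a task after serialization, reduced to its id, target executor and size.
final case class SerializedTask(taskId: Long, executorId: String, bytes: Int)

object LaunchTasksSketch {
  /** Returns (ids that would be sent, ids that exceed the RPC limit). */
  def launchTasks(tasks: Seq[SerializedTask], maxRpcMessageSize: Int,
                  freeCores: mutable.Map[String, Int],
                  cpusPerTask: Int): (Seq[Long], Seq[Long]) = {
    // Same test as the original: a size at or above the limit is rejected.
    val (tooBig, ok) = tasks.partition(_.bytes >= maxRpcMessageSize)
    // Charge the executor's free cores for each task that would be sent.
    for (t <- ok) freeCores(t.executorId) -= cpusPerTask
    (ok.map(_.taskId), tooBig.map(_.taskId))
  }
}
```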
Once executorEndpoint receives the LaunchTask message (carrying SerializableBuffer(serializedTask)), the executor starts running the task.
Scheduling Tasks
Pool.getSortedTaskSetQueue
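In the previous section we saw that TaskSchedulerImpl.resourceOffers calls:
val sortedTaskSets = rootPool.getSortedTaskSetQueue
to obtain the TaskSetManagers sorted according to the scheduling policy. Let's now look at this line in detail.
rootPool is a Pool object. A Pool is a schedulable entity that represents a collection of Pools or a collection of TaskSets. Schedulable is a trait implemented by both the Pool class and the TaskSetManager class.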
getSortedTaskSetQueue:
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  // Create the TaskSetManager array
  var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  // Sort the schedulable entities
  val sortedSchedulableQueue =
    schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  for (schedulable <- sortedSchedulableQueue) {
    // Collect the TaskSetManagers from each schedulable entity
    // (a TaskSetManager returns itself; a Pool recurses into its children)
    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
  }
  sortedTaskSetQueue
}
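The recursion in getSortedTaskSetQueue (a Pool sorts its children and recurses, a TaskSetManager returns itself) can be shown with a tiny tree. Everything here is hypothetical: `SchedNode`, `TaskSetLeaf` and `PoolNode` model Schedulable, TaskSetManager and Pool, and `fifo` applies a FIFO-style rule (smaller job id, then smaller stage id) to task sets while ordering pools by name.

```scala
// Hypothetical minimal model of the Schedulable tree: a node is a pool or a task set.
sealed trait SchedNode { def name: String }
final case class TaskSetLeaf(name: String, jobId: Int, stageId: Int) extends SchedNode
final case class PoolNode(name: String,
                          comparator: (SchedNode, SchedNode) => Boolean,
                          children: Seq[SchedNode]) extends SchedNode

object SortedQueueSketch {
  // FIFO-style rule for task sets; pools are simply ordered by name.
  val fifo: (SchedNode, SchedNode) => Boolean = (x, y) => (x, y) match {
    case (a: TaskSetLeaf, b: TaskSetLeaf) =>
      Ordering[(Int, Int)].lt((a.jobId, a.stageId), (b.jobId, b.stageId))
    case _ => x.name < y.name
  }

  def sortedTaskSetQueue(node: SchedNode): Seq[TaskSetLeaf] = node match {
    case leaf: TaskSetLeaf => Seq(leaf) // a task set returns just itself
    case pool: PoolNode =>
      // Sort this pool's children with the pool's own comparator, then flatten.
      pool.children.sortWith(pool.comparator).flatMap(sortedTaskSetQueue)
  }
}
```

Because the root sorts its pools first, ts3 comes last in the assertion even though its job id is the smallest: ordering between task sets only applies within the same pool.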
The scheduling algorithm taskSetSchedulingAlgorithm is set to FairSchedulingAlgorithm or FIFOSchedulingAlgorithm according to the SchedulingMode when the Pool is created:
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
  schedulingMode match {
    case SchedulingMode.FAIR =>
      new FairSchedulingAlgorithm()
    case SchedulingMode.FIFO =>
      new FIFOSchedulingAlgorithm()
    case _ =>
      // Note: without an s prefix, $schedulingMode is not interpolated in this message
      val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
      throw new IllegalArgumentException(msg)
  }
}
TaskSchedulerImpl.initialize
So when is the Pool created? Looking at the initialization of TaskSchedulerImpl answers that:
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // Create a rootPool with an empty name
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      // During initialization, TaskSchedulerImpl creates
      // a different schedulableBuilder depending on the SchedulingMode
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}
FIFO Scheduling
FIFOSchedulableBuilder.addTaskSetManager
Now let's go back to the schedulableBuilder.addTaskSetManager call in TaskSchedulerImpl.submitTasks:
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
Let's take a closer look at addTaskSetManager.
schedulableBuilder is a SchedulableBuilder, a trait implemented by two classes: FIFOSchedulableBuilder and FairSchedulableBuilder.
We'll start with FIFOSchedulableBuilder. Its addTaskSetManager is:
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
  rootPool.addSchedulable(manager)
}
Next, addSchedulable:
override def addSchedulable(schedulable: Schedulable) {
  require(schedulable != null)
  schedulableQueue.add(schedulable)
  schedulableNameToSchedulable.put(schedulable.name, schedulable)
  schedulable.parent = this
}
In other words, the manager is added to schedulableQueue (a FIFO queue here), registered under its name in schedulableNameToSchedulable, a ConcurrentHashMap[String, Schedulable], and has its parent set to rootPool.
FIFOSchedulableBuilder.buildPools()
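The last line of TaskSchedulerImpl.initialize above:
schedulableBuilder.buildPools()
What buildPools does depends on the builder. For FIFOSchedulableBuilder it is empty:
override def buildPools() {
  // nothing
}
This is because rootPool contains no other Pools; as described above, each manager's parent is set directly to rootPool. In effect this is a two-level tree: level 0 is rootPool, and the leaves at level 1 are the individual managers.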
FIFOSchedulingAlgorithm
With everything in place, we can look at the core FIFO scheduling algorithm:
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    // priority is actually the Job ID
    val priority1 = s1.priority
    val priority2 = s2.priority
    // Compare Job IDs first
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      // If the Job IDs are equal,
      // compare Stage IDs
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}
FAIR Scheduling
FairSchedulableBuilder.addTaskSetManager
FairSchedulableBuilder's addTaskSetManager is more involved than FIFOSchedulableBuilder's:
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
  // Start with the default parentPool
  var poolName = DEFAULT_POOL_NAME
  var parentPool = rootPool.getSchedulableByName(poolName)
  // If properties are provided,
  // take poolName from them
  if (properties != null) {
    // FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
    poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
    parentPool = rootPool.getSchedulableByName(poolName)
    // If rootPool has no pool with this name
    if (parentPool == null) {
      // create a new pool from the user's setting in the app
      // (with default parameters), rather than from the xml file
      parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
        DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
      rootPool.addSchedulable(parentPool)
      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
        poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
    }
  }
  // Add this manager to the pool
  parentPool.addSchedulable(manager)
  logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
}
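The pool-selection logic of FairSchedulableBuilder.addTaskSetManager can be sketched with java.util.Properties. Assumptions: rootPool is modeled as a hypothetical map from pool name to task-set names, and the default pool is assumed to be named "default"; only the property key spark.scheduler.pool comes from the code above.

```scala
import java.util.Properties
import scala.collection.mutable

object FairAddSketch {
  val DefaultPoolName = "default"           // assumed name of the default pool
  val PoolProperty = "spark.scheduler.pool" // FAIR_SCHEDULER_PROPERTIES in the code above

  // Hypothetical rootPool: pool name -> names of the task sets added to it.
  val rootPool = mutable.LinkedHashMap[String, mutable.ArrayBuffer[String]](
    DefaultPoolName -> mutable.ArrayBuffer.empty[String])

  /** Adds a task set to a pool and returns the pool name that was used. */
  def addTaskSetManager(managerName: String, properties: Properties): String = {
    val poolName =
      if (properties == null) DefaultPoolName
      else properties.getProperty(PoolProperty, DefaultPoolName)
    // A pool name that is not configured yet gets a new, empty pool instead of an error.
    val pool = rootPool.getOrElseUpdate(poolName, mutable.ArrayBuffer.empty[String])
    pool += managerName
    poolName
  }
}
```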
FairSchedulableBuilder.buildPools()
FairSchedulableBuilder.buildPools builds the scheduling tree from the $SPARK_HOME/conf/fairscheduler.xml file (or the file named by spark.scheduler.allocation.file). The configuration file looks roughly like this:
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>
buildFairSchedulerPool:
private def buildFairSchedulerPool(is: InputStream) {
  // Load the xml file
  val xml = XML.load(is)
  // Iterate over every pool node
  for (poolNode <- (xml \\ POOLS_PROPERTY)) {

    val poolName = (poolNode \ POOL_NAME_PROPERTY).text
    var schedulingMode = DEFAULT_SCHEDULING_MODE
    var minShare = DEFAULT_MINIMUM_SHARE
    var weight = DEFAULT_WEIGHT

    val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
    if (xmlSchedulingMode != "") {
      try {
        schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
      } catch {
        case e: NoSuchElementException =>
          logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " +
            s"using the default schedulingMode: $schedulingMode")
      }
    }

    val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
    if (xmlMinShare != "") {
      minShare = xmlMinShare.toInt
    }

    val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
    if (xmlWeight != "") {
      weight = xmlWeight.toInt
    }

    // Build a new Pool from the xml settings
    val pool = new Pool(poolName, schedulingMode, minShare, weight)
    // Add this Pool to rootPool
    rootPool.addSchedulable(pool)
    logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
      poolName, schedulingMode, minShare, weight))
  }
}
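For a runnable illustration of how buildFairSchedulerPool reads each pool entry, the sketch below parses the same XML shape with the JDK's DOM parser (javax.xml.parsers) rather than scala.xml, which has been a separate module since Scala 2.11. The `PoolConf` record is hypothetical, the fallback values FIFO / 0 / 1 are assumed defaults, and the scheduling mode is not validated the way SchedulingMode.withName validates it.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

// Hypothetical record for one <pool> entry.
final case class PoolConf(name: String, mode: String, minShare: Int, weight: Int)

object FairXmlSketch {
  // Assumed defaults, used when a field is missing from a <pool> entry.
  val DefaultMode = "FIFO"
  val DefaultMinShare = 0
  val DefaultWeight = 1

  // Text of the first child element named `tag`, or "" if there is none.
  private def childText(e: Element, tag: String): String = {
    val nodes = e.getElementsByTagName(tag)
    if (nodes.getLength == 0) "" else nodes.item(0).getTextContent.trim
  }

  def parse(xml: String): Seq[PoolConf] = {
    val doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
      .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
    val pools = doc.getElementsByTagName("pool")
    (0 until pools.getLength).map { i =>
      val e = pools.item(i).asInstanceOf[Element]
      val mode = childText(e, "schedulingMode")
      val minShare = childText(e, "minShare")
      val weight = childText(e, "weight")
      PoolConf(
        e.getAttribute("name"),
        if (mode.isEmpty) DefaultMode else mode,
        if (minShare.isEmpty) DefaultMinShare else minShare.toInt,
        if (weight.isEmpty) DefaultWeight else weight.toInt)
    }
  }
}
```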
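As you might expect, FAIR scheduling is not simply "fair" scheduling. Pools are first created from the xml configuration file and added to rootPool, and each app then adds its TaskSetManagers to a pool according to the poolName set in "spark.scheduler.pool". In fact, rootPool also schedules the Pools themselves.
So the FAIR policy involves two levels of scheduling: the first level schedules the Pools inside rootPool, and the second level schedules the TaskSetManagers inside each Pool. In fairscheduler.xml, weight (the pool's weight) and minShare (its minimum share, compared against the number of running tasks) configure the first level, which uses the FAIR algorithm; the second level is set by schedulingMode.
For a single app in Standalone mode, though, multiple FAIR pools can seem of little use if the app only ever selects one Pool. We can, however, assign pools explicitly in code: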
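sc.setLocalProperty("spark.scheduler.pool", "Pool_1")
setLocalProperty applies to the calling thread, so jobs submitted from different threads of the same app can be placed in different pools.
FairSchedulingAlgorithm
Next, let's walk through the FAIR algorithm: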
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    // Whether s1 is running fewer tasks than its minShare
    val s1Needy = runningTasks1 < minShare1
    // Whether s2 is running fewer tasks than its minShare
    val s2Needy = runningTasks2 < minShare2
    // minShareRatio = running tasks / minShare
    // measures load: the smaller it is, the lighter the load
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    // taskToWeightRatio = running tasks / weight
    // a larger weight means higher priority,
    // i.e. the smaller taskToWeightRatio, the higher the priority
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

    var compare = 0
    // If s1 is below its minShare but s2 is not,
    // s1 goes first
    if (s1Needy && !s2Needy) {
      return true
    }
    // If s2 is below its minShare but s1 is not,
    // s2 goes first
    else if (!s1Needy && s2Needy) {
      return false
    }
    // If both are below their minShare,
    // compare minShareRatio: the smaller one goes first
    else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    }
    // If neither is below its minShare,
    // compare taskToWeightRatio: the smaller one goes first
    else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      // Tie: fall back to comparing names
      s1.name < s2.name
    }
  }
}
At this point, everything TaskScheduler does before sending tasks to the executors is complete.
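To see how the FAIR rules interact, here is a standalone restatement of the comparator on a hypothetical `Sched` record (not Spark's Schedulable), using java.lang.Double.compare in place of compareTo. The assertions use the production (minShare 2, weight 1) and test (minShare 3, weight 2) pools from the sample fairscheduler.xml.

```scala
// Hypothetical stand-in for a Schedulable, with only the fields the comparator reads.
final case class Sched(name: String, minShare: Int, weight: Int, runningTasks: Int)

object FairSketch {
  /** True if s1 should be scheduled before s2 under the FAIR rules described above. */
  def comparator(s1: Sched, s2: Sched): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1.0)
    val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
    val taskToWeightRatio1 = s1.runningTasks.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = s2.runningTasks.toDouble / s2.weight.toDouble

    if (s1Needy && !s2Needy) true        // only s1 is below its minShare
    else if (!s1Needy && s2Needy) false  // only s2 is below its minShare
    else {
      val compare =
        if (s1Needy && s2Needy) java.lang.Double.compare(minShareRatio1, minShareRatio2)
        else java.lang.Double.compare(taskToWeightRatio1, taskToWeightRatio2)
      if (compare != 0) compare < 0 else s1.name < s2.name // tie: compare names
    }
  }
}
```

With 1 vs 5 running tasks, production is below its minShare and test is not, so production wins outright; with 4 vs 6, neither is needy and test wins because 6/2 = 3 is smaller than 4/1 = 4; with 1 vs 2, both are needy and production wins because 1/2 = 0.5 is smaller than 2/3.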