spark任务优先级设置:spark.yarn.priority
Spark對于Yarn priority的支持源碼詳解
Yarn的調度器
在Yarn中,提供了Capacity scheduler和Fair scheduler,它們都支持priority的。這里我們簡單介紹下概念,不做過多的描述。
Capacity Scheduler
Capacity scheuler設計的目的是為了讓Hadoop上的applications可以以一個多租戶的形式下分享資源運行,這種調度器一般應用在有一個較大的公有集群,按照隊列來分配資源給特定的用戶組。我們可以簡單的通過配置就可以設定隊列在cluster中資源或者用戶在隊列中的的使用限制(最低保障和最高上限等),當一個隊列的資源空余的時候,Yarn可以暫時利用剩余的資源分享給其他需要的隊列。
Fair Scheduler
Fair scheduler就如同它的名字一樣,他在分配資源的時候,是秉承著公平原則,applications在一段時間內分配到的平均資源會趨于相等。如果一個只有一個application在集群上運行的時候,資源都可供這一個application使用。如果有另外的application被提交到集群上時,空閑的資源就會被分配給新提交的application上,這樣最后每個運行的application都會分配到相等的資源。
Priority在Yarn中的使用
Capacity Scheduler
Capacity scheduler支持對應用的priority的設置。Yarn的priority是整數型,更大的數就代表更高的優先級,這個功能只支持在FIFO(默認)的策略下進行。priority可以針對cluster或者queue級別進行設置。
cluster level: 如果你的application設置的priority超過了cluster最大值,那按照最大的cluster priority對待。
queue level: 隊列有一個默認的priority值,queue下的applications如果沒有設置具體的priority會被設置成該默認值。如果application變更了queue,它的priority值不會更改。
Fair Scheduler
Fair scheduler支持把一個正在運行的application遷移到另一個priority不同的queue里,這樣這個application獲取資源的權重就會跟著queue變化。被遷移得application的資源就會算在新的queue上,如果所需資源超過了新的queue的最大限制,遷移就會失敗。
SparkOnYarn支持priority
如何為Spark app設置priority
只需要再SparkConf里進行設置即可,遵循Yarn對于priority的定義,數值越大,priority越高,在同一時間提交的job會有更高的優先級獲取資源:
val sparkConf = new SparkConf().set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup").set(MAX_APP_ATTEMPTS, 42).set("spark.app.name", "foo-test-app").set(QUEUE_NAME, "staging-queue").set(APPLICATION_PRIORITY, 1)Spark源碼
Spark目前已經有了對于Yarn的priority官方支持,這里給出一個在Jira上closed的SPARK-10879。這個Jira是很早以前的一個版本,diff僅供參考,用于讓大家理解Spark on Yarn如何設置priority的基本流程。
其實需要支持priority很簡單,一是需要在submit的時候提供priority參數的設置,官方是放在了SparkConf里去設置;另一個是需要在createApplicationSubmissionContext的時候,調用setPriority將priority傳入到Yarn。這里給出關鍵的地方的代碼:
/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
private[spark] val APPLICATION_PRIORITY = ConfigBuilder("spark.yarn.priority").doc("Application priority for YARN to define pending applications ordering policy, those" +" with higher value have a better opportunity to be activated. Currently, YARN only" +" supports application priority when using FIFO ordering policy.").intConf.createOptional/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala中對createApplicationSubmissionContext函數的修改:
/*** Set up the context for submitting our ApplicationMaster.* This uses the YarnClientApplication not available in the Yarn alpha API.*/def createApplicationSubmissionContext(newApp: YarnClientApplication,containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {val componentName = if (isClusterMode) {config.YARN_DRIVER_RESOURCE_TYPES_PREFIX} else {config.YARN_AM_RESOURCE_TYPES_PREFIX}val yarnAMResources = getYarnResourcesAndAmounts(sparkConf, componentName)val amResources = yarnAMResources ++getYarnResourcesFromSparkResources(SPARK_DRIVER_PREFIX, sparkConf)logDebug(s"AM resources: $amResources")val appContext = newApp.getApplicationSubmissionContextappContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))appContext.setQueue(sparkConf.get(QUEUE_NAME))appContext.setAMContainerSpec(containerContext)appContext.setApplicationType("SPARK")sparkConf.get(APPLICATION_TAGS).foreach { tags =>appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))}sparkConf.get(MAX_APP_ATTEMPTS) match {case Some(v) => appContext.setMaxAppAttempts(v)case None => logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. " +"Cluster's default value will be used.")}sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>appContext.setAttemptFailuresValidityInterval(interval)}val capability = Records.newRecord(classOf[Resource])capability.setMemory(amMemory + amMemoryOverhead)capability.setVirtualCores(amCores)if (amResources.nonEmpty) {ResourceRequestHelper.setResourceRequests(amResources, capability)}logDebug(s"Created resource capability for AM request: $capability")sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {case Some(expr) =>val amRequest = Records.newRecord(classOf[ResourceRequest])amRequest.setResourceName(ResourceRequest.ANY)amRequest.setPriority(Priority.newInstance(0))amRequest.setCapability(capability)amRequest.setNumContainers(1)amRequest.setNodeLabelExpression(expr)appContext.setAMContainerResourceRequest(amRequest)case None =>appContext.setResource(capability)}sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>try {val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])logAggregationContext.setRolledLogsIncludePattern(includePattern)sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>logAggregationContext.setRolledLogsExcludePattern(excludePattern)}appContext.setLogAggregationContext(logAggregationContext)} catch {case NonFatal(e) =>logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +"does not support it", e)}}appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)sparkConf.get(APPLICATION_PRIORITY).foreach { appPriority =>appContext.setPriority(Priority.newInstance(appPriority))}appContext}總結
以上是生活随笔為你收集整理的spark任务优先级设置:spark.yarn.priority的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark属性配置的优先级
- 下一篇: 【收藏】搭载nfs是客户端新建操作出现r