Spark Configuration Loading: A Source Code Walkthrough
We know that some settings can be configured in more than one place. Taking executor memory as an example, there are three ways to set it:
1. the --executor-memory option of spark-submit
2. the spark.executor.memory entry in spark-defaults.conf
3. the SPARK_EXECUTOR_MEMORY variable in spark-env.sh
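For illustration, the same setting might look like this in each of the three places (the 4g value is just an example, not a recommendation):

```
# 1. spark-submit command-line option
spark-submit --executor-memory 4g ...

# 2. spark-defaults.conf
spark.executor.memory    4g

# 3. spark-env.sh
export SPARK_EXECUTOR_MEMORY=4g
```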
Allowing the same setting to be configured in several places is obviously confusing, and I don't know why Spark still keeps this behavior. If I set a different executor memory value in each of these three places, which one actually takes effect in the application?
The class that resolves this is SparkSubmitArguments. Its constructor reads the configuration from the spark-submit --options, spark-defaults.conf, and spark-env.sh, and decides, according to a fixed policy, which value to use. The following walks through this important constructor in a few steps.
Step 0: read the spark-env.sh settings into environment variables
The parameter list of SparkSubmitArguments contains env: Map[String, String] = sys.env. This parameter holds the system environment variables together with the values read from spark-env.sh; the figure is a partial screenshot of the env value from one of my demos.
This step is called Step 0 because the value of env is already fixed before the SparkSubmitArguments object is constructed; that is, spark-env.sh is read and its settings stored in env before SparkSubmitArguments is built.
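A minimal sketch of what this means in practice (this is not the actual Spark code): spark-env.sh is sourced by the launcher scripts before the JVM starts, so anything it exports is visible as an ordinary environment variable by the time sys.env is captured.

```scala
// Sketch only: the default argument env: Map[String, String] = sys.env simply
// captures the process environment of the spark-submit JVM.
val env: Map[String, String] = sys.env

// If spark-env.sh contains `export SPARK_EXECUTOR_MEMORY=4g`, this prints Some(4g);
// otherwise it prints None.
println(env.get("SPARK_EXECUTOR_MEMORY"))
```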
Step 1: declare the configuration members and initialize them to null
This step is straightforward: it declares every setting that will be read from the spark-submit --options, spark-defaults.conf, and spark-env.sh, and initializes it to null. The following code shows part of them:
```scala
var master: String = null
var deployMode: String = null
var executorMemory: String = null
var executorCores: String = null
var totalExecutorCores: String = null
var propertiesFile: String = null
var driverMemory: String = null
var driverExtraClassPath: String = null
var driverExtraLibraryPath: String = null
var driverExtraJavaOptions: String = null
var queue: String = null
var numExecutors: String = null
var files: String = null
var archives: String = null
var mainClass: String = null
var primaryResource: String = null
var name: String = null
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var jars: String = null
var packages: String = null
var repositories: String = null
var ivyRepoPath: String = null
var packagesExclusions: String = null
var verbose: Boolean = false
...
```
Step 2: call the parent class's parse method to parse the spark-submit --options
```scala
try {
parse(args.toList)
} catch {
case e: IllegalArgumentException => SparkSubmit.printErrorAndExit(e.getMessage())
}
```
This calls the parent class method SparkSubmitOptionParser#parse(List<String> args). parse scans args for each --option and its value and resolves them into a name and a value; for example, --master yarn-client is parsed into the name --master and the value yarn-client. It then calls SparkSubmitArguments#handle(MASTER, "yarn-client") to process the pair.
Let's see what the handle function does:
```scala
/** Fill in values by parsing user options. */
override protected def handle(opt: String, value: String): Boolean = {
opt match {
case NAME =>
name = value
case MASTER =>
master = value
case CLASS =>
mainClass = value
case DEPLOY_MODE =>
if (value != "client" && value != "cluster") {
SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value
case NUM_EXECUTORS =>
numExecutors = value
case TOTAL_EXECUTOR_CORES =>
totalExecutorCores = value
case EXECUTOR_CORES =>
executorCores = value
case EXECUTOR_MEMORY =>
executorMemory = value
case DRIVER_MEMORY =>
driverMemory = value
case DRIVER_CORES =>
driverCores = value
case DRIVER_CLASS_PATH =>
driverExtraClassPath = value
...
case _ =>
throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
}
true
}
```
This function is also simple: based on opt and value, it assigns the corresponding member. Continuing the example above, after parse calls handle("--master", "yarn-client"), the master member is set to yarn-client inside handle.
Note that the MASTER used in case MASTER is defined as --master in SparkSubmitOptionParser; MASTER and the other option-name constants are defined as follows:
```java
protected final String MASTER = "--master";
protected final String CLASS = "--class";
protected final String CONF = "--conf";
protected final String DEPLOY_MODE = "--deploy-mode";
protected final String DRIVER_CLASS_PATH = "--driver-class-path";
protected final String DRIVER_CORES = "--driver-cores";
protected final String DRIVER_JAVA_OPTIONS = "--driver-java-options";
protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";
protected final String DRIVER_MEMORY = "--driver-memory";
protected final String EXECUTOR_MEMORY = "--executor-memory";
protected final String FILES = "--files";
protected final String JARS = "--jars";
protected final String KILL_SUBMISSION = "--kill";
protected final String NAME = "--name";
protected final String PACKAGES = "--packages";
protected final String PACKAGES_EXCLUDE = "--exclude-packages";
protected final String PROPERTIES_FILE = "--properties-file";
protected final String PROXY_USER = "--proxy-user";
protected final String PY_FILES = "--py-files";
protected final String REPOSITORIES = "--repositories";
protected final String STATUS = "--status";
protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
...
```
In summary, the parse function parses the --options passed to spark-submit and, for each parsed name and value, assigns the corresponding SparkSubmitArguments member (master, deployMode, executorMemory, and so on).
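To make the mechanism concrete, here is a deliberately simplified, self-contained sketch of what parse and handle do together. It is not the real SparkSubmitOptionParser, which also handles --opt=value syntax, switches without values, and positional arguments:

```scala
object MiniParser {
  var master: String = _
  var executorMemory: String = _

  // Simplified handle: map a recognized option name onto the corresponding member.
  def handle(opt: String, value: String): Unit = opt match {
    case "--master"          => master = value
    case "--executor-memory" => executorMemory = value
    case _                   => throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
  }

  // Simplified parse: walk the argument list as (name, value) pairs.
  def parse(args: List[String]): Unit = args match {
    case opt :: value :: rest if opt.startsWith("--") =>
      handle(opt, value)
      parse(rest)
    case Nil   => // done
    case other => throw new IllegalArgumentException(s"Cannot parse: $other")
  }

  def main(args: Array[String]): Unit = {
    parse(List("--master", "yarn-client", "--executor-memory", "4g"))
    println(s"master=$master, executorMemory=$executorMemory")
    // prints: master=yarn-client, executorMemory=4g
  }
}
```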
Step 3: mergeDefaultSparkProperties loads the settings in spark-defaults.conf
Step 3 reads the settings in spark-defaults.conf and stores them in sparkProperties, which comes into play in the next step.
```scala
// Holds the settings read from spark-defaults.conf
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

// Locate the default properties file: $SPARK_CONF_DIR/spark-defaults.conf if SPARK_CONF_DIR
// is set (for example in spark-env.sh), otherwise $SPARK_HOME/conf/spark-defaults.conf
def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
env.get("SPARK_CONF_DIR")
.orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
.map { t => new File(s"$t${File.separator}spark-defaults.conf")}
.filter(_.isFile)
.map(_.getAbsolutePath)
.orNull
}
// Read spark-defaults.conf and merge its settings into sparkProperties
private def mergeDefaultSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))
// Honor --conf before the defaults file
defaultSparkProperties.foreach { case (k, v) =>
if (!sparkProperties.contains(k)) {
sparkProperties(k) = v
}
}
}
```
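The defaultSparkProperties value referenced above (not shown in the excerpt) is essentially the contents of the properties file filtered down to spark.* keys. As a rough, self-contained sketch of that idea, using plain java.util.Properties rather than Spark's internal Utils helpers:

```scala
import java.io.{File, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

// Sketch: read a Java-properties-style file (like spark-defaults.conf) into a map,
// keeping only spark.* keys and never overwriting entries that are already present,
// which mirrors the "--conf wins over the defaults file" behavior above.
def loadDefaults(path: String, sparkProperties: HashMap[String, String]): Unit = {
  val file = new File(path)
  if (file.isFile) {
    val props = new Properties()
    val in = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
    try props.load(in) finally in.close()
    props.asScala.foreach { case (k, v) =>
      if (k.startsWith("spark.") && !sparkProperties.contains(k)) {
        sparkProperties(k) = v.trim
      }
    }
  }
}
```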
Step 4: loadEnvironmentArguments determines the final value of each configuration member
Let's look at the code first (part of it is omitted for length):
```scala
private def loadEnvironmentArguments(): Unit = {
master = Option(master)
.orElse(sparkProperties.get("spark.master"))
.orElse(env.get("MASTER"))
.orNull
driverExtraClassPath = Option(driverExtraClassPath)
.orElse(sparkProperties.get("spark.driver.extraClassPath"))
.orNull
driverExtraJavaOptions = Option(driverExtraJavaOptions)
.orElse(sparkProperties.get("spark.driver.extraJavaOptions"))
.orNull
driverExtraLibraryPath = Option(driverExtraLibraryPath)
.orElse(sparkProperties.get("spark.driver.extraLibraryPath"))
.orNull
driverMemory = Option(driverMemory)
.orElse(sparkProperties.get("spark.driver.memory"))
.orElse(env.get("SPARK_DRIVER_MEMORY"))
.orNull
...
keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {
val uri = new URI(primaryResource)
val uriScheme = uri.getScheme()
uriScheme match {
case "file" =>
try {
val jar = new JarFile(uri.getPath)
// Note that this might still return null if no main-class is set; we catch that later
mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
} catch {
case e: Exception =>
SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
}
case _ =>
SparkSubmit.printErrorAndExit(
s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
"Please specify a class through --class.")
}
}
// Global defaults. These should be keep to minimum to avoid confusing behavior.
master = Option(master).getOrElse("local[*]")
// In YARN mode, app name can be set via SPARK_YARN_APP_NAME (see SPARK-5222)
if (master.startsWith("yarn")) {
name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
}
// Set name from main class if not given
name = Option(name).orElse(Option(mainClass)).orNull
if (name == null && primaryResource != null) {
name = Utils.stripDirectory(primaryResource)
}
// Action should be SUBMIT unless otherwise specified
action = Option(action).getOrElse(SUBMIT)
}
```
Let's single out the code that determines the value of master; the relevant lines are:
```scala
master = Option(master)
.orElse(sparkProperties.get("spark.master"))
.orElse(env.get("MASTER"))
.orNull
// Global defaults. These should be keep to minimum to avoid confusing behavior.
master = Option(master).getOrElse("local[*]")
```
The value of master is determined as follows:
1. Option(master): if master is not null, use it and stop; otherwise go to 2. From the earlier analysis, a non-null master at this point came from parsing the spark-submit --master option.
2. .orElse(sparkProperties.get("spark.master")): if sparkProperties.get("spark.master") returns a defined value, use it; otherwise go to 3. From Step 3 we know that every value in sparkProperties was read from spark-defaults.conf.
3. .orElse(env.get("MASTER")): if env.get("MASTER") returns a defined value, use it; otherwise go to 4. The values in env come from spark-env.sh.
4. If master was set in none of the three places, the default local[*] is used.
The values of the other configuration members are determined the same way as master. A minor difference is that not every setting can be specified in all of spark-defaults.conf, spark-env.sh, and the spark-submit options, but the precedence is always the same.
From this we can conclude that when a Spark setting is configured in more than one place, the precedence is: spark-submit --option > spark-defaults.conf > spark-env.sh > default value.
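The whole rule boils down to the Option#orElse chain seen above. A self-contained sketch that reproduces it for a single setting (the names and memory values below are purely illustrative):

```scala
// Resolve one setting from the four sources in priority order.
def resolve(
    fromSubmitOption: Option[String],  // e.g. spark-submit --executor-memory
    fromDefaultsConf: Option[String],  // e.g. spark.executor.memory in spark-defaults.conf
    fromEnv: Option[String],           // e.g. SPARK_EXECUTOR_MEMORY from spark-env.sh
    default: String): String = {
  fromSubmitOption
    .orElse(fromDefaultsConf)
    .orElse(fromEnv)
    .getOrElse(default)
}

// The command-line option wins even though the other two are also set.
println(resolve(Some("8g"), Some("4g"), Some("2g"), "1g")) // 8g
// Without the command-line option, spark-defaults.conf wins.
println(resolve(None, Some("4g"), Some("2g"), "1g"))       // 4g
// Only spark-env.sh is set: that value is used.
println(resolve(None, None, Some("2g"), "1g"))             // 2g
// Nothing is set anywhere: fall back to the default.
println(resolve(None, None, None, "1g"))                   // 1g
```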
Finally, a flow chart of the overall process.
文章來源:https://github.com/keepsimplefocus/spark-sourcecodes-analysis/blob/master/markdowns/Spark%E8%AF%BB%E5%8F%96%E9%85%8D%E7%BD%AE.md
總結(jié)
以上是生活随笔為你收集整理的Spark读取配置源码剖析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark加载外部配置文件
- 下一篇: Hive JOIN使用详解