Spark Source Code Analysis: Worker
There are several ways to start a Worker in Spark, but they all ultimately invoke the org.apache.spark.deploy.worker.Worker class. When starting a Worker node you can pass many parameters: memory, cores, work directory, and so on. If you don't know how to pass them, don't worry, just ask for help:
```
[wyp@iteblog spark]$ ./bin/spark-class org.apache.spark.deploy.worker.Worker -h
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Usage: Worker [options] <master>

Master must be a URL of the form spark://hostname:port

Options:
  -c CORES, --cores CORES  Number of cores to use
  -m MEM, --memory MEM     Amount of memory to use (e.g. 1000M, 2G)
  -d DIR, --work-dir DIR   Directory to run apps in (default: SPARK_HOME/work)
  -i HOST, --ip IP         Hostname to listen on (deprecated, please use --host or -h)
  -h HOST, --host HOST     Hostname to listen on
  -p PORT, --port PORT     Port to listen on (default: random)
  --webui-port PORT        Port for web UI (default: 8081)
```
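Putting the options together, a launch command might look like the following. This is an illustrative fragment only: the master URL, host, and resource sizes are made up, and the command assumes it is run from a Spark installation directory.

```shell
# Hypothetical invocation; adjust cores/memory/paths to your machine.
./bin/spark-class org.apache.spark.deploy.worker.Worker \
  --cores 4 \
  --memory 8G \
  --work-dir /data/spark-work \
  --webui-port 8081 \
  spark://master-host:7077
```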
From the output above we can see that starting a Worker supports as many as seven parameters. Typing each of them by hand every time would be tedious, but we don't have to: when a Worker node starts, it first reads the configuration in conf/spark-env.sh, and all of these parameters are parsed by the WorkerArguments class inside Worker. If you don't set the memory, the Worker is given all of the memory on the machine it starts on, minus 1 GB reserved for the operating system. The implementation is as follows:
```scala
def inferDefaultMemory(): Int = {
  val ibmVendor = System.getProperty("java.vendor").contains("IBM")
  var totalMb = 0
  try {
    val bean = ManagementFactory.getOperatingSystemMXBean()
    if (ibmVendor) {
      val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
      val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
      totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    } else {
      val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
      val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
      totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    }
  } catch {
    case e: Exception => {
      totalMb = 2 * 1024
      System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
    }
  }
  // Leave out 1 GB for the operating system, but don't return a negative memory size
  math.max(totalMb - 1024, 512)
}
```
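The same idea can be sketched in plain Java. The Scala original uses reflection so it can support both IBM and HotSpot JDKs; this minimal sketch assumes a HotSpot-style JVM where the bean can be cast directly to com.sun.management.OperatingSystemMXBean. The class and method names here are illustrative, not from the Spark source.

```java
import java.lang.management.ManagementFactory;

public class InferMemory {
    // Return a default worker memory in MB: total physical memory
    // minus 1 GB for the OS, but never less than 512 MB.
    public static int inferDefaultMemoryMb() {
        long totalMb;
        try {
            // Direct cast, assuming a HotSpot-compatible JVM (no IBM branch here).
            com.sun.management.OperatingSystemMXBean bean =
                (com.sun.management.OperatingSystemMXBean)
                    ManagementFactory.getOperatingSystemMXBean();
            totalMb = bean.getTotalPhysicalMemorySize() / 1024 / 1024;
        } catch (Exception e) {
            // Same fallback as the Spark code: assume 2 GB if detection fails.
            totalMb = 2 * 1024;
            System.out.println("Failed to get total physical memory. Using " + totalMb + " MB");
        }
        return (int) Math.max(totalMb - 1024, 512);
    }

    public static void main(String[] args) {
        System.out.println(inferDefaultMemoryMb() + " MB");
    }
}
```

Note the floor of 512 MB: even on a machine with 1 GB or less of RAM, the Worker still claims at least 512 MB rather than a zero or negative amount.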
Likewise, if you don't set cores, Spark uses all of the machine's available cores. Once the arguments are parsed, the preStart function runs and performs startup-related work: asserting that the Worker has not already registered with the Master, creating the work directory, starting the Worker's web UI, registering with the Master, and so on:
```scala
override def preStart() {
  assert(!registered)
  logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
    host, port, cores, Utils.megabytesToString(memory)))
  logInfo("Spark home: " + sparkHome)
  createWorkDir()
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
  webUi.bind()
  registerWithMaster()
  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
}
```
The timeout for a Worker registering with the Master is 20 seconds. If registration does not succeed within those 20 seconds, the Worker retries; the retry limit is 3. If the number of retries reaches 3 and registration still fails, the Worker cannot start, and at that point you should check your network environment or whether the Master itself has a problem.
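The retry policy can be sketched as a simple bounded loop. This is a simplified stand-in: the real Worker schedules retries on the actor system's timer rather than looping, and the tryRegister supplier below is a hypothetical placeholder for one registration attempt (one 20-second window).

```java
import java.util.function.BooleanSupplier;

public class RegisterRetry {
    // Mirrors the limit described above: at most 3 registration attempts.
    static final int MAX_RETRIES = 3;

    // tryRegister stands in for one attempt to register with the Master,
    // returning true if the Master acknowledged within the timeout.
    public static boolean registerWithMaster(BooleanSupplier tryRegister) {
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            if (tryRegister.getAsBoolean()) {
                return true; // registered successfully
            }
        }
        return false; // all retries exhausted: Worker cannot start
    }

    public static void main(String[] args) {
        // Simulate a Master that only responds on the third attempt.
        int[] calls = {0};
        boolean ok = registerWithMaster(() -> ++calls[0] >= 3);
        System.out.println(ok + " after " + calls[0] + " attempts");
    }
}
```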
While running, the Worker handles many kinds of events, such as RegisteredWorker, SendHeartbeat, WorkDirCleanup, and MasterChanged, and performs a different action for each. For example, when a job needs to run, the Worker launches one or more ExecutorRunner instances. See the receiveWithLogging function for the details:
```scala
override def receiveWithLogging = {
  case RegisteredWorker(masterUrl, masterWebUiUrl) => ...
  case SendHeartbeat => ...
  case WorkDirCleanup => ...
  case MasterChanged(masterUrl, masterWebUiUrl) => ...
  case Heartbeat => ...
  case RegisterWorkerFailed(message) => ...
  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => ...
  case ExecutorStateChanged(appId, execId, state, message, exitStatus) => ...
  case KillExecutor(masterUrl, appId, execId) => ...
  case LaunchDriver(driverId, driverDesc) => ...
  case KillDriver(driverId) => ...
  case DriverStateChanged(driverId, state, exception) => ...
  case x: DisassociatedEvent if x.remoteAddress == masterAddress => ...
  case RequestWorkerState => ...
}
```
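The structure above is a message dispatch: one handler per event type. As a rough Java analogue of that pattern match (the event names below are taken from the listing, but the handler strings and the enum-based dispatch are purely illustrative, not Spark code):

```java
public class WorkerEvents {
    // A small subset of the messages matched in receiveWithLogging.
    enum Event { SEND_HEARTBEAT, WORK_DIR_CLEANUP, MASTER_CHANGED, LAUNCH_EXECUTOR }

    // Hypothetical dispatcher: each event maps to one action,
    // just as each case clause does in the Scala pattern match.
    public static String handle(Event e) {
        switch (e) {
            case SEND_HEARTBEAT:   return "send heartbeat to master";
            case WORK_DIR_CLEANUP: return "clean stale app dirs";
            case MASTER_CHANGED:   return "re-register with new master";
            case LAUNCH_EXECUTOR:  return "start an ExecutorRunner";
            default:               return "unhandled";
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(Event.LAUNCH_EXECUTOR));
    }
}
```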
The code above has been trimmed for brevity; in fact, the receiveWithLogging method is inherited from ActorLogReceive.
When the Worker node stops, the postStop function is executed, as follows:
```scala
override def postStop() {
  metricsSystem.report()
  registrationRetryTimer.foreach(_.cancel())
  executors.values.foreach(_.kill())
  drivers.values.foreach(_.kill())
  webUi.stop()
  metricsSystem.stop()
}
```
Summary

This post traced the Worker's lifecycle: argument parsing in WorkerArguments (including the default memory and core inference), initialization and Master registration in preStart, event handling in receiveWithLogging, and cleanup in postStop.