Spark Series (8): How the Worker Works
Architecture Diagram
(figure omitted: Worker workflow diagram)
Source Code Analysis
Package: org.apache.spark.deploy.worker
Driver launch entry point: the case LaunchDriver branch in the Worker's receiveWithLogging message handler
case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  // Create a DriverRunner to launch the driver
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    akkaUrl)
  // Add the driver to the local cache
  drivers(driverId) = driver
  driver.start()

  // Account for the cores now in use
  coresUsed += driverDesc.cores
  // Account for the memory now in use
  memoryUsed += driverDesc.mem
}
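The bookkeeping pattern in this handler (cache the runner, then bump the used-resource counters) can be sketched in isolation. The names below (WorkerState, DriverDesc, launchDriver) are hypothetical stand-ins, not Spark's classes:

```scala
// Minimal sketch of the worker-side bookkeeping when a driver is launched:
// cache the driver under its id, then account for the resources it occupies.
case class DriverDesc(cores: Int, mem: Int)

class WorkerState {
  val drivers = scala.collection.mutable.Map[String, DriverDesc]()
  var coresUsed = 0
  var memoryUsed = 0

  def launchDriver(driverId: String, desc: DriverDesc): Unit = {
    drivers(driverId) = desc   // add the driver to the local cache
    coresUsed += desc.cores    // increase cores in use
    memoryUsed += desc.mem     // increase memory in use
  }
}
```

Keeping the cache and the counters updated together is what lets the Worker later report accurate free capacity back to the Master.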
DriverRunner
Manages the execution of a single driver, including automatically restarting it on failure. This mechanism applies only to standalone cluster deploy mode.
The start method of DriverRunner:
def start() = {
  // Launch a new thread
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      try {
        // Create the driver's working directory
        val driverDir = createWorkingDirectory()
        // Download the jar the application needs
        val localJarFilename = downloadUserJar(driverDir)

        def substituteVariables(argument: String): String = argument match {
          case "{{WORKER_URL}}" => workerUrl
          case "{{USER_JAR}}" => localJarFilename
          case other => other
        }

        // TODO: If we add ability to submit multiple jars they should also be added here
        // Build a ProcessBuilder with the driver launch command (and required memory)
        val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
          sparkHome.getAbsolutePath, substituteVariables)
        // Launch the driver process
        launchDriver(builder, driverDir, driverDesc.supervise)
      }
      catch {
        case e: Exception => finalException = Some(e)
      }

      // Determine the driver's exit state
      val state =
        if (killed) {
          DriverState.KILLED
        } else if (finalException.isDefined) {
          DriverState.ERROR
        } else {
          finalExitCode match {
            case Some(0) => DriverState.FINISHED
            case _ => DriverState.FAILED
          }
        }

      finalState = Some(state)
      // Send a DriverStateChanged message to the worker that owns this driver
      worker ! DriverStateChanged(driverId, state, finalException)
    }
  }.start()
}
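The exit-state decision at the end of this method has a clear priority order: a kill request wins over a caught exception, which wins over the process exit code. A minimal sketch of just that decision (using a simplified state type, not Spark's DriverState enum):

```scala
// Simplified model of DriverRunner's final-state decision:
// killed takes priority, then a caught exception, then the exit code.
sealed trait DriverState
case object Killed extends DriverState
case object Error extends DriverState
case object Finished extends DriverState
case object Failed extends DriverState

def finalState(killed: Boolean,
               finalException: Option[Throwable],
               finalExitCode: Option[Int]): DriverState =
  if (killed) Killed
  else if (finalException.isDefined) Error
  else finalExitCode match {
    case Some(0) => Finished   // clean exit
    case _       => Failed     // non-zero or missing exit code
  }
```

Note that a missing exit code (None) is treated the same as a non-zero one: anything other than a confirmed clean exit is FAILED.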
LaunchExecutor
Handles the launch of an executor on the Worker:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.get(appId).getOrElse {
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          Utils.createDirectory(dir).getAbsolutePath()
        }.toSeq
      }
      appDirectories(appId) = appLocalDirs
      // Create an ExecutorRunner
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        akkaUrl,
        conf,
        appLocalDirs, ExecutorState.LOADING)
      // Add the executor to the local cache
      executors(appId + "/" + execId) = manager
      manager.start()
      // Increase the worker's cores in use
      coresUsed += cores_
      // Increase the worker's memory in use
      memoryUsed += memory_
      // Notify the master with an ExecutorStateChanged message
      master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
    }
    // On failure, notify the master with an ExecutorStateChanged FAILED message
    catch {
      case e: Exception => {
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None)
      }
    }
  }
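Note how the handler keys its local cache by the composite string appId + "/" + execId, and how the catch block removes that same entry if the launch fails. A stripped-down sketch of that pattern (ExecutorTable and its methods are hypothetical, not Spark's API):

```scala
// Minimal sketch of the executor-cache pattern above: executors are cached
// under the composite key "appId/execId", and a failed launch removes the
// entry again so no stale executor lingers in the map.
class ExecutorTable {
  // key "appId/execId" -> executor state (a plain String here for simplicity)
  val executors = scala.collection.mutable.Map[String, String]()

  private def key(appId: String, execId: Int) = appId + "/" + execId

  def launch(appId: String, execId: Int): Unit =
    executors(key(appId, execId)) = "LOADING"   // cache before the process starts

  def launchFailed(appId: String, execId: Int): Unit = {
    val k = key(appId, execId)
    if (executors.contains(k)) executors -= k   // drop the cached entry on failure
  }
}
```

The contains check before removal mirrors the real handler: the exception may have been thrown before the executor was ever added to the map.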
Summary
1. After startup, the Worker, Driver, and Application each register with the Master, which caches them in its in-memory data model.
2. Once registration completes, the Master sends LaunchExecutor and LaunchDriver messages to the Worker.
3. On receiving these messages, the Worker starts the executor and driver processes, and state changes flow back through the Worker's ExecutorStateChanged and DriverStateChanged handling.
4. The ExecutorStateChanged and DriverStateChanged messages are forwarded to the Master, which handles each according to the reported state; most importantly, it calls the schedule method to re-schedule resources.
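The key consequence in point 4 is that a terminal state change triggers re-scheduling, because the resources the executor held are now free. A toy sketch of that reaction (ToyMaster and its methods are hypothetical, not Spark's Master class):

```scala
// Toy model of the summary's point 4: the master re-runs scheduling only
// when an executor reaches a terminal state, since that frees resources.
class ToyMaster {
  var scheduleCalls = 0
  private val terminal = Set("FINISHED", "FAILED", "KILLED")

  def schedule(): Unit = scheduleCalls += 1   // re-allocate freed resources

  def executorStateChanged(appId: String, execId: Int, state: String): Unit =
    if (terminal(state)) schedule()           // non-terminal updates are just recorded
}
```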
Reposted from: https://www.cnblogs.com/jianyuan/p/Spark%E7%B3%BB%E5%88%97%E4%B9%8BWorker%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86.html