Understanding Spark 2.1 Core in Depth (8): Fault Tolerance and HA in Standalone Mode — Principles and Source Code Analysis
In the fifth, sixth, and seventh posts of this series we covered how a Standalone-mode cluster starts up, how resources are allocated once an application is submitted, how Workers launch Executors, how Tasks are executed and their results handled, and how the allocated resources are reclaimed when the application exits.
In a distributed system with many machines, however, failures are inevitable. What happens if an Executor, a Worker, or the Master exits abnormally while a job is running? In this post we look at cluster fault tolerance and high availability (HA) in Spark's Standalone mode.
Executor
Worker.receive
Let's first go back to ExecutorRunner.fetchAndRunExecutor, covered in part six of this series (《深入理解Spark 2.1 Core (六):資源調度的原理與源碼分析》), and look at how an executor exits:
```scala
// The executor process exits and returns a zero or non-zero exitCode
val exitCode = process.waitFor()
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
// Send an ExecutorStateChanged signal to the Worker
worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
```
After the Worker receives the ExecutorStateChanged signal, it calls handleExecutorStateChanged:
```scala
case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  handleExecutorStateChanged(executorStateChanged)
```
Worker.handleExecutorStateChanged
```scala
private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged): Unit = {
  // Forward the ExecutorStateChanged signal to the Master
  sendToMaster(executorStateChanged)
  val state = executorStateChanged.state
  if (ExecutorState.isFinished(state)) {
    // Release the resources held by this executor
    val appId = executorStateChanged.appId
    val fullId = appId + "/" + executorStateChanged.execId
    val message = executorStateChanged.message
    val exitStatus = executorStateChanged.exitStatus
    executors.get(fullId) match {
      case Some(executor) =>
        logInfo("Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
        executors -= fullId
        finishedExecutors(fullId) = executor
        trimFinishedExecutorsIfNecessary()
        coresUsed -= executor.cores
        memoryUsed -= executor.memory
      case None =>
        logInfo("Unknown Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
    }
    maybeCleanupApplication(appId)
  }
}
```
Master.receive
When the Master receives the ExecutorStateChanged signal:
```scala
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  // Look up the application by appId,
  // then find this executor inside the application's info
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) =>
      val appInfo = idToApp(appId)
      // Update the executor's state
      val oldState = exec.state
      exec.state = state
      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
      if (ExecutorState.isFinished(state)) {
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If the app has already finished, keep the executor info around
        // so it can still be shown on the Web UI;
        // otherwise remove the executor from the application's info
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        // Remove the executor from its worker
        exec.worker.removeExecutor(exec)
        // Check how the executor exited
        val normalExit = exitStatus == Some(0)
        // If the executor exited abnormally and the application has reached
        // the maximum number of retries, remove the application
        if (!normalExit
            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
            && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
          val execs = appInfo.executors.values
          if (!execs.exists(_.state == ExecutorState.RUNNING)) {
            logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
              s"${appInfo.retryCount} times; removing it")
            removeApplication(appInfo, ApplicationState.FAILED)
          }
        }
      }
      // Reschedule
      schedule()
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
```
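The retry threshold used above is a Master-side setting. As a hedged sketch (the property key corresponds to how MAX_EXECUTOR_RETRIES is defined in Master.scala; the value shown is only an example), an application can be allowed more executor failures before being killed:

```scala
// MAX_EXECUTOR_RETRIES is read once when the Master starts, roughly:
//   private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)
// A negative value disables the application-killing path shown above.
val conf = new org.apache.spark.SparkConf()
  .set("spark.deploy.maxExecutorRetries", "20") // example value, not a recommendation
```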
Worker
Worker.killProcess
Let's go back to ExecutorRunner.start in part six (《深入理解Spark 2.1 Core (六):資源調度的原理與源碼分析》):
```scala
// Register a shutdown-hook thread,
// used to kill the executor when the worker shuts down
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
  if (state == ExecutorState.RUNNING) {
    state = ExecutorState.FAILED
  }
  killProcess(Some("Worker shutting down"))
}
```
When the worker exits, ShutdownHookManager invokes killProcess to kill the executor:
```scala
private def killProcess(message: Option[String]) {
  var exitCode: Option[Int] = None
  if (process != null) {
    logInfo("Killing process!")
    // Stop appending the executor's stdout log
    if (stdoutAppender != null) {
      stdoutAppender.stop()
    }
    if (stderrAppender != null) {
      // Stop appending the executor's stderr log
      stderrAppender.stop()
    }
    // Kill the executor process and record how it terminated
    exitCode = Utils.terminateProcess(process, EXECUTOR_TERMINATE_TIMEOUT_MS)
    if (exitCode.isEmpty) {
      logWarning("Failed to terminate process: " + process +
        ". This process will likely be orphaned.")
    }
  }
  try {
    // Send an ExecutorStateChanged signal to the worker
    worker.send(ExecutorStateChanged(appId, execId, state, message, exitCode))
  } catch {
    case e: IllegalStateException => logWarning(e.getMessage(), e)
  }
}
```
Master.timeOutDeadWorkers
When a Worker successfully registers with the Master, the Master replies with a RegisteredWorker signal, which is handled by the worker's handleRegisterResponse. While handling it, the worker schedules a recurring task that keeps sending SendHeartbeat signals to itself:
```scala
case RegisteredWorker(masterRef, masterWebUiUrl) =>
  // ...
  forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(SendHeartbeat)
    }
  }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
  // ...
```
When the worker receives the SendHeartbeat signal, it handles it as follows:
```scala
case SendHeartbeat =>
  if (connected) { sendToMaster(Heartbeat(workerId, self)) }
```
This sends a Heartbeat signal to the Master.
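As a side note — a simplified sketch based on the Spark 2.1 Worker source; the exact expression may differ across versions — the heartbeat interval HEARTBEAT_MILLIS is derived from the worker timeout, so the Master only declares a worker dead after several consecutive heartbeats have been missed:

```scala
// In Worker.scala the heartbeat period is a quarter of the worker timeout,
// so roughly four heartbeats fit into one timeout window.
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
```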
When the Master receives the Heartbeat signal, it handles it as follows:
```scala
case Heartbeat(workerId, worker) =>
  idToWorker.get(workerId) match {
    case Some(workerInfo) =>
      // Update the worker's lastHeartbeat timestamp
      workerInfo.lastHeartbeat = System.currentTimeMillis()
    case None =>
      if (workers.map(_.id).contains(workerId)) {
        logWarning(s"Got heartbeat from unregistered worker $workerId." +
          " Asking it to re-register.")
        worker.send(ReconnectWorker(masterUrl))
      } else {
        logWarning(s"Got heartbeat from unregistered worker $workerId." +
          " This worker was never registered, so ignoring the heartbeat.")
      }
  }
```
Meanwhile, in Master.onStart we can see:
```scala
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = Utils.tryLogNonFatalError {
    self.send(CheckForWorkerTimeOut)
  }
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
```
The Master likewise starts a dedicated task that periodically sends itself a CheckForWorkerTimeOut signal. When the Master receives CheckForWorkerTimeOut:
```scala
case CheckForWorkerTimeOut =>
  timeOutDeadWorkers()
```
it calls timeOutDeadWorkers:
```scala
private def timeOutDeadWorkers() {
  val currentTime = System.currentTimeMillis()
  // Select the workers whose last heartbeat is older than (now - worker timeout)
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
  // Go through those workers
  for (worker <- toRemove) {
    // If the WorkerInfo is not already marked DEAD
    if (worker.state != WorkerState.DEAD) {
      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
        worker.id, WORKER_TIMEOUT_MS / 1000))
      // Call removeWorker
      removeWorker(worker)
    } else {
      // The WorkerInfo is already DEAD:
      // only drop it from the workers list after waiting long enough, i.e. when
      // lastHeartbeat < now - (REAPER_ITERATIONS + 1) * worker timeout.
      // REAPER_ITERATIONS is set by spark.dead.worker.persistence and defaults to 15.
      if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
        workers -= worker
      }
    }
  }
}
```
Master.removeWorker
```scala
private def removeWorker(worker: WorkerInfo) {
  logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
  // Mark the worker as DEAD
  worker.setState(WorkerState.DEAD)
  // Remove it from the various caches
  idToWorker -= worker.id
  addressToWorker -= worker.endpoint.address
  if (reverseProxy) {
    webUi.removeProxyTargets(worker.id)
  }
  for (exec <- worker.executors.values) {
    logInfo("Telling app of lost executor: " + exec.id)
    // Notify the application using this executor
    // by sending an ExecutorUpdated signal to its driver
    exec.application.driver.send(ExecutorUpdated(
      exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
    // Mark the executor as LOST
    exec.state = ExecutorState.LOST
    // Remove the executor from the application's info
    exec.application.removeExecutor(exec)
  }
  for (driver <- worker.drivers.values) {
    // Relaunch or remove the driver
    if (driver.desc.supervise) {
      logInfo(s"Re-launching ${driver.id}")
      relaunchDriver(driver)
    } else {
      logInfo(s"Not re-launching ${driver.id} because it was not supervised")
      removeDriver(driver.id, DriverState.ERROR, None)
    }
  }
  // Remove the worker from the persistence engine
  persistenceEngine.removeWorker(worker)
}
```
Master.removeDriver
```scala
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove it from the list of active drivers
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Add it to the completedDrivers list
      completedDrivers += driver
      // Remove it from the persistence engine
      persistenceEngine.removeDriver(driver)
      // Record its final state
      driver.state = finalState
      driver.exception = exception
      // Remove the driver from the worker it was registered on
      driver.worker.foreach(w => w.removeDriver(driver))
      // Reschedule
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
```
Master
Next, let's look at Master fault tolerance and HA. The persistenceEngine object that appeared in the Master code above is the key to both. In Master.onStart, the persistence engine persistenceEngine and the leader-election agent leaderElectionAgent are created according to RECOVERY_MODE:
```scala
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    // User-defined recovery mechanism
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    // No persistence
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
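```
All four branches produce a PersistenceEngine. As a rough sketch (abridged from the Spark 2.1 source; the helper methods beyond persist/unpersist/read are simplified here), the contract every engine implements looks like this:

```scala
// Simplified view of org.apache.spark.deploy.master.PersistenceEngine (Spark 2.1).
// Concrete engines only implement persist/unpersist/read; the add*/remove* helpers
// and readPersistedData are built on top of them.
abstract class PersistenceEngine {
  // Durably store an object under a name (a file name, a ZooKeeper node, ...)
  def persist(name: String, obj: Object): Unit
  // Remove a previously persisted object
  def unpersist(name: String): Unit
  // Read back all objects whose name starts with the given prefix
  def read[T: ClassTag](prefix: String): Seq[T]

  final def addApplication(app: ApplicationInfo): Unit = persist("app_" + app.id, app)
  final def removeApplication(app: ApplicationInfo): Unit = unpersist("app_" + app.id)
  final def addWorker(worker: WorkerInfo): Unit = persist("worker_" + worker.id, worker)
  final def removeWorker(worker: WorkerInfo): Unit = unpersist("worker_" + worker.id)
  final def addDriver(driver: DriverInfo): Unit = persist("driver_" + driver.id, driver)
  final def removeDriver(driver: DriverInfo): Unit = unpersist("driver_" + driver.id)

  // Used by a newly elected Master to recover applications, drivers and workers
  def readPersistedData(rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])

  def close(): Unit = {}
}
```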
RECOVERY_MODE comes from the spark.deploy.recoveryMode setting and defaults to NONE:
```scala
private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
```
Next, let's dig deeper into the FILESYSTEM and ZOOKEEPER recovery modes.
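Before doing so, here is a hedged sketch of how these settings might be supplied. The property keys are the ones referenced in the source quoted in this post (plus spark.deploy.zookeeper.url for the ZooKeeper connection); the values and the idea of setting them programmatically are purely illustrative — on a real cluster they are normally passed to the Master process via SPARK_DAEMON_JAVA_OPTS:

```scala
import org.apache.spark.SparkConf

// Illustrative values only; adjust to your own recovery directory / ZooKeeper ensemble.

// FILESYSTEM mode: metadata is written to a local directory
val fsConf = new SparkConf()
  .set("spark.deploy.recoveryMode", "FILESYSTEM")
  .set("spark.deploy.recoveryDirectory", "/var/spark/recovery")

// ZOOKEEPER mode: metadata is written to ZooKeeper and standby Masters take part in election
val zkConf = new SparkConf()
  .set("spark.deploy.recoveryMode", "ZOOKEEPER")
  .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181,zk3:2181")
  .set("spark.deploy.zookeeper.dir", "/spark")
```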
FILESYSTEM
In FILESYSTEM recovery mode, the cluster's metadata is persisted to the local file system, and a Master becomes the active Master as soon as it starts.
```scala
case "FILESYSTEM" =>
  val fsFactory =
    new FileSystemRecoveryModeFactory(conf, serializer)
  (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
```
FileSystemRecoveryModeFactory creates two objects: a FileSystemPersistenceEngine and a MonarchyLeaderAgent:
```scala
private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {

  val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")

  def createPersistenceEngine(): PersistenceEngine = {
    logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
    new FileSystemPersistenceEngine(RECOVERY_DIR, serializer)
  }

  def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
    new MonarchyLeaderAgent(master)
  }
}
```
FileSystemPersistenceEngine
Let's first look at FileSystemPersistenceEngine:
```scala
private[master] class FileSystemPersistenceEngine(
    val dir: String,
    val serializer: Serializer)
  extends PersistenceEngine with Logging {

  // Create the recovery directory
  new File(dir).mkdir()

  // Persist an object by serializing it into a file
  override def persist(name: String, obj: Object): Unit = {
    serializeIntoFile(new File(dir + File.separator + name), obj)
  }

  // Unpersist by deleting the file
  override def unpersist(name: String): Unit = {
    val f = new File(dir + File.separator + name)
    if (!f.delete()) {
      logWarning(s"Error deleting ${f.getPath()}")
    }
  }

  // Read back objects by deserializing every file whose name matches the prefix
  override def read[T: ClassTag](prefix: String): Seq[T] = {
    val files = new File(dir).listFiles().filter(_.getName.startsWith(prefix))
    files.map(deserializeFromFile[T])
  }

  // Serialization into a file
  private def serializeIntoFile(file: File, value: AnyRef) {
    // Create the new file
    val created = file.createNewFile()
    if (!created) { throw new IllegalStateException("Could not create file: " + file) }
    // Open a file output stream
    val fileOut = new FileOutputStream(file)
    var out: SerializationStream = null
    Utils.tryWithSafeFinally {
      // Wrap the file output stream in a serialization stream
      out = serializer.newInstance().serializeStream(fileOut)
      // Write the value through the serialization stream
      out.writeObject(value)
    } {
      // Close the streams
      fileOut.close()
      if (out != null) {
        out.close()
      }
    }
  }

  // Deserialization from a file
  private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = {
    // Open a file input stream
    val fileIn = new FileInputStream(file)
    var in: DeserializationStream = null
    try {
      // Wrap the file input stream in a deserialization stream
      in = serializer.newInstance().deserializeStream(fileIn)
      // Read the object back from the file
      in.readObject[T]()
    } finally {
      // Close the streams
      fileIn.close()
      if (in != null) {
        in.close()
      }
    }
  }
}
```
MonarchyLeaderAgent
```scala
@DeveloperApi
trait LeaderElectionAgent {
  val masterInstance: LeaderElectable
  def stop() {}
}

@DeveloperApi
trait LeaderElectable {
  def electedLeader(): Unit
  def revokedLeadership(): Unit
}

// Single-node implementation of the leader-election agent:
// it always elects the Master it was constructed with
private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable)
  extends LeaderElectionAgent {
  masterInstance.electedLeader()
}
```
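To make the two roles concrete, here is a hedged toy implementation of LeaderElectable — not the real Master (in Spark, the Master's electedLeader and revokedLeadership callbacks forward ElectedLeader / RevokedLeadership messages to its own RPC endpoint), just an illustration of the callback contract that MonarchyLeaderAgent and ZooKeeperLeaderElectionAgent drive:

```scala
// A toy LeaderElectable that only logs what happens on election callbacks.
class LoggingLeaderElectable(name: String) extends LeaderElectable {
  override def electedLeader(): Unit =
    println(s"$name: elected leader, would now recover state and start scheduling")
  override def revokedLeadership(): Unit =
    println(s"$name: leadership revoked, stepping down")
}

// With MonarchyLeaderAgent the electedLeader callback fires immediately at construction time:
val agent = new MonarchyLeaderAgent(new LoggingLeaderElectable("master-1"))
```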
ZOOKEEPER
In ZOOKEEPER recovery mode, the cluster's metadata is persisted to ZooKeeper. When the active Master fails, ZooKeeper elects a new Master from the standby Masters; after being elected, the new Master reads the persisted data from ZooKeeper and recovers the cluster state.
```scala
case "ZOOKEEPER" =>
  logInfo("Persisting recovery state to ZooKeeper")
  val zkFactory =
    new ZooKeeperRecoveryModeFactory(conf, serializer)
  (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
```
ZooKeeperRecoveryModeFactory creates two objects: a ZooKeeperPersistenceEngine and a ZooKeeperLeaderElectionAgent:
```scala
private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
  extends StandaloneRecoveryModeFactory(conf, serializer) {

  def createPersistenceEngine(): PersistenceEngine = {
    new ZooKeeperPersistenceEngine(conf, serializer)
  }

  def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
    new ZooKeeperLeaderElectionAgent(master, conf)
  }
}
```
ZooKeeperPersistenceEngine
Let's first look at ZooKeeperPersistenceEngine:
```scala
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer)
  extends PersistenceEngine
  with Logging {

  // Create the ZooKeeper client and the working path
  private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
  private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

  SparkCuratorUtil.mkdir(zk, WORKING_DIR)

  // Persist an object by serializing it into a ZooKeeper node
  override def persist(name: String, obj: Object): Unit = {
    serializeIntoFile(WORKING_DIR + "/" + name, obj)
  }

  // Unpersist by deleting the node
  override def unpersist(name: String): Unit = {
    zk.delete().forPath(WORKING_DIR + "/" + name)
  }

  // Read back objects whose node name matches the prefix
  override def read[T: ClassTag](prefix: String): Seq[T] = {
    zk.getChildren.forPath(WORKING_DIR).asScala
      .filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T])
  }

  // Close the ZooKeeper client
  override def close() {
    zk.close()
  }

  // Serialization into ZooKeeper
  private def serializeIntoFile(path: String, value: AnyRef) {
    // Serialize the value into a byte array
    val serialized = serializer.newInstance().serialize(value)
    val bytes = new Array[Byte](serialized.remaining())
    serialized.get(bytes)
    // Write the bytes to ZooKeeper
    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
  }

  // Deserialization from ZooKeeper
  private def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = {
    // Fetch the node data from ZooKeeper
    val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
    try {
      // Deserialize it
      Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap(fileData)))
    } catch {
      case e: Exception =>
        logWarning("Exception while reading persisted file, deleting", e)
        zk.delete().forPath(WORKING_DIR + "/" + filename)
        None
    }
  }
}
```
ZooKeeperLeaderElectionAgent
When a ZooKeeperLeaderElectionAgent is created, it calls start:
```scala
private def start() {
  logInfo("Starting ZooKeeper LeaderElection agent")
  zk = SparkCuratorUtil.newClient(conf)
  leaderLatch = new LeaderLatch(zk, WORKING_DIR)
  leaderLatch.addListener(this)
  leaderLatch.start()
}
```
leaderLatch.start() kicks off contention for, and election of, the leader. The underlying election is implemented by ZooKeeper (via Curator's LeaderLatch) rather than by Spark itself, so we won't trace it here; the sketch below shows the general shape of the interaction.
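For readers unfamiliar with Curator, here is a minimal, self-contained sketch of the pattern ZooKeeperLeaderElectionAgent relies on. The connection string and latch path are made up for illustration; Spark's agent additionally tracks its own leadership status and forwards these callbacks to the LeaderElectable (the Master):

```scala
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderLatchDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical ZooKeeper ensemble and latch path, for illustration only
    val client = CuratorFrameworkFactory.newClient(
      "zk1:2181,zk2:2181,zk3:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()

    val latch = new LeaderLatch(client, "/spark/leader_election")
    latch.addListener(new LeaderLatchListener {
      // Called by Curator when this participant wins the election
      override def isLeader(): Unit =
        println("Elected leader: recover persisted state and start scheduling")
      // Called when leadership is lost (e.g. the ZooKeeper session expires)
      override def notLeader(): Unit =
        println("Leadership revoked: step down")
    })

    // Join the election; exactly one participant per latch path is leader at a time
    latch.start()
  }
}
```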
Summary
- Executor exit: the ExecutorRunner sends an ExecutorStateChanged signal to its Worker; the Worker forwards the signal to the Master and releases the executor's resources; the Master updates the executor's state and removes it from the application and worker info (keeping finished executors visible on the Web UI). If the executor exited abnormally and the application has reached the maximum retry count, the application is removed; otherwise the Master reschedules.
- Worker exit: the Worker's shutdown hook calls killProcess to kill all of its executors; the Master detects the lost Worker through heartbeat timeouts, marks it DEAD, removes its executors from the corresponding applications, relaunches or removes the drivers that ran on it, and removes the Worker from the persistence engine.
- Master exit: in FILESYSTEM recovery mode, the cluster metadata is persisted to the local file system and a (re)started Master immediately becomes the active Master; in ZOOKEEPER recovery mode, the metadata is persisted to ZooKeeper, which elects a new Master from the standbys, and the newly elected Master recovers the persisted state; in addition there is a CUSTOM (user-defined) recovery mechanism and a no-persistence mode.