worker启动一般包含两大部分:DriverRunner和ExcetorRunner。
worker启动driver的几个基本原理,最核心的是。worker内部会启动一个线程,这个线程可以理解为driverRunner。然后DriverRunner会去负责启动driver进程,并在之后对driver进程进行管理。
worker的启动步骤:
1- master要求worker启动driver和Excetor、launchdriver‘launchExcetor。
2- worker在内部launchdriver启动了一个线程DriverRunner;创建driver的工作目录; 封装启动driver的命令,用processBuilder启动Driver。
3- worker在内部通过LaunchExector启动一个ExectorRunner;创建Excetor的工作目录,封装启动Excetor的命令,用ProcessBuilder启动Excetor。
4- Excetor找到对应的driver,去反向注册自己。
launchdriver源码如下:
case launchdriver(driverId, driverDesc) => logInfo(s"Asked to launch driver $driverId") val driver = new DriverRunner( conf, driverId, workdir, sparkHome, driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), self, workerUri, securityMgr) drivers(driverId) = driver driver.start() coresUsed += driverDesc.cores memoryUsed += driverDesc.mem case KillDriver(driverId) => logInfo(s"Asked to kill driver $driverId") drivers.get(driverId) match { case Some(runner) => runner.kill() case None => logError(s"Asked to kill unkNown driver $driverId") } case driverStateChanged @ DriverStateChanged(driverId, state, exception) => handleDriverStateChanged(driverStateChanged) case ReregisterWithMaster => reregisterWithMaster() case ApplicationFinished(id) => finishedApps += id maybeCleanupApplication(id) }
LaunchExecutor的源码如下:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") } else { try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) // Create the executor's working directory val executorDir = new File(workdir, appId + "/" + execId) if (!executorDir.mkdirs()) { throw new IOException("Failed to create directory " + executorDir) } // Create local dirs for the executor. These are passed to the executor via the // SPARK_EXECUTOR_Dirs environment variable, and deleted by the Worker when the // application finishes. val appLocalDirs = appDirectories.getorElse(appId, { val localRootDirs = Utils.getorCreateLocalRootDirs(conf) val dirs = localRootDirs.flatMap { dir => try { val appDir = Utils.createDirectory(dir, namePrefix = "executor") Utils.chmod700(appDir) Some(appDir.getAbsolutePath()) } catch { case e: IOException => logWarning(s"${e.getMessage}. Ignoring this directory.") None } }.toSeq if (dirs.isEmpty) { throw new IOException("No subfolder can be created in " + s"${localRootDirs.mkString(",")}.") } dirs }) appDirectories(appId) = appLocalDirs val manager = new ExecutorRunner( appId, execId, appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)), cores_, memory_, self, workerId, host, webUi.boundPort, publicAddress, sparkHome, executorDir, workerUri, conf, appLocalDirs, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ memoryUsed += memory_ sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None)) } catch { case e: Exception => logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e) if (executors.contains(appId + "/" + execId)) { executors(appId + "/" + execId).kill() executors -= appId + "/" + execId } sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.Failed, Some(e.toString), None)) } }
拓展:中华石杉--spark从入门到精通-第49讲
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。