1. Executor Creation and Startup


1.1 Master Creates the Application


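  override def receive: PartialFunction[Any, Unit] = {
    case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        // Create the ApplicationInfo instance. It wraps the current timestamp, the application ID,
        // the default maximum number of CPU cores the application may use, and a reference to
        // the client's ClientEndpoint.
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        // Reply to the StandaloneAppClient; its ClientEndpoint handles RegisteredApplication in receive
        driver.send(RegisteredApplication(app.id, self))
        // A new application is waiting, so try to allocate resources for it
        schedule()
      }

    // ... other messages omitted
  }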
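To make the branch above concrete, here is a toy model of it. ToyMaster and AppRecord are hypothetical names, not Spark classes, and only the fields mentioned in the comment are modeled. It assumes application IDs of the form app-<yyyyMMddHHmmss>-<4-digit counter>, which is how standalone application IDs look, and it merely counts the schedule() call the Master makes after registering an application instead of scheduling anything.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import scala.collection.mutable.ArrayBuffer

// Hypothetical record standing in for Spark's ApplicationInfo.
final case class AppRecord(id: String, name: String, startTime: Long, maxCores: Int)

// Hypothetical, simplified model of Master's RegisterApplication branch.
final class ToyMaster(var standby: Boolean, defaultCores: Int) {
  val apps = ArrayBuffer.empty[AppRecord]
  var scheduleCalls = 0
  private var nextAppNumber = 0
  private val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)

  // Builds IDs of the form app-<yyyyMMddHHmmss>-<4-digit counter>.
  private def newApplicationId(submitDate: Date): String = {
    val appId = "app-%s-%04d".format(dateFormat.format(submitDate), nextAppNumber)
    nextAppNumber += 1
    appId
  }

  /** Returns the ID that would go back in RegisteredApplication, or None if no reply is sent. */
  def registerApplication(name: String, requestedMaxCores: Option[Int]): Option[String] =
    if (standby) {
      None // a standby Master ignores the request and does not answer
    } else {
      val now = System.currentTimeMillis()
      val app = AppRecord(newApplicationId(new Date(now)), name, now,
        requestedMaxCores.getOrElse(defaultCores)) // fall back to the default core cap
      apps += app
      scheduleCalls += 1 // stands in for the schedule() call that follows registration
      Some(app.id)
    }
}
```

A standby Master returns None (no reply), and an application that does not request a core cap inherits defaultCores.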

1.2 Application Resource Allocation


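The schedule method has two jobs:
(1) scheduling Drivers: each Driver in the waitingDrivers array is sent to a Worker that can run it;
(2) launching Executors for Applications on Worker nodes that meet their requirements.
The source code of schedule is shown below.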


  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return // only an ALIVE Master schedules anything
    }
    // Drivers take strict precedence over executors
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      /*
       * Assign workers to each waiting driver in round-robin fashion. For each driver, start from
       * the last worker that was assigned a driver and continue until every alive worker has
       * been visited.
       */
      var launched = false
      var isClusterIdle = true
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) { // visit workers until the driver is launched
        val worker = shuffledAliveWorkers(curPos)
        isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty
        numWorkersVisited += 1
        if (canLaunchDriver(worker, driver.desc)) {
          val allocated = worker.acquireResources(driver.desc.resourceReqs)
          driver.withResources(allocated)
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
      if (!launched && isClusterIdle) {
        logWarning(s"Driver ${driver.id} requires more resource than any of Workers could have.")
      }
    }
    // Then launch executors for the waiting applications
    startExecutorsOnWorkers()
  }

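schedule is one of the most important methods in Master: it is called whenever a new Driver registers, a new Application registers, or the available resources change. It allocates the available resources to the Applications currently waiting to be scheduled and launches Executors on suitable Worker nodes. Its other job is to send each submitted Driver to a Worker node whose free resources satisfy the Driver's requirements; launchDriver(worker, driver) does this. Drivers are placed first, and only afterwards are Executors launched.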
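The driver-placement loop reduces to a small self-contained sketch. It assumes each worker offers only cores and memory (Spark's canLaunchDriver also checks custom resources), and WorkerSlot, DriverReq and placeDrivers are hypothetical names. The key detail is that curPos carries over from one driver to the next, so consecutive drivers tend to land on different workers.

```scala
import scala.collection.mutable

// Hypothetical stand-ins: a worker with free capacity, and a driver's request.
final class WorkerSlot(val id: String, var coresFree: Int, var memoryFreeMb: Int)
final case class DriverReq(id: String, cores: Int, memoryMb: Int)

object DriverPlacement {
  /** Places drivers round-robin; returns driverId -> workerId for every driver that fit.
    * `workers` is assumed to be shuffled already, as Master.schedule does with Random.shuffle. */
  def placeDrivers(workers: IndexedSeq[WorkerSlot], drivers: Seq[DriverReq]): Map[String, String] = {
    val placed = mutable.LinkedHashMap.empty[String, String]
    val numWorkers = workers.size
    var curPos = 0 // carried over between drivers
    for (driver <- drivers) {
      var launched = false
      var visited = 0
      while (visited < numWorkers && !launched) {
        val w = workers(curPos)
        visited += 1
        if (w.coresFree >= driver.cores && w.memoryFreeMb >= driver.memoryMb) {
          w.coresFree -= driver.cores // reserve the driver's resources on this worker
          w.memoryFreeMb -= driver.memoryMb
          placed(driver.id) = w.id
          launched = true
        }
        curPos = (curPos + 1) % numWorkers
      }
    }
    placed.toMap
  }
}
```

With workers w1 (4 cores) and w2 (2 cores), drivers needing 1, 1 and 4 cores go to w1 and w2, and the third stays waiting because no worker has 4 free cores left.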

1.3 Executor Resource Allocation

After an Application has been registered and scheduled, Master launches Executors for it on Worker nodes by calling startExecutorsOnWorkers, whose source code is shown below:

  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    for (app <- waitingApps) {
      val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
      // If the cores left is less than the coresPerExecutor,the cores left will not be allocated
      if (app.coresLeft >= coresPerExecutor) {
        // Filter out workers that don't have enough resources to launch an executor
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canLaunchExecutor(_, app.desc))
          .sortBy(_.coresFree).reverse
        if (waitingApps.length == 1 && usableWorkers.isEmpty) {
          logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
        }
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

        // Now that we've decided how many cores to allocate on each worker, let's allocate them
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
        }
      }
    }
  }
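The worker filter at the top of the loop is easy to isolate. The sketch below simplifies it under stated assumptions: a worker is usable if it is alive and has at least one executor's worth of cores and memory free (Spark's canLaunchExecutor also checks custom resources), and, as in Spark, usable workers are ordered by free cores, largest first. WorkerView, AppView and usableWorkers are illustrative names.

```scala
// Hypothetical views of a worker and a waiting application (only cores and memory modeled).
final case class WorkerView(id: String, alive: Boolean, coresFree: Int, memoryFreeMb: Int)
final case class AppView(id: String, coresLeft: Int, coresPerExecutor: Option[Int],
    memoryPerExecutorMb: Int)

object UsableWorkers {
  /** Workers that could host at least one more executor for `app`, most free cores first. */
  def usableWorkers(app: AppView, workers: Seq[WorkerView]): Seq[WorkerView] = {
    val minCores = app.coresPerExecutor.getOrElse(1)
    if (app.coresLeft < minCores) {
      Seq.empty // fewer cores left than one executor needs: skip this app
    } else {
      workers
        .filter(_.alive)
        .filter(w => w.coresFree >= minCores && w.memoryFreeMb >= app.memoryPerExecutorMb)
        .sortBy(_.coresFree)
        .reverse
    }
  }
}
```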

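scheduleExecutorsOnWorkers supports two strategies for placing Executors, selected by spreadOutApps (set by spark.deploy.spreadOut, true by default). The first is spread-out (round-robin): it goes around usableWorkers in turn, taking one allocation unit (coresPerExecutor cores, or a single core if that is unset) from each Worker per pass, until the Application's demand is met. Spreading an Application across many Workers usually gives better data locality, which is why this is the default. The second is consolidation: it takes as much as it can from each Worker in usableWorkers, one Worker after another, until the demand is met.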
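A simplified version of scheduleExecutorsOnWorkers shows how one flag switches between the two strategies. This is a sketch, not Spark's code: custom resources and the executor limit used by dynamic allocation are left out, and ExecutorPlacement and assignCores are hypothetical names. Cores are handed out in units of coresPerExecutor (one core when it is unset, in which case each worker gets at most one executor that keeps growing); with spreadOut the inner loop takes one unit per worker and moves on, otherwise it stays on a worker until that worker is full.

```scala
object ExecutorPlacement {
  // Hypothetical worker view: only free cores and memory are modeled.
  final case class Worker(coresFree: Int, memoryFreeMb: Int)

  /** Simplified sketch of scheduleExecutorsOnWorkers: returns the cores assigned to each worker. */
  def assignCores(
      coresLeft: Int,
      coresPerExecutor: Option[Int],
      memoryPerExecutorMb: Int,
      workers: IndexedSeq[Worker],
      spreadOut: Boolean): Array[Int] = {
    val minCores = coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val assignedCores = new Array[Int](workers.length)
    val assignedExecutors = new Array[Int](workers.length)
    var coresToAssign = math.min(coresLeft, workers.map(_.coresFree).sum)

    def canLaunch(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCores
      val enoughCores = workers(pos).coresFree - assignedCores(pos) >= minCores
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        // A new executor also needs memoryPerExecutorMb on this worker
        val usedMemory = assignedExecutors(pos) * memoryPerExecutorMb
        val enoughMemory = workers(pos).memoryFreeMb - usedMemory >= memoryPerExecutorMb
        keepScheduling && enoughCores && enoughMemory
      } else {
        keepScheduling && enoughCores // only growing the worker's single executor
      }
    }

    var freeWorkers = workers.indices.filter(canLaunch)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunch(pos)) {
          coresToAssign -= minCores
          assignedCores(pos) += minCores
          if (oneExecutorPerWorker) assignedExecutors(pos) = 1
          else assignedExecutors(pos) += 1
          // Spread out: take one unit here and move on. Consolidate: stay until the worker is full.
          if (spreadOut) keepScheduling = false
        }
      }
      freeWorkers = freeWorkers.filter(canLaunch)
    }
    assignedCores
  }
}
```

With three 4-core workers and an application limited to 6 cores (coresPerExecutor unset), spread-out yields 2 cores on each worker, while consolidation yields 4, 2 and 0.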

Once scheduleExecutorsOnWorkers has decided how many cores each Worker should contribute to the Application, allocateWorkerResourceToExecutors is called to actually allocate those resources on each Worker node. Its source code is:

  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) { // launch the executors one by one
      val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
      val exec = app.addExecutor(worker, coresToAssign, allocated)
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }
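The two vals at the top of allocateWorkerResourceToExecutors are plain integer arithmetic. A small sketch (split is a hypothetical helper) makes both cases explicit: with coresPerExecutor set (spark.executor.cores), 6 assigned cores become three 2-core executors; without it, a single executor takes all 6.

```scala
object ExecutorSplit {
  /** Cores of each executor launched on one worker, mirroring the two vals in
    * allocateWorkerResourceToExecutors (`split` itself is a hypothetical helper). */
  def split(assignedCores: Int, coresPerExecutor: Option[Int]): Seq[Int] = {
    // With coresPerExecutor: assignedCores / coresPerExecutor executors of that size.
    // Without it: one executor that takes every assigned core.
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    Seq.fill(numExecutors)(coresToAssign)
  }
}
```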

1.4 Launching the Executor


  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    // Ask the Worker to start the executor process
    worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
      exec.application.desc, exec.cores, exec.memory, exec.resources))
    // Tell the application's driver that a new executor has been added
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }
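launchExecutor only sends one-way messages: Master does not wait for the Worker to confirm, and later state changes come back separately as ExecutorStateChanged messages. The toy sketch below uses recording endpoints; RecordingEndpoint, LaunchExecutorMsg and ExecutorAddedMsg are hypothetical stand-ins for Spark's RpcEndpointRef and messages.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical messages, loosely modeled on LaunchExecutor and ExecutorAdded.
sealed trait Msg
final case class LaunchExecutorMsg(appId: String, execId: Int, cores: Int, memoryMb: Int) extends Msg
final case class ExecutorAddedMsg(execId: Int, workerId: String, cores: Int, memoryMb: Int) extends Msg

// Stand-in for an RpcEndpointRef: send is one-way, and here it just records the message.
final class RecordingEndpoint {
  val inbox = ArrayBuffer.empty[Msg]
  def send(msg: Msg): Unit = inbox += msg
}

object ToyLaunch {
  def launchExecutor(workerId: String, worker: RecordingEndpoint, driver: RecordingEndpoint,
      appId: String, execId: Int, cores: Int, memoryMb: Int): Unit = {
    // 1. Ask the Worker to start the executor process (no reply is awaited).
    worker.send(LaunchExecutorMsg(appId, execId, cores, memoryMb))
    // 2. Tell the application's driver side that an executor was added.
    driver.send(ExecutorAddedMsg(execId, workerId, cores, memoryMb))
  }
}
```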

On the Worker side, the LaunchExecutor message is handled in Worker's receive method:

  override def receive: PartialFunction[Any, Unit] = synchronized {
    case msg: RegisterWorkerResponse =>
      handleRegisterResponse(msg)

    // ... other messages omitted

    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) =>
      if (masterUrl != activeMasterUrl) { // not from the currently active Master, so ignore it
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else if (decommissioned) { // this Worker is being decommissioned and takes no new executors
        logWarning("Asked to launch an executor while decommissioned. Not launching executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          // Create the executor's working directory
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          val appLocalDirs = appDirectories.getOrElse(appId, {
            val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
            val dirs = localRootDirs.flatMap { dir =>
              try {
                val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                Utils.chmod700(appDir)
                Some(appDir.getAbsolutePath())
              } catch {
                case e: IOException =>
                  logWarning(s"${e.getMessage}. Ignoring this directory.")
                  None
              }
            }.toSeq
            if (dirs.isEmpty) {
              throw new IOException("No subfolder can be created in " +
                s"${localRootDirs.mkString(",")}.")
            }
            dirs
          })
          appDirectories(appId) = appLocalDirs
          val manager = new ExecutorRunner(
            appId, execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_, memory_, self, workerId, webUi.scheme, host, webUi.boundPort, publicAddress,
            sparkHome, executorDir, workerUri, conf, appLocalDirs, ExecutorState.LAUNCHING,
            resources_)
          executors(appId + "/" + execId) = manager
          manager.start() // starts the ExecutorRunner's worker thread
          coresUsed += cores_
          memoryUsed += memory_
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }
  }

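The code above first checks whether the incoming masterUrl equals activeMasterUrl. If not, the request did not come from the Master that is currently ALIVE, so the Worker only logs a warning. It then checks decommissioned: a Worker that is being decommissioned (gracefully taken out of service) refuses to launch new executors and likewise only logs a warning. Otherwise the request comes from the ALIVE Master, so the Worker creates a working directory for the Executor and then creates an ExecutorRunner from appId, execId, appDesc and the other parameters. As the name suggests, ExecutorRunner is where the Executor is run: it owns a worker thread that executes fetchAndRunExecutor, which builds the launch command and starts the CoarseGrainedExecutorBackend process. The source code that starts this thread in ExecutorRunner is shown below.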
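The Worker-side checks described above fit in a short sketch. WorkerGuard and its Decision cases are hypothetical names; the directory layout <workDir>/<appId>/<execId> matches the Worker code, while local-dir creation and ExecutorRunner construction are left out.

```scala
import java.io.{File, IOException}

/** Hypothetical sketch of the Worker's checks before launching an executor. */
object WorkerGuard {
  sealed trait Decision
  case object IgnoredStaleMaster extends Decision
  case object IgnoredDecommissioned extends Decision
  final case class Launch(executorDir: File) extends Decision

  def handleLaunch(masterUrl: String, activeMasterUrl: String, decommissioned: Boolean,
      workDir: File, appId: String, execId: Int): Decision = {
    if (masterUrl != activeMasterUrl) {
      IgnoredStaleMaster // request did not come from the currently active Master
    } else if (decommissioned) {
      IgnoredDecommissioned // a decommissioning Worker accepts no new executors
    } else {
      // Same layout as the Worker: <workDir>/<appId>/<execId>
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }
      Launch(executorDir)
    }
  }
}
```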

  private[worker] def start(): Unit = {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run(): Unit = { fetchAndRunExecutor() }
    }
    workerThread.start()
    // Shutdown hook that kills the executor process when the Worker shuts down
    shutdownHook = ShutdownHookManager.addShutdownHook { () =>
      // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
      // be `ExecutorState.LAUNCHING`. In this case, we should set `state` to `FAILED`.
      if (state == ExecutorState.LAUNCHING) {
        state = ExecutorState.FAILED
      }
      killProcess(Some("Worker shutting down")) }
  }
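The start() pattern, a dedicated thread plus a shutdown hook that marks a still-launching executor as failed, can be sketched without Spark. This sketch assumes a plain JVM shutdown hook (Runtime.addShutdownHook) in place of Spark's ShutdownHookManager; ToyRunner, runBody and the state objects are illustrative names only.

```scala
// Hypothetical executor states mirroring the ones used by ExecutorRunner.
sealed trait RunState
case object Launching extends RunState
case object Running extends RunState
case object Exited extends RunState
case object Killed extends RunState
case object Failed extends RunState

/** Hypothetical sketch of ExecutorRunner.start(): run `body` on a dedicated thread. */
final class ToyRunner(runnerId: String, body: () => Int) {
  @volatile var state: RunState = Launching
  @volatile var exitCode: Option[Int] = None
  private var workerThread: Thread = null

  // Plays the role of fetchAndRunExecutor: run the body and record how it ended.
  private def runBody(): Unit = {
    state = Running
    try {
      exitCode = Some(body())
      state = Exited
    } catch {
      case _: InterruptedException => state = Killed
      case _: Exception => state = Failed
    }
  }

  def start(): Unit = {
    workerThread = new Thread("ToyRunner for " + runnerId) {
      override def run(): Unit = runBody()
    }
    workerThread.start() // without this call the body never runs
    // Plain JVM hook: a runner still launching when the JVM exits is marked as failed.
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      if (state == Launching) state = Failed
    }))
  }

  def join(): Unit = workerThread.join()
}
```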


  private def fetchAndRunExecutor(): Unit = {
    try {
      // Save the resources allocated to the driver (cluster mode only) or to the executor into a
      // JSON resource file. Only used in standalone mode.
      val resourceFileOpt = prepareResourcesFile(SPARK_EXECUTOR_PREFIX, resources, executorDir)
      // Launch the process
      val arguments = appDesc.command.arguments ++ resourceFileOpt.map(f =>
        Seq("--resourcesFile", f.getAbsolutePath)).getOrElse(Seq.empty)
      val subsOpts = appDesc.command.javaOpts.map {
        Utils.substituteAppNExecIds(_, appId, execId.toString)
      }
      val subsCommand = appDesc.command.copy(arguments = arguments, javaOpts = subsOpts)
      val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)
      val command = builder.command()
      val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala)
        .mkString("\"", "\" \"", "\"")
      logInfo(s"Launch command: $redactedCommand")

      builder.directory(executorDir)
      builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

      // Add webUI log urls
      val baseUrl =
        if (conf.get(UI_REVERSE_PROXY)) {
          // ... log-page URL routed through the reverse proxy (elided in this excerpt)
        } else {
          // ... log-page URL on the worker's own web UI (elided in this excerpt)
        }
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

      process = builder.start()
      val header = "Spark Executor Command: %s\n%s\n\n".format(
        redactedCommand, "=" * 40)

      // Redirect its stdout and stderr to files
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

      val stderr = new File(executorDir, "stderr")
      Files.write(header, stderr, StandardCharsets.UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

      state = ExecutorState.RUNNING
      worker.send(ExecutorStateChanged(appId, execId, state, None, None))
      // Wait for it to exit; the executor may exit with code 0 (when the driver tells it to
      // shut down) or with a nonzero exit code
      val exitCode = process.waitFor()
      state = ExecutorState.EXITED
      val message = "Command exited with code " + exitCode
      worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    } catch {
      case interrupted: InterruptedException =>
        logInfo("Runner thread for executor " + fullId + " interrupted")
        state = ExecutorState.KILLED
        killProcess(None)
      case e: Exception =>
        logError("Error running executor", e)
        state = ExecutorState.FAILED
        killProcess(Some(e.toString))
    }
  }
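At its core, fetchAndRunExecutor is ordinary java.lang.ProcessBuilder usage. The sketch below is a simplification: instead of Spark's CommandUtils and FileAppender threads it uses ProcessBuilder's own redirectOutput/redirectError, and runExecutorProcess is a hypothetical helper. It runs the command in the executor directory, sets environment variables such as SPARK_EXECUTOR_DIRS, writes the same kind of header to stderr, and blocks in waitFor() for the exit code.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

/** Hypothetical sketch of the process-launch part of fetchAndRunExecutor. */
object ProcessLaunch {
  /** Runs `command` in `executorDir` with stdout/stderr written to files there; returns the exit code. */
  def runExecutorProcess(command: Seq[String], executorDir: File, env: Map[String, String]): Int = {
    val builder = new ProcessBuilder(command: _*)
    builder.directory(executorDir)
    env.foreach { case (key, value) => builder.environment.put(key, value) }

    // Write a header to stderr first, then let the child append to it.
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      command.mkString("\"", "\" \"", "\""), "=" * 40)
    val stderr = new File(executorDir, "stderr")
    Files.write(stderr.toPath, header.getBytes(StandardCharsets.UTF_8))
    builder.redirectOutput(new File(executorDir, "stdout"))
    builder.redirectError(ProcessBuilder.Redirect.appendTo(stderr))

    val process = builder.start()
    process.waitFor() // block until the child exits, as the ExecutorRunner thread does
  }
}
```

On a POSIX system, running Seq("sh", "-c", "exit 3") through this helper returns 3, the value ExecutorRunner would report together with ExecutorState.EXITED.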

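The method above runs the command that starts the CoarseGrainedExecutorBackend process. Once that process is up, it sends a registration request to the Driver. It registers with the Driver because it is the Driver that actually controls the Executor's computation: Master only allocates Executors on the Driver's behalf, and once they are allocated, control passes to the Driver. The Driver needs to know which Executors Master has allocated to it and on which Workers they run, so an Executor that has started successfully on its Worker contacts the Driver and registers itself, rather than leaving the Driver waiting on Master. The startup work of CoarseGrainedExecutorBackend is done in its onStart method: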

  override def onStart(): Unit = {
    logInfo("Registering PWR handler.")
    SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
      "disabling decommission feature.")(decommissionSelf)

    logInfo("Connecting to driver: " + driverUrl)
    try {
      _resources = parseOrFindResources(resourcesFileOpt)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      // Ask the driver to register this executor
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
        extractAttributes, _resources, resourceProfile.id))
    }(ThreadUtils.sameThread).onComplete {
      case Success(_) =>
        // Registration accepted: notify ourselves so that receive creates the Executor
        self.send(RegisteredExecutor)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }
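
Once the driver accepts the registration, the backend sends itself a RegisteredExecutor message, and its receive method handles it by creating the Executor object that actually runs tasks:
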
  override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
          resources = _resources)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

    // ... other messages (LaunchTask, KillTask, ...) omitted
  }
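The registration handshake is an asynchronous ask followed by a local follow-up step. A toy sketch with Scala Futures shows the two outcomes; ToyDriver and ToyExecutorBackend are hypothetical, and the driver's rejection of a duplicate executor ID is assumed here as one representative reason a registration can fail. On success the backend would go on to create its Executor; on failure it records why it would exit.

```scala
import scala.concurrent.{ExecutionContext, Future}

/** Hypothetical driver stand-in: accepts each executor ID once. */
final class ToyDriver {
  private val executors = scala.collection.mutable.Set.empty[String]

  // Plays the role of the driver answering RegisterExecutor.
  def registerExecutor(executorId: String): Future[Boolean] = synchronized {
    if (executors.contains(executorId)) {
      Future.failed(new IllegalStateException(s"Duplicate executor ID: $executorId"))
    } else {
      executors += executorId
      Future.successful(true)
    }
  }
}

/** Hypothetical executor-side stand-in for CoarseGrainedExecutorBackend. */
final class ToyExecutorBackend(executorId: String, driver: ToyDriver)(implicit ec: ExecutionContext) {
  @volatile var executorCreated = false
  @volatile var exitReason: Option[String] = None

  // Like onStart: ask the driver, then act on the reply asynchronously.
  def onStart(): Future[Unit] =
    driver.registerExecutor(executorId)
      .map { _ => executorCreated = true } // Spark: self.send(RegisteredExecutor) -> new Executor(...)
      .recover { case e: Exception =>
        exitReason = Some(s"Cannot register with driver: ${e.getMessage}") // Spark: exitExecutor(1, ...)
      }
}
```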




