当前位置: 首页 > 工具软件 > Executor > 使用案例 >

一文搞定Spark的Executor创建与启动

濮阳君浩
2023-12-01

1. Executor的创建与启动

博客中我们分析了StandaloneSchedulerBackend的启动和Application程序注册,其中有关于Application分配资源和给任务分配executor资源没有介绍,本博客中将介绍这部分的实现源码。

1.1 Master创建Application

首先Master收到RegisterApplication消息之后具体操作代码如下:

 override def receive: PartialFunction[Any, Unit] = {
   ......
    case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      //忽略STANDBY状态的Master
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        //创建 Application实例对象,封装诸如系统时间戳、Application ID、默认Application需要的最犬CPU Core数量和ClientEndPoint引用令信息
        val app = createApplication(description, driver)
        //注册Application
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        //将创建好的Applocation进行持久化,防止丢失
        persistenceEngine.addApplication(app)
        //返回StandaloneAppClient ClientEndPoint 的receive处理RegisteredApplication消息
        driver.send(RegisteredApplication(app.id, self))
        //给Application分配资源
        schedule()
      }
      ........
   }

1.2 Application资源分配

从上面代码中可以看出,在创建完Application之后,将会调用schedule方法给刚创建的app分配资源。

schedule 方法有两个作用:
(1) 完成 Driver 的调度, 将 waitingDrivers 数组中的 Driver 发送到满足运行条件的 Worker 上运行;
(2) 在满足条件的 Worker 结点上为 Application启动 Executor。 schedule 方法的源代码如下所示。

具体代码如下:

 private def schedule(): Unit = {
    //Master状态不为Alive直接返回
    if (state != RecoveryState.ALIVE) {
      return
    }
    //随机打乱works,防止在同一个works上启动太多的app,与此同时过滤出Alive状态的works
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    //当前最后一个分配的work下标
    var curPos = 0
    for (driver <- waitingDrivers.toList) { 
      /**
        * 我们以轮循方式为每个等待的Driver分配work。 对于每个Driver,我们从分配给Driver的最后一个work开始,
        * 然后继续进行,直到我们遍历所有处于活跃状态的work。
        * */
      var launched = false
      var isClusterIdle = true
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {//遍历所有的work,直到driver启动
        val worker = shuffledAliveWorkers(curPos)
        //该work上没有启动driver和executor
        isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty
        numWorkersVisited += 1
        //判断当前work资源能否启动该driver
        if (canLaunchDriver(worker, driver.desc)) {
          //向该work请求driver启动需要的资源
          val allocated = worker.acquireResources(driver.desc.resourceReqs)
          //给driver分配申请好的资源
          driver.withResources(allocated)
          //启动driver
          launchDriver(worker, driver)
          //从等待队列中删除该driver
          waitingDrivers -= driver
          //标识启动成功
          launched = true
        }
        //更新下标,如同一个循环列表
        curPos = (curPos + 1) % numWorkersAlive
      }
      if (!launched && isClusterIdle) {
        logWarning(s"Driver ${driver.id} requires more resource than any of Workers could have.")
      }
    }
    //为application在Worker上启动Excutor
    startExecutorsOnWorkers()
  }

在 Master 中, schedule 方法是一个很重要的方法, 每一次新的 Driver 的注册、 Application 的注册或者可用资源发生变动, 都将调用 schedule 方法。 schedule 方法用于为当前等待调度的 Application 调度可用的资源, 在满足条件的 Worker 结点上启动 Executor。这个方法还一个作用,就是当有 Driver 提交时,负责将 Driver 发送到一个可用资源满足 Driver 需有另外求的 Worker 结点上运行, launchDriver (worker, driver) 方法负责完成这一任务。

1.3 Executor资源分配

Application 调度成功之后, Master 将会为 Appication 在 Worker 结点上启动 Executors, 调用 startExecutorsOnWorkers 方法完成此操作,其源代码如下所示:

private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    //使用最简单的FIFO策略,为待分配executor的app启动executor
    for (app <- waitingApps) {
      //获取app需要在每一个executor上分配的核心数
      val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
      // If the cores left is less than the coresPerExecutor,the cores left will not be allocated
      //如果该app剩余待分配的核数大于等于每个executor需求数目则开始分配?
      //???以前版本是只要大等于0就开始分配
      if (app.coresLeft >= coresPerExecutor) {
        // Filter out workers that don't have enough resources to launch an executor
        //筛选出没有足够资源来启动executor的workers
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canLaunchExecutor(_, app.desc))
          .sortBy(_.coresFree).reverse
        if (waitingApps.length == 1 && usableWorkers.isEmpty) {
          logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
        }
        //在可用的workers上为app分配资源(可以看作预分配)
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

        // Now that we've decided how many cores to allocate on each worker, let's allocate them
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        //实际为Worker上的Executor分配资源
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
        }
      }
    }
  }

在 scheduleExecutorsOnWorkers 方法中, 有两种选择启动 Executor 的策略,第一种是轮流均摊策略 (round - robin) , 采用圆桌算法依次轮流均摊,直到满足资源需求,轮流均摊策略通常会有更好的数据本地性, 因此它是默认的选择策略。 第二种是依次全占,在 usable­Workers 中,依次获取每个 Worker 上的全部资源,直到满足资源需求。

当使用 scheduleExecutorsOnWorkers 为 Application 分配好资源后,allocateWorkerResourceToExecutors 方法被调用, 将会在 Worker 结点上实际分配资源。 下面是 allocateWorkerResourceToExecutors 的源代码:

private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    //计算需要启动的executor的个数
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    //每个executor需要分配的核数
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {//依次启动executor
      //请求资源
      val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
      //在该app上添加分配好的资源,并返回该executor描述符ExecutorDesc
      val exec = app.addExecutor(worker, coresToAssign, allocated)
      //根据返回的executor描述符ExecutorDesc启动executor
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }

1.4 启动Executor

从上面代码中可以看出,在为每个Executor分配好资源并封装成executor描述符ExecutorDesc之后,会使用launchExecutor方法来进行实际的启动Executor操作,其源代码如下:

  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    //在work上添加刚启动的executor
    worker.addExecutor(exec)
    //向work发送LaunchExecutor消息,
    worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
      exec.application.desc, exec.cores, exec.memory, exec.resources))
    //向driver发送ExecutorAdded消息
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }

从launchExecutor的源代码中我们可以看出,该方法首先在当前worker信息中添加新分配的executor,然后会向worker发送启动Executor的消息LaunchExecutor,该消息中携带了masterurl以及executor信息;然后会向Driver端发送ExecutorAdded信息,告诉driver新添加的driver编号,地址以及内存等信息。

当Worker接收到LaunchExecutor消息之后会在本机启动Executor,我们查看其receive中相关的源代码:

override def receive: PartialFunction[Any, Unit] = synchronized {
    case msg: RegisterWorkerResponse =>
      handleRegisterResponse(msg)
..........
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) =>
      if (masterUrl != activeMasterUrl) {//与当前Master地址不同则不启动
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else if (decommissioned) {//这个状态暂时还不明白什么意思?
        logWarning("Asked to launch an executor while decommissioned. Not launching executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          //创建executor的工作目录
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }
          /**
            * 为执行程序创建本地目录。 它们通过SPARK_EXECUTOR_DIRS环境变量传递给执行程序,并在应用程序完成时由Worker删除*/
          val appLocalDirs = appDirectories.getOrElse(appId, {
            val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
            val dirs = localRootDirs.flatMap { dir =>
              try {
                val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                //更改文件目录权限为700
                Utils.chmod700(appDir)
                //获取绝对路径
                Some(appDir.getAbsolutePath())
              } catch {
                case e: IOException =>
                  logWarning(s"${e.getMessage}. Ignoring this directory.")
                  None
              }
            }.toSeq
            if (dirs.isEmpty) {
              throw new IOException("No subfolder can be created in " +
                s"${localRootDirs.mkString(",")}.")
            }
            dirs
          })
          //在map中添加appid到目录的映射
          appDirectories(appId) = appLocalDirs
          //创建ExecutorRunner
          val manager = new ExecutorRunner(
            appId,            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),  cores_,  memory_, self,
            workerId,  webUi.scheme,  host, webUi.boundPort,  publicAddress,sparkHome, executorDir, workerUri, conf,  appLocalDirs,  ExecutorState.LAUNCHING,  resources_)
          //在Map中添加映射关系
          executors(appId + "/" + execId) = manager
          //启动ExecutorRunner
          manager.start()
          //更新该Worker使用完的资源
          coresUsed += cores_
          memoryUsed += memory_
          addResourcesUsed(resources_)
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }
}

从上面的代码中可以看到, 首先判断传过来的 masterUrl 是否与 activeMasterUrl 相同, 如果不相同, 说明收到的不是处于 ALIVE 状态的 Master 发送过来的请求, 这种情况下直接打印警告信息;(中间还有一个判断??????),否 则说明该请求来自 ALIVE Master, 于是为 Executor 创建工作目录, 创建好工作目录之后, 使用 appld、 execld 和 appDes 等参数创建 ExecutorRunner。顾名思义, ExecutorRunner 是 Executor 运行的地方,在ExecutorRunner 中, 有一个工作线程, 这个线程负责下载依赖的文件, 并启动 CoarseGaindExecutorBackend。 下面是 ExecutorRunner 中的线程启动的源代码。

private[worker] def start(): Unit = {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run(): Unit = { 
      //在线程方法中调用fetchAndRunExecutor方法
      fetchAndRunExecutor() }
    }
    //启动线程
    workerThread.start()
    // 终止回调函数,用于杀死进程
    shutdownHook = ShutdownHookManager.addShutdownHook { () =>
      // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
      // be `ExecutorState.LAUNCHING`. In this case, we should set `state` to `FAILED`.
      if (state == ExecutorState.LAUNCHING) {
        state = ExecutorState.FAILED
      }
      killProcess(Some("Worker shutting down")) }
  }

上面代码中的方法是用于下载并运行我们的ApplicationDescription中描述的executor的,也就是该方法会实际的启动executor,我们查看其源代码:

 private def fetchAndRunExecutor(): Unit = {
    try {
      //将分配的executor(仅群集)或executor的资源保存到JSON格式的资源文件中。 仅在Standalone版本中使用。
      val resourceFileOpt = prepareResourcesFile(SPARK_EXECUTOR_PREFIX, resources, executorDir)
      //将运行参数添加资源
      val arguments = appDesc.command.arguments ++ resourceFileOpt.map(f =>
        Seq("--resourcesFile", f.getAbsolutePath)).getOrElse(Seq.empty)
      val subsOpts = appDesc.command.javaOpts.map {
        Utils.substituteAppNExecIds(_, appId, execId.toString)
      }
      val subsCommand = appDesc.command.copy(arguments = arguments, javaOpts = subsOpts)
      //根据给定的参数创建ProcessBuilder
      val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)
      //得到进程启动的命令
      val command = builder.command()
      //将启动命令格式化
      val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala)
        .mkString("\"", "\" \"", "\"")
      logInfo(s"Launch command: $redactedCommand")
      //设置进程工作目录
      builder.directory(executorDir)
      builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

      // Add webUI log urls
      val baseUrl =
        if (conf.get(UI_REVERSE_PROXY)) {
          s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
        } else {
          s"$webUiScheme$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
        }
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
      //启动CoarseGrainedExecutorBackend进程
      process = builder.start()
      val header = "Spark Executor Command: %s\n%s\n\n".format(
        redactedCommand, "=" * 40)
      // Redirect its stdout and stderr to files
      //将其stdout和stderr重定向到文件
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

      val stderr = new File(executorDir, "stderr")
      Files.write(header, stderr, StandardCharsets.UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
      //标识进程状态并向worker发送ExecutorStateChanged消息
      state = ExecutorState.RUNNING
      worker.send(ExecutorStateChanged(appId, execId, state, None, None))
      //等待进程结束
      val exitCode = process.waitFor()
      //改变进程状态
      state = ExecutorState.EXITED
      val message = "Command exited with code " + exitCode
      //向Worker发送消息,以便于Worker回收资源
      worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    } catch {
      case interrupted: InterruptedException =>
        logInfo("Runner thread for executor " + fullId + " interrupted")
        state = ExecutorState.KILLED
        killProcess(None)
      case e: Exception =>
        logError("Error running executor", e)
        state = ExecutorState.FAILED
        killProcess(Some(e.toString))
    }
  }

上述方法中会执行命令来启动CoarseGrainedExecutorBackend进程,在CoarseGrainedExecutorBackend进程启动后,将会向Driver端发起注册请求。之所以要向Driver注册,是因为实际控制Executor计算任务的还是Driver,Master只是间接地为Driver 分配了Executor,分配好了之后,使用权便交到Driver手中。Driver要知道Master都为自己分配了哪些Executor,这些Executor都位于哪些Worker中,因此Executor在Worker上启动成功后,主动去联系Driver注册,以免Driver长时间等待Master分配的资源。CoarseGrainedExecutorBackend在启动过程中完成的操作,其onStart方法如下所示:

 override def onStart(): Unit = {
    logInfo("Registering PWR handler.")
    SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
      "disabling decommission feature.")(decommissionSelf)

    logInfo("Connecting to driver: " + driverUrl)
    try {
      _resources = parseOrFindResources(resourcesFileOpt)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      //向Driver发送ask请求,携带executor的注册信息,等待Driver回应
      driver = Some(ref)
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
        extractAttributes, _resources, resourceProfile.id))
    }(ThreadUtils.sameThread).onComplete {
      case Success(_) =>
        self.send(RegisteredExecutor)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

上述代码中,CoarseGrainedExecutorBackend会向Driver端发送RegisterExecutor请求,当Driver收到消息并返回成功的话,CoarseGrainedExecutorBackend会发送一个RegisteredExecutor消息,在CoarseGrainedExecutorBackend的receive中收到该消息会正式创建一个Executor。

 override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
      //实际创建Executor
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
          resources = _resources)
          //向Driver端发送已经启动Executor的消息
        driver.get.send(LaunchedExecutor(executorId))
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }
.........
}

注:LaunchedExecutor消息会在CoarseGrainedSchedulerBackend中接收并处理。

如果喜欢的话希望点赞收藏,关注我,将不间断更新博客。

希望热爱技术的小伙伴私聊,一起学习进步

来自于热爱编程的小白

 类似资料: