
[Spark Source Code] spark-submit and spark-class

阎宾实
2023-12-01

Let's start from the launch command:

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

The launch command calls spark-submit, so let's look directly at the bin/spark-submit script. Just like spark-shell, it first checks whether ${SPARK_HOME} is set, then launches spark-class, passing org.apache.spark.deploy.SparkSubmit as the first argument followed by all of the arguments that were given to spark-submit.

# -z: true when the variable is empty or unset
if [ -z "${SPARK_HOME}" ]; then
  # $0: the path of the current script
  # dirname: strips the last path component, e.g. `dirname /usr/local/bin` prints /usr/local
  # $(command): substitutes the output of the command
  # Put together, this sources the find-spark-home script that sits next to this script
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# $@ expands to all arguments passed to this script
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

The script in turn calls the bin/spark-class script:

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      # After NULL character is consumed, change the delimiter and consume command string.
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

The script calls the org.apache.spark.launcher.Main class to generate the command the shell will execute; that class does the actual command building. Let's walk through the script step by step to see where this happens.

First, as before, it resolves the Spark home directory:

# If SPARK_HOME is not set, use the parent directory of this script as SPARK_HOME
# -z is true when the string has length 0; $() and `` both run a command and substitute its output
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

This is done by the find-spark-home script, whose content is as follows:

# $(cd "$(dirname "$0")"; pwd) : 输出当前脚本所在目录 如果脚本文件路径为/a/b/c.txt,则此结果返回/a/b
FIND_SPARK_HOME_PYTHON_SCRIPT="$(cd "$(dirname "$0")"; pwd)/find_spark_home.py"

# 当${SPARK_HOME}参数已经配置时,退出脚本
if [ ! -z "${SPARK_HOME}" ]; then
   exit 0
# 当FIND_SPARK_HOME_PYTHON_SCRIPT所表示的文件find_spark_home.py不存在时,进行spark_home配置
elif [ ! -f "$FIND_SPARK_HOME_PYTHON_SCRIPT" ]; then
# 设置spark_home为当前脚本所在目录的上一级目录,如脚本文件为/opt/spark-3.0.0/bin/find-spark-home,这里就返回/opt/spark-3.0.0作为SPARK_HOME
  export SPARK_HOME="$(cd "$(dirname "$0")"/..; pwd)"
....
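
A tiny runnable sketch of this path arithmetic, assuming a hypothetical script saved as /opt/spark-3.0.0/bin/where-am-i.sh:

#!/usr/bin/env bash
# Prints the directory this script lives in and its parent (what find-spark-home exports)
echo "script dir : $(cd "$(dirname "$0")"; pwd)"     # /opt/spark-3.0.0/bin
echo "SPARK_HOME : $(cd "$(dirname "$0")"/..; pwd)"  # /opt/spark-3.0.0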

Next, some environment variables are loaded:

. "${SPARK_HOME}"/bin/load-spark-env.sh

load-spark-env.sh loads spark-env.sh and sets the assembly-related information (such as SPARK_SCALA_VERSION).

# If SPARK_HOME is not set, use the parent directory of this script as SPARK_HOME
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

SPARK_ENV_SH="spark-env.sh"
if [ -z "$SPARK_ENV_LOADED" ]; then
  export SPARK_ENV_LOADED=1

  export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}"/conf}"

  SPARK_ENV_SH="${SPARK_CONF_DIR}/${SPARK_ENV_SH}"
  if [[ -f "${SPARK_ENV_SH}" ]]; then
    # Promote every variable assigned below to an environment (exported) variable.
    # With set -a, settings made in spark-env.sh (such as SPARK_HOME) become visible to the
    # child processes launched later; children can read these variables but cannot change
    # them for the parent. (A small demonstration of set -a follows this listing.)
    set -a
    . ${SPARK_ENV_SH}
    set +a
  fi
fi

# Setting SPARK_SCALA_VERSION if not already set.

# TODO: revisit for Scala 2.13 support
export SPARK_SCALA_VERSION=2.12
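
A minimal sketch of what set -a does; the variable names here are made up for the demonstration:

#!/usr/bin/env bash
# Everything assigned while set -a is active is exported to child processes.
set -a
MY_DEMO_VAR="visible to children"   # hypothetical variable, stands in for settings from spark-env.sh
set +a
OTHER_VAR="not exported"            # assigned after set +a, so it stays local to this shell
bash -c 'echo "child sees MY_DEMO_VAR=${MY_DEMO_VAR:-<unset>} OTHER_VAR=${OTHER_VAR:-<unset>}"'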

Then it locates the java binary and assigns it to the RUNNER variable:

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  # command -v tests whether a command is available: if java is on the PATH, RUNNER is set to java;
  # otherwise complain that JAVA_HOME is not set
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

Most of the code in between is related to the assembly: locating the Spark jars and building LAUNCH_CLASSPATH.

The crucial part is the following:

build_command() {
  # java -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  # $? : the exit status of the previous command (or the return value of a function)
  # printf "%d\0" appends that exit code, followed by a NUL byte, to the output
  printf "%d\0" $?
}

CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
# Read the output of `build_command "$@"` token by token and append it to the CMD array
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      # After NULL character is consumed, change the delimiter and consume command string.
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@") # $@ expands to all arguments passed to this script

# ${#CMD[@]} is the number of elements in the CMD array
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# ${CMD[@]:a:b} extracts b elements of the array starting at index a; here it drops the trailing exit code
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

build_command runs "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@", which is the first Spark class actually executed; the while loop then reads each ARG it prints and appends it to CMD.
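
Here is a self-contained sketch of the same NUL-delimited parsing, with a toy producer standing in for launcher.Main (it skips the marker-handling phase of the real loop):

#!/usr/bin/env bash
# Toy stand-in for build_command: prints a few "arguments" separated by NUL bytes,
# then its exit status, mimicking launcher.Main followed by printf "%d\0" $?.
fake_build_command() {
  printf '%s\0' "/usr/bin/java" "-cp" "demo.jar" "org.example.Demo"
  printf '%d\0' 0
}

CMD=()
while IFS= read -d '' -r ARG; do    # with -d '' the delimiter is the NUL byte
  CMD+=("$ARG")
done < <(fake_build_command)

LAST=$(( ${#CMD[@]} - 1 ))
echo "launcher exit code: ${CMD[$LAST]}"
CMD=("${CMD[@]:0:$LAST}")           # drop the trailing exit code
printf 'arg: %s\n' "${CMD[@]}"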

Whether you start spark-shell, submit a jar with spark-submit, or run any of the other scripts for the Master, Worker and so on, everything ends up in spark-class, which calls launcher.Main to build the command to execute.

java -Xmx128m -cp ...jars org.apache.spark.launcher.Main "$@"

In other words, org.apache.spark.launcher.Main is invoked by spark-class and receives its arguments from spark-class. It is a utility class for Spark's internal scripts, not the real execution entry point: it delegates to other classes to parse the arguments, generates the final command, and hands that command back to spark-class, which runs it with exec "${CMD[@]}".

Depending on what was submitted, spark-submit or spark-class (Master, Worker, HistoryServer and so on), it builds the corresponding command builder, SparkSubmitCommandBuilder or SparkClassCommandBuilder, and then constructs the command through its buildCommand method.
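
To see the command these builders produce, you can set SPARK_PRINT_LAUNCH_COMMAND before launching; the variable is read in Main.main() shown below, and the paths here are only illustrative:

# Prints "Spark Command: /path/to/java -cp ... org.apache.spark.deploy.SparkSubmit ..."
# to stderr before the application starts.
SPARK_PRINT_LAUNCH_COMMAND=1 bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[2] \
  ./examples/jars/spark-examples_2.12-3.0.0.jar 10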

Here is a rough look at the arguments SparkSubmit receives at this point (Master and Worker will be analysed later):

How launched   Arguments passed to spark-class

spark-shell    org.apache.spark.deploy.SparkSubmit
               --class org.apache.spark.repl.Main
               --name "Spark shell"

spark-submit   org.apache.spark.deploy.SparkSubmit
               --class com.idmapping.scala.WordCount
               --master yarn
               --deploy-mode client
               --driver-memory 4G
               --executor-memory 3G
               --executor-cores 2
               --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
               --conf spark.default.parallelism=24
               /user/jars/idmapping-job-1.0-SNAPSHOT.jar
               file:///user/tmp/words.txt file:///user/data/wordcount/

This class lives in the launcher module. Let's skim through the code:

package org.apache.spark.launcher;

/**
 * Command line interface for the Spark launcher. Used internally by Spark scripts.
 * (A utility class used only by Spark's own launch scripts.)
 */
class Main {

  /**
   * Usage: Main [class] [class args]
   * Two modes, "spark-submit" and "spark-class"; in the "spark-class" mode the class can be one of
   * the other internal classes such as Master, Worker, HistoryServer, etc.
   * This CLI works in two different modes:
   *   "spark-submit": if class is "org.apache.spark.deploy.SparkSubmit", the
   *   {@link SparkLauncher} class is used to launch a Spark application.
   *
   *   "spark-class": if another class name is provided, an internal Spark class is run.
   *
   * On Unix-like systems the output is a NULL-separated list of arguments; on Windows it is a space-separated command line.
   * This class works in tandem with the "bin/spark-class" script on Unix-like systems, and
   * "bin/spark-class2.cmd" batch script on Windows to execute the final command.
   * <p>
   * On Unix-like systems, the output is a list of command arguments, separated by the NULL
   * character. On Windows, the output is a command line suitable for direct execution from the
   * script.
   */
  
  /**
   * main() mainly parses the arguments and puts what is needed into the command builder.
   * If spark-shell is started directly, the arguments passed to spark-class are:
   * org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell" 
   * --master spark://host:7077
   */
  public static void main(String[] argsArray) throws Exception {
    // Validate that at least the class name was passed
    checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

    /** 
     * Copy the argument list into args.
     * Remove the first element and assign it to className, i.e. the class to run; the remaining arguments are:
     * --class org.apache.spark.repl.Main --name "Spark shell" --master spark://host:7077
     */
    List<String> args = new ArrayList<>(Arrays.asList(argsArray));
    String className = args.remove(0); // here: org.apache.spark.deploy.SparkSubmit

    // Whether to print the generated launch command (controlled by SPARK_PRINT_LAUNCH_COMMAND)
    boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
    
    // Environment variables to be set for the child process, as key/value pairs
    Map<String, String> env = new HashMap<>();
    List<String> cmd;
      
    // Build the command builder object for either spark-submit or spark-class.
    // All arguments are parsed and stored in that builder; in other words, submit vs.
    // master/worker etc. is split here and the matching arguments are collected.
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
      // Validation, error reporting and usage help for spark-submit
      try {
        // Build the spark-submit command builder
        AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
        // The actual command is built here, by calling
        // SparkSubmitCommandBuilder's buildCommand method
        cmd = buildCommand(builder, env, printLaunchCommand);
      } catch (IllegalArgumentException e) {
        ...
      }
    } else {
      // Build the spark-class command builder,
      // which parses the class name and its arguments
      AbstractCommandBuilder builder = new SparkClassCommandBuilder(className, args);
      cmd = buildCommand(builder, env, printLaunchCommand);
    }

    // On Windows, print a single space-separated command line instead of NULL-separated tokens
    if (isWindows()) {
      System.out.println(prepareWindowsCommand(cmd, env));
    } else {
      // A sequence of NULL character and newline separates command-strings and others.
      System.out.println('\0');

      // In bash, use NULL as the arg separator since it cannot be used in an argument.
      List<String> bashCmd = prepareBashCommand(cmd, env);
      for (String c : bashCmd) {
        System.out.print(c);
        System.out.print('\0');
      }
    }
  }

  /**
   * Prepare spark commands with the appropriate command builder.
   * If printLaunchCommand is set then the commands will be printed to the stderr.
   */
  private static List<String> buildCommand(
      AbstractCommandBuilder builder,
      Map<String, String> env,
      boolean printLaunchCommand) throws IOException, IllegalArgumentException {
    
    List<String> cmd = builder.buildCommand(env);
    if (printLaunchCommand) {
      System.err.println("Spark Command: " + join(" ", cmd));
      System.err.println("========================================");
    }
    return cmd;
  }

  // The following handles Windows.
  // Spark mostly runs on Linux, so we will not dwell on this path.

  // Builds the command line used on Windows
  private static String prepareWindowsCommand(List<String> cmd, Map<String, String> childEnv) {
    StringBuilder cmdline = new StringBuilder();
    for (Map.Entry<String, String> e : childEnv.entrySet()) {
      cmdline.append(String.format("set %s=%s", e.getKey(), e.getValue()));
      cmdline.append(" && ");
    }
    for (String arg : cmd) {
      cmdline.append(quoteForBatchScript(arg));
      cmdline.append(" ");
    }
    return cmdline.toString();
  }

  /**
   * Prepare the command for execution from a bash script. The final command will have commands to
   * set up any needed environment variables needed by the child process.
   */
  // On Unix, prepend the environment variables the child process needs (via `env`)
  private static List<String> prepareBashCommand(List<String> cmd, Map<String, String> childEnv) {
    if (childEnv.isEmpty()) {
      return cmd;
    }

    List<String> newCmd = new ArrayList<>();
    newCmd.add("env");

    for (Map.Entry<String, String> e : childEnv.entrySet()) {
      newCmd.add(String.format("%s=%s", e.getKey(), e.getValue()));
    }
    newCmd.addAll(cmd);
    return newCmd;
  }

  /**
   * A parser used when parsing the spark-submit command line fails. It's used as a best-effort
   * at trying to identify the class the user wanted to invoke, since that may require special
   * usage strings (handled by SparkSubmitArguments).
   */
  // If parsing the spark-submit arguments fails, one more attempt is made here to identify the
  // main class; only after that does it fall back to printing usage help
  private static class MainClassOptionParser extends SparkSubmitOptionParser {

    String className;

    @Override
    protected boolean handle(String opt, String value) {
      if (CLASS.equals(opt)) {
        className = value;
      }
      return false;
    }

    @Override
    protected boolean handleUnknown(String opt) {
      return false;
    }

    @Override
    protected void handleExtraArgs(List<String> extra) {

    }

  }

}
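
Since spark-class starts this class with a plain java command, you can also run launcher.Main by hand and make the NUL-separated output readable with tr. This is only an illustration and assumes SPARK_HOME is exported and points at a Spark 3.0.0 distribution:

java -Xmx128m -cp "$SPARK_HOME/jars/*" org.apache.spark.launcher.Main \
  org.apache.spark.deploy.SparkSubmit \
  --class org.apache.spark.examples.SparkPi \
  --master local[2] \
  "$SPARK_HOME"/examples/jars/spark-examples_2.12-3.0.0.jar 10 \
  | tr '\0' '\n'
# Each non-empty line of the output is one element of the CMD array that spark-class will exec.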

The SparkSubmitCommandBuilder class:

/**
   * This constructor is used when invoking spark-submit; it parses and validates arguments
   * provided by the user on the command line.
   */
  SparkSubmitCommandBuilder(List<String> args) {
    this.allowsMixedArguments = false;
    this.parsedArgs = new ArrayList<>();
    boolean isExample = false;
    List<String> submitArgs = args;
    this.userArgs = Collections.emptyList();

    if (args.size() > 0) {
      // Check whether this is pyspark-shell or sparkr-shell
      switch (args.get(0)) {
        case PYSPARK_SHELL:
          this.allowsMixedArguments = true;
          appResource = PYSPARK_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case SPARKR_SHELL:
          this.allowsMixedArguments = true;
          appResource = SPARKR_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        // Neither pyspark nor sparkR: the run-example case
        case RUN_EXAMPLE:
          isExample = true;
          appResource = SparkLauncher.NO_RESOURCE;
          submitArgs = args.subList(1, args.size());
      }

      this.isExample = isExample;
      OptionParser parser = new OptionParser(true);
      parser.parse(submitArgs);
      this.isSpecialCommand = parser.isSpecialCommand;
    } else {
      this.isExample = isExample;
      this.isSpecialCommand = true;
    }
  }

The data returned by launcher.Main is stored in the CMD array.

Then the command is executed:

exec "${CMD[@]}"

This is where a concrete Spark class really starts to run.

Finally, a few words about the exec command; it is easiest to understand together with a couple of related commands:

  • The source command runs the target script inside the current shell, as if its contents had been written there; the current shell takes on the target script's work itself.
  • The exec command replaces the current process image with the new program while keeping the same process ID. The rest of the original script never runs, because the process has been taken over by the new program; no new process is forked.
  • The sh command starts a new shell to run the script, which amounts to creating a new child process.

A simple example with four small scripts:
test-1.sh

exec -c sh /home/xinghl/test/test-2.sh

test-2.sh

while true
do
        echo "a2"
        sleep 3
done

test-3.sh

sh /home/xinghl/test/test-2.sh

test-4.sh

source /home/xinghl/test/test-2.sh

Running test-1.sh and test-4.sh has the same effect: there is only one process.
Running test-3.sh produces two processes.
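
To observe this yourself, here is a small variant that prints the PID in each case; the file names are hypothetical:

# parent.sh
echo "parent PID: $$"
source ./child.sh      # prints the same PID: child.sh runs inside the current shell
sh ./child.sh          # prints a different PID: a new child shell is created
exec sh ./child.sh     # same PID again, the process image is replaced;
                       # nothing after this line would ever run

# child.sh
echo "child PID: $$"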
