Let's start from the launch command:
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
The command invokes spark-submit, so we look at the bin/spark-submit script first. Just like spark-shell, it checks whether ${SPARK_HOME} is set, then launches spark-class, passing org.apache.spark.deploy.SparkSubmit as the first argument followed by all of the arguments that were given to spark-submit:
# -z: true if the string is empty
if [ -z "${SPARK_HOME}" ]; then
# $0: the path of the current script
# dirname: strips the last path component, e.g. `dirname /usr/local/bin` prints /usr/local
# $(command) substitutes the output of the command
# taken together, this sources find-spark-home from the directory this script lives in
# $(dirname "$0") could also be written as `dirname $0`
source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# "$@" expands to all arguments that were passed to this script
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
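With the submit command from the top of this section, that last line effectively expands to the following (illustration only):
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10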
The spark-submit script in turn calls the bin/spark-class script:
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
. "${SPARK_HOME}"/bin/load-spark-env.sh
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ "$(command -v java)" ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
echo "You need to build Spark with the target \"package\" before running this program." 1>&2
exit 1
else
LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
# For tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
build_command() {
"$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
if [ "$CMD_START_FLAG" == "true" ]; then
CMD+=("$ARG")
else
if [ "$ARG" == $'\0' ]; then
# After NULL character is consumed, change the delimiter and consume command string.
DELIM=''
CMD_START_FLAG="true"
elif [ "$ARG" != "" ]; then
echo "$ARG"
fi
fi
done < <(build_command "$@")
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
echo "${CMD[@]}" | head -n-1 1>&2
exit 1
fi
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
exit $LAUNCHER_EXIT_CODE
fi
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
spark-class calls the org.apache.spark.launcher.Main class to generate the command that the shell finally executes. Is that class the real entry point? Let's walk through the script step by step.
First, once again, the Spark home directory is determined:
# If SPARK_HOME is not set, find-spark-home derives it from the parent directory of the directory this script lives in
# -z is true when the string has length 0; $(command) and `command` both run a command and substitute its output
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
SPARK_HOME is set by the find-spark-home script, whose relevant part looks like this:
# $(cd "$(dirname "$0")"; pwd) prints the directory this script lives in; for a script at /a/b/c.sh it yields /a/b
FIND_SPARK_HOME_PYTHON_SCRIPT="$(cd "$(dirname "$0")"; pwd)/find_spark_home.py"
# If ${SPARK_HOME} is already set, exit right away
if [ ! -z "${SPARK_HOME}" ]; then
exit 0
# If the file referenced by FIND_SPARK_HOME_PYTHON_SCRIPT (find_spark_home.py) does not exist, derive SPARK_HOME here
elif [ ! -f "$FIND_SPARK_HOME_PYTHON_SCRIPT" ]; then
# Set SPARK_HOME to the parent of the directory this script lives in; for /opt/spark-3.0.0/bin/find-spark-home this yields /opt/spark-3.0.0
export SPARK_HOME="$(cd "$(dirname "$0")"/..; pwd)"
....
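A quick sanity check of these path expressions (a minimal sketch, assuming the script lives at /opt/spark-3.0.0/bin/find-spark-home):
# directory containing the script       -> /opt/spark-3.0.0/bin
echo "$(cd "$(dirname "$0")"; pwd)"
# its parent, which becomes SPARK_HOME  -> /opt/spark-3.0.0
echo "$(cd "$(dirname "$0")"/..; pwd)"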
Next, spark-class loads some environment settings:
. "${SPARK_HOME}"/bin/load-spark-env.sh
load-spark-env.sh sources conf/spark-env.sh (if present) and pins SPARK_SCALA_VERSION, which is later used to locate the assembly jars.
# If SPARK_HOME is not set, derive it again from the parent directory of this script
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
SPARK_ENV_SH="spark-env.sh"
if [ -z "$SPARK_ENV_LOADED" ]; then
export SPARK_ENV_LOADED=1
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}"/conf}"
SPARK_ENV_SH="${SPARK_CONF_DIR}/${SPARK_ENV_SH}"
if [[ -f "${SPARK_ENV_SH}" ]]; then
# Promote every assignment that follows to an exported (environment) variable
# With set -a, variables defined while sourcing spark-env.sh (SPARK_HOME, memory settings, etc.) become visible to child processes started later, just as if each assignment had been written with export; children only get copies, so their changes do not propagate back
# Without set -a only a subshell would see the unexported variables, and a subshell sees every variable of the parent shell; the set -a / set +a pair limits the export to exactly what spark-env.sh defines
set -a
. ${SPARK_ENV_SH}
set +a
fi
fi
# Setting SPARK_SCALA_VERSION if not already set.
# TODO: revisit for Scala 2.13 support
export SPARK_SCALA_VERSION=2.12
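A minimal sketch (not from the Spark sources) of what the set -a / set +a pair does:
set -a               # every assignment from here on is automatically exported
FOO=from_parent      # same effect as `export FOO=from_parent`
set +a               # stop auto-exporting
BAR=parent_only      # ordinary shell variable, not exported
bash -c 'echo "FOO=$FOO BAR=$BAR"'   # prints: FOO=from_parent BAR=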
Next, the script looks for the java binary and stores it in the RUNNER variable:
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
# command -v tells whether a command is available: if java is on the PATH, RUNNER is set to java, otherwise the script complains that JAVA_HOME is not set
if [ "$(command -v java)" ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
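As a side note, `command -v` prints the resolved path (or name) of a command when it is available and prints nothing otherwise, which is why its output can be used as a truth test:
command -v java                                 # e.g. /usr/bin/java, or no output at all
[ "$(command -v java)" ] && echo "java is available"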
Most of the code in between deals with locating the Spark jars (the assembly) and building LAUNCH_CLASSPATH. The crucial part is this:
build_command() {
# java -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
"$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
# $? is the exit status of the previous command, i.e. of the launcher JVM
# printf "%d\0" appends that exit code, terminated by a NUL byte, to the launcher's output
printf "%d\0" $?
}
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
# Read the output of `build_command "$@"` and collect the command arguments into the CMD array
while IFS= read -d "$DELIM" -r ARG; do
if [ "$CMD_START_FLAG" == "true" ]; then
CMD+=("$ARG")
else
if [ "$ARG" == $'\0' ]; then
# After NULL character is consumed, change the delimiter and consume command string.
DELIM=''
CMD_START_FLAG="true"
elif [ "$ARG" != "" ]; then
echo "$ARG"
fi
fi
done < <(build_command "$@") # "$@" is every argument passed to this script
# ${#CMD[@]} is the number of elements in the CMD array
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
# ${CMD[@]:a:b} extracts b elements of the array starting at index a, i.e. everything except the trailing exit code
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
The loop reads each ARG and appends it to CMD. The arguments come from "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@", which is the first Spark class that actually gets executed.
Whether you start spark-shell, submit a jar with spark-submit, or run any of the master or worker scripts, everything eventually goes through spark-class, which calls launcher.Main to build the command to execute:
java -Xmx128m -cp ...jars org.apache.spark.launcher.Main "$@"
In other words, org.apache.spark.launcher.Main is invoked by spark-class and receives its arguments from spark-class. It is a utility class for Spark's internal scripts, not the real execution entry point: it delegates argument parsing to other classes, builds the command to run, and hands that command back to spark-class, where exec "${CMD[@]}" finally executes it.
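The handshake between the launcher and the shell can be reproduced with a standalone sketch (illustration only, not Spark code): a fake launcher prints the NUL+newline marker, then the command arguments separated by NUL bytes, and finally its exit code, and a slightly simplified version of the read loop from spark-class parses it:
#!/usr/bin/env bash
fake_launcher() {
  printf '\0\n'                    # marker: the real command starts after this line
  printf '%s\0' java -Xmx1g Demo   # NUL-separated command arguments
  printf '%d\0' 0                  # exit status of the launcher, appended last
}
CMD=()
DELIM=$'\n'
STARTED="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$STARTED" == "true" ]; then
    CMD+=("$ARG")
  elif [ "$ARG" == $'\0' ]; then   # bash drops NUL bytes, so the marker line reads as empty
    DELIM=''                       # from now on, split on NUL instead of newline
    STARTED="true"
  fi
done < <(fake_launcher)
echo "exit code: ${CMD[${#CMD[@]}-1]}"       # -> 0
echo "command  : ${CMD[@]:0:${#CMD[@]}-1}"   # -> java -Xmx1g Demo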
Depending on what is being launched, spark-submit or spark-class (master, worker, history server, and so on), it builds the matching command builder, SparkSubmitCommandBuilder or SparkClassCommandBuilder, and then constructs the command through its buildCommand method.
Here is a rough look at the arguments SparkSubmit receives at this point (Master and Worker will be analysed later):
Invocation | Arguments |
---|---|
spark-shell | org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell" |
spark-submit | org.apache.spark.deploy.SparkSubmit --class com.idmapping.scala.WordCount --master yarn --deploy-mode client --driver-memory 4G --executor-memory 3G --executor-cores 2 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.default.parallelism=24 /user/jars/idmapping-job-1.0-SNAPSHOT.jar file:///user/tmp/words.txt file:///user/data/wordcount/ |
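You can see the command the launcher builds on your own installation by setting SPARK_PRINT_LAUNCH_COMMAND, which Main checks before printing the command to stderr; the output below is only an illustration, the actual paths and options depend on your environment:
SPARK_PRINT_LAUNCH_COMMAND=1 ./bin/spark-shell
# Spark Command: /usr/lib/jvm/java-8-openjdk/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* \
#   -Dscala.usejavacp=true -Xmx1g org.apache.spark.deploy.SparkSubmit \
#   --class org.apache.spark.repl.Main --name "Spark shell" spark-shell
# ========================================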
The class lives in the launcher module; let's skim through the code:
package org.apache.spark.launcher;
/**
* Command line interface for the Spark launcher. Used internally by Spark scripts.
* (i.e. a helper used only by Spark's own scripts, not a public entry point)
*/
class Main {
/**
* Usage: Main [class] [class args]
* There are two modes, spark-submit and spark-class; in the spark-class case the class can be Master, Worker, HistoryServer and so on.
* This CLI works in two different modes:
* "spark-submit": if class is "org.apache.spark.deploy.SparkSubmit", the
* {@link SparkLauncher} class is used to launch a Spark application.
*
* "spark-class": if another class is provided, an internal Spark class is run.
*
* On Unix-like systems the arguments are printed as a NUL-separated list; on Windows they are printed as a single space-separated command line.
* This class works in tandem with the "bin/spark-class" script on Unix-like systems, and
* "bin/spark-class2.cmd" batch script on Windows to execute the final command.
* <p>
* On Unix-like systems, the output is a list of command arguments, separated by the NULL
* character. On Windows, the output is a command line suitable for direct execution from the
* script.
*/
/**
* main mostly parses the arguments and copies the ones that matter into a command builder.
* When spark-shell invokes spark-class, the arguments passed in look like:
* org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell"
* --master spark://host:7077
*/
public static void main(String[] argsArray) throws Exception {
// There must be at least one argument: the name of the class to launch
checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
/**
* Copy the arguments into a mutable list and remove the first one as the class to run.
* For the spark-shell example above the remaining arguments are:
* --class org.apache.spark.repl.Main --name "Spark shell" --master spark://host:7077
*/
List<String> args = new ArrayList<>(Arrays.asList(argsArray));
String className = args.remove(0);// e.g. org.apache.spark.deploy.SparkSubmit
// Should the final command be printed? Controlled by the SPARK_PRINT_LAUNCH_COMMAND environment variable
boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
// Environment variables for the child process, collected as key/value pairs
Map<String, String> env = new HashMap<>();
List<String> cmd;
// Build the command builder for either spark-submit or spark-class.
// All arguments are parsed and stored in that builder object; this is where
// submit and the master/worker/etc. classes take different paths.
if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
// spark-submit: parse the arguments, with usage/help output on errors
try {
// Build the spark-submit command builder
AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
// The command is actually built here, by calling the builder's
// buildCommand method; env is filled with any environment variables
// the child process will need
cmd = buildCommand(builder, env, printLaunchCommand);
} catch (IllegalArgumentException e) {
...
}
} else {
// Build the spark-class command builder;
// this class parses the class name and its arguments
AbstractCommandBuilder builder = new SparkClassCommandBuilder(className, args);
cmd = buildCommand(builder, env, printLaunchCommand);
}
// On Windows, print a single space-separated command line instead of NUL-separated arguments
if (isWindows()) {
System.out.println(prepareWindowsCommand(cmd, env));
} else {
// A sequence of NULL character and newline separates command-strings and others.
System.out.println('\0');
// In bash, use NULL as the arg separator since it cannot be used in an argument.
List<String> bashCmd = prepareBashCommand(cmd, env);
for (String c : bashCmd) {
System.out.print(c);
System.out.print('\0');
}
}
}
/**
* Prepare spark commands with the appropriate command builder.
* If printLaunchCommand is set then the commands will be printed to the stderr.
*/
private static List<String> buildCommand(
AbstractCommandBuilder builder,
Map<String, String> env,
boolean printLaunchCommand) throws IOException, IllegalArgumentException {
List<String> cmd = builder.buildCommand(env);
if (printLaunchCommand) {
System.err.println("Spark Command: " + join(" ", cmd));
System.err.println("========================================");
}
return cmd;
}
// Builds the command line used on Windows ("set k=v && ..." followed by the quoted arguments).
// Since Spark is mostly run on Linux, this is only skimmed here.
private static String prepareWindowsCommand(List<String> cmd, Map<String, String> childEnv) {
StringBuilder cmdline = new StringBuilder();
for (Map.Entry<String, String> e : childEnv.entrySet()) {
cmdline.append(String.format("set %s=%s", e.getKey(), e.getValue()));
cmdline.append(" && ");
}
for (String arg : cmd) {
cmdline.append(quoteForBatchScript(arg));
cmdline.append(" ");
}
return cmdline.toString();
}
/**
* Prepare the command for execution from a bash script. The final command will have commands to
* set up any needed environment variables needed by the child process.
*/
// Prepends `env k=v ...` so that the child process launched from bash sees the required environment variables
private static List<String> prepareBashCommand(List<String> cmd, Map<String, String> childEnv) {
if (childEnv.isEmpty()) {
return cmd;
}
List<String> newCmd = new ArrayList<>();
newCmd.add("env");
for (Map.Entry<String, String> e : childEnv.entrySet()) {
newCmd.add(String.format("%s=%s", e.getKey(), e.getValue()));
}
newCmd.addAll(cmd);
return newCmd;
}
/**
* A parser used when parsing the spark-submit command line fails. It's used as a best-effort
* at trying to identify the class the user wanted to invoke, since that may require special
* usage strings (handled by SparkSubmitArguments).
*/
// When parsing the spark-submit arguments fails, this makes one more attempt to recover the main class, and only after that is the usage message shown
private static class MainClassOptionParser extends SparkSubmitOptionParser {
String className;
@Override
protected boolean handle(String opt, String value) {
if (CLASS.equals(opt)) {
className = value;
}
return false;
}
@Override
protected boolean handleUnknown(String opt) {
return false;
}
@Override
protected void handleExtraArgs(List<String> extra) {
}
}
}
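Since Main simply writes NUL-separated text to stdout, you can also run it by hand to inspect the raw output (a sketch, assuming a binary distribution with SPARK_HOME exported; the tr at the end turns the NUL separators into newlines so the result is readable):
cd "$SPARK_HOME"
SPARK_SCALA_VERSION=2.12 java -Xmx128m -cp "jars/*" org.apache.spark.launcher.Main \
  org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell" \
  | tr '\0' '\n'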
The SparkSubmitCommandBuilder class:
/**
* This constructor is used when invoking spark-submit; it parses and validates arguments
* provided by the user on the command line.
*/
SparkSubmitCommandBuilder(List<String> args) {
this.allowsMixedArguments = false;
this.parsedArgs = new ArrayList<>();
boolean isExample = false;
List<String> submitArgs = args;
this.userArgs = Collections.emptyList();
if (args.size() > 0) {
// Check for the special first arguments: pyspark-shell, sparkR-shell or run-example
switch (args.get(0)) {
case PYSPARK_SHELL:
this.allowsMixedArguments = true;
appResource = PYSPARK_SHELL;
submitArgs = args.subList(1, args.size());
break;
case SPARKR_SHELL:
this.allowsMixedArguments = true;
appResource = SPARKR_SHELL;
submitArgs = args.subList(1, args.size());
break;
// run-example: one of the bundled examples
case RUN_EXAMPLE:
isExample = true;
appResource = SparkLauncher.NO_RESOURCE;
submitArgs = args.subList(1, args.size());
}
this.isExample = isExample;
OptionParser parser = new OptionParser(true);
parser.parse(submitArgs);
this.isSpecialCommand = parser.isSpecialCommand;
} else {
this.isExample = isExample;
this.isSpecialCommand = true;
}
}
The command built by launcher.Main is stored in the CMD array, and then spark-class executes it:
exec "${CMD[@]}"
At this point a concrete Spark class really starts running.
Finally, a few words about the exec command; it is easiest to understand alongside two related commands:

- source runs the target script inside the current shell, as if its contents had been pasted into the calling script; no new process is created.
- exec replaces the program running in the current process: the PID stays the same, the new program's image takes over the existing process (no fork, so nothing is copied), and the rest of the original script never runs because there is nothing to return to.
- sh starts a new shell to run the script, which means a new child process.

Here is a simple example with the following scripts:
test-1.sh
exec -c sh /home/xinghl/test/test-2.sh
test-2.sh
while true
do
echo "a2"
sleep 3
done
test-3.sh
sh /home/xinghl/test/test-2.sh
test-4.sh
source /home/xinghl/test/test-2.sh
Running test-1.sh or test-4.sh behaves the same way: only one process exists.
Running test-3.sh produces two processes.
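Another quick way to observe the difference is to print $$ (the PID of the current shell) from a child script. A minimal sketch with hypothetical paths; save it as a script and run it, rather than pasting it into an interactive shell, because the exec on the last line replaces the shell that runs it:
#!/usr/bin/env bash
cat > /tmp/child.sh <<'EOF'
echo "child sees PID: $$"
EOF
echo "parent PID: $$"
source /tmp/child.sh   # same PID: the script runs inside the current shell
sh /tmp/child.sh       # different PID: a new child process is forked
exec sh /tmp/child.sh  # same PID again: the new sh replaces this script
echo "never printed"   # exec does not return on success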