
Spark Source Code: The spark-submit Job Submission Process


In fact, all of the Spark launch scripts, spark-shell, spark-sql, and so on, are built on top of the spark-submit script, and spark-submit in turn delegates to the spark-class script. spark-class finally runs org.apache.spark.launcher.Main, the entry point for launching any Spark program. Let's walk through it step by step.
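Before diving into each script, here is a sketch of the call chain this article follows (the parcel paths below are the CDH layout used throughout):

/usr/bin/spark-submit                                        (symlink via /etc/alternatives)
  -> CDH wrapper script                                      (sets HADOOP_HOME, detects JAVA_HOME)
    -> $LIB_DIR/spark/bin/spark-submit
      -> $SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
        -> java -cp <spark-assembly jar> org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit "$@"
          -> exec of the command printed by launcher.Main, i.e. org.apache.spark.deploy.SparkSubmit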

spark-submit

First, locate spark-submit. As the listing shows, /usr/bin/spark-submit is a symbolic link:

[root@master ~]# which spark-submit
/usr/bin/spark-submit
[root@master ~]# ll /usr/bin/spark-submit
lrwxrwxrwx 1 root root 30 Feb  6 10:50 /usr/bin/spark-submit -> /etc/alternatives/spark-submit
[root@master ~]# vi /usr/bin/spark-submit

Here is the content of that spark-submit wrapper script:

#!/bin/bash
# Reference: http://stackoverflow.com/questions/59895/can-a-bash-script-tell-what-directory-its-stored-in

SOURCE="${BASH_SOURCE[0]}"
BIN_DIR="$( dirname "$SOURCE" )"
while [ -h "$SOURCE" ]  ## Follow the chain of symlinks to find the real spark-submit script; -h tests that a file exists and is a symlink.
do
  SOURCE="$(readlink "$SOURCE")"  ## Read the symlink target; returns empty if the file is not a symlink.
  [[ $SOURCE != /* ]] && SOURCE="$BIN_DIR/$SOURCE"  ## A relative target is resolved against the directory of the current link.
  BIN_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
done
BIN_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
LIB_DIR=$BIN_DIR/../lib
export HADOOP_HOME=$LIB_DIR/hadoop

# Autodetect JAVA_HOME if not defined
. $LIB_DIR/bigtop-utils/bigtop-detect-javahome

## At this point $LIB_DIR is /opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib,
## so the script being exec'ed is /opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/spark/bin/spark-submit
exec $LIB_DIR/spark/bin/spark-submit "$@"

The wrapper's only job is to locate the real spark-submit script. In this CDH release it lives at /opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/spark/bin/spark-submit; another way to find it is to run find / -name spark-submit.
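An alternative to following the links by hand is readlink -f (GNU coreutils), which resolves the whole symlink chain in one step. Note that it stops at the wrapper itself: the final hop into lib/spark/bin/spark-submit happens through the exec line inside the wrapper, not through a symlink. The resolved path below is what a typical CDH parcel install would show and is illustrative only:

[root@master ~]# readlink -f /usr/bin/spark-submit
/opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/bin/spark-submit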

The real spark-submit script reads:

#!/usr/bin/env bash
SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
## spark-submit delegates to spark-class, passing it
## the class org.apache.spark.deploy.SparkSubmit
## plus all of the original arguments (deploy mode, executor-memory, and so on).
## Here $SPARK_HOME is /opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/spark
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

So this script simply calls the spark-class script, passing org.apache.spark.deploy.SparkSubmit as the first argument; $@, the arguments originally given to /usr/bin/spark-submit, is forwarded to spark-class unchanged.
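To make the hand-off concrete, a hypothetical submission (the application class and jar are placeholders) is effectively turned by the exec line above into a spark-class call:

spark-submit --master yarn-client --class com.example.App /path/to/app.jar 100
# effectively becomes:
spark-class org.apache.spark.deploy.SparkSubmit --master yarn-client --class com.example.App /path/to/app.jar 100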

Next comes the spark-class script. It loads Spark's environment configuration, locates the assembly jar (spark-assembly-1.5.0-cdh5.6.0-hadoop2.6.0-cdh5.6.0.jar in this CDH 5.6.0 / Spark 1.5.0 / Hadoop 2.6.0 example), and then invokes org.apache.spark.launcher.Main to actually start the Spark application. Its content:

#!/usr/bin/env bash
# Locate SPARK_HOME; here SPARK_HOME=/opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/spark
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
# Load the runtime environment via load-spark-env.sh,
# which in turn sources conf/spark-env.sh and related files
. "$SPARK_HOME"/bin/load-spark-env.sh
# Locate JAVA_HOME and store the java binary in RUNNER
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi
# Locate the assembly jar, spark-assembly-1.5.0-cdh5.6.0-hadoop2.6.0-cdh5.6.0.jar in this CDH 5.6.0 / Spark 1.5.0 build.
# Because the assembly ships with the cluster, the application jar does not need to bundle it.
SPARK_ASSEMBLY_JAR=
if [ -f "$SPARK_HOME/RELEASE" ]; then
  ASSEMBLY_DIR="$SPARK_HOME/lib"
else
  ASSEMBLY_DIR="$SPARK_HOME/assembly/target/scala-$SPARK_SCALA_VERSION"
fi
num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" ]; then
  echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
  echo "You need to build Spark before running this program." 1>&2
  exit 1
fi
ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then
  echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
  echo "$ASSEMBLY_JARS" 1>&2
  echo "Please remove all but one jar." 1>&2
  exit 1
fi
SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"
LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"
# Prepend the launcher build directory to the classpath when requested (useful when developing Spark itself)
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
## Run org.apache.spark.launcher.Main, the entry point that builds the final launch command
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"

The crucial part is this last bit:

CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"

First, "$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@" is executed; this is the first Spark class that actually runs. The while loop then reads whatever launcher.Main prints and stores it, argument by argument, in the CMD array.

Then the assembled command is executed:

exec "${CMD[@]}"

launcher.Main

org.apache.spark.launcher.Main is the command-line interface of the Spark launcher. Once it has run, spark-class executes exec "${CMD[@]}", which starts org.apache.spark.deploy.SparkSubmit. Part of org.apache.spark.launcher.Main:

public static void main(String[] argsArray) throws Exception {
  ...
  List<String> args = new ArrayList<String>(Arrays.asList(argsArray));
  String className = args.remove(0);
  ...
  // Create the command builder
  AbstractCommandBuilder builder;
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    try {
      // Build the command for org.apache.spark.deploy.SparkSubmit
      builder = new SparkSubmitCommandBuilder(args);
    } catch (IllegalArgumentException e) {
      // ...... omitted
      builder = new SparkSubmitCommandBuilder(help);
    }
  } else {
    // Launch any other class
    builder = new SparkClassCommandBuilder(className, args);
  }
  List<String> cmd = builder.buildCommand(env);  // the builder parses the arguments
  ...
  // Print the resulting command back to the calling script
  if (isWindows()) {
    System.out.println(prepareWindowsCommand(cmd, env));
  } else {
    List<String> bashCmd = prepareBashCommand(cmd, env);
    for (String c : bashCmd) {
      System.out.print(c);
      System.out.print('\0');
    }
  }
}
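A handy way to see the command launcher.Main hands back without adding print statements: the launcher in this Spark line honors the SPARK_PRINT_LAUNCH_COMMAND environment variable and echoes the assembled command to stderr before spark-class execs it. The class and jar below are placeholders:

SPARK_PRINT_LAUNCH_COMMAND=1 spark-submit \
  --master yarn-client \
  --class com.example.App \
  /path/to/app.jar 100 2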

This is how execution enters the main method of org.apache.spark.deploy.SparkSubmit. Part of SparkSubmit:

def main(args: Array[String]): Unit = {
  // Arguments supplied at submit time, such as master, executorMemory, etc.
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    // For a job submission, run submit(appArgs)
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}

  1. A SparkSubmitArguments object is created, and its fields are initialized by parsing the arguments.
  2. Only the submit path is analyzed here; the KILL and REQUEST_STATUS branches correspond to the --kill and --status options, illustrated right below.
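A hypothetical example of those two options, which apply to standalone cluster mode (and similar cluster managers); the driver ID and master URL are placeholders:

spark-submit --kill   driver-20160301123456-0001 --master spark://master:6066
spark-submit --status driver-20160301123456-0001 --master spark://master:6066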

SparkSubmitArguments

This class parses and encapsulates the arguments coming from the spark-submit script. Constructing new SparkSubmitArguments(args) triggers, in order:

// Set parameters from command line arguments
try {
  // Parse the command-line arguments
  parse(args.toList)
} catch {
  // omitted
}
// Populate `sparkProperties` map from properties file
// Merge values from the default properties file (spark-defaults.conf by default) with those given via --conf
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`.
ignoreNonSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
// Load anything still missing from environment variables and Spark properties; this also sets action, which defaults to SUBMIT
loadEnvironmentArguments()
// Verify that all properties required by the chosen action have been set
validateArguments()

  • Parse the arguments passed in by the spark-submit script.
  • Merge the values from the default properties file with those passed via --conf; the --conf values win.
  • Drop properties whose keys do not start with "spark.".
  • Load remaining parameters from environment variables and Spark properties; this sets action, which defaults to SUBMIT.
  • Depending on the action, check that everything that action requires has been set; each action has its own validation method:
private def validateArguments(): Unit = {
  action match {
    case SUBMIT => validateSubmitArguments()
    case KILL => validateKillArguments()
    case REQUEST_STATUS => validateStatusRequestArguments()
  }
}

Precedence of Spark properties

  1. Parameters set inside the application itself;
  2. Parameters set on the command line;
  3. Parameters set in the configuration file (spark-defaults.conf by default);
  4. Values set through environment variables.
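A minimal illustration of this ordering, using spark.executor.memory with made-up values (the property name is real, the class, jar and values are hypothetical):

# conf/spark-defaults.conf (file-level default):
#   spark.executor.memory  1g
# Command line:
spark-submit --conf spark.executor.memory=2g --class com.example.App /path/to/app.jar
# Inside the application (highest priority, Scala):
#   new SparkConf().set("spark.executor.memory", "4g")
# With all three present the executors get 4g; without the SparkConf line they get 2g.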

SparkSubmit.submit

This method does its work in two steps:

  • First, it prepares the launch environment, setting up the classpath, system properties, and application arguments needed to run the child main class chosen for the cluster manager and deploy mode.
  • Second, it uses this launch environment to invoke the child main class's main method.

Here is the implementation of submit:

private def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
  def doRunMain(): Unit = {
    // args.proxyUser can be set on the command line (--proxy-user, illustrated after this listing)
    // to impersonate the given user when submitting the program
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            // The important call!
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // ....
      }
    } else {
      // The important call!
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }
  // In standalone cluster mode, there are two submission gateways:
  // (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
  // (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      printStream.println("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}

  1. One of prepareSubmitEnvironment's responsibilities is to set childMainClass, which determines how the application's main class will be invoked;
  2. The inner doRunMain method is then called, and it in turn calls runMain.
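As a side note on the proxyUser branch above: it is driven by spark-submit's --proxy-user option, so a submission that impersonates another user looks roughly like this (the user, class and jar names are placeholders, and the cluster must allow the submitting user to proxy):

spark-submit --proxy-user etl --master yarn-client --class com.example.App /path/to/app.jar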

SparkSubmit.prepareSubmitEnvironment

This method prepares the environment for submitting the application and returns a tuple with four elements:

  1. childArgs: the arguments to pass to the child program;
  2. childClasspath: the classpath entries the child program needs;
  3. sysProps: a map of system properties;
  4. childMainClass: the name of the main class to launch, i.e. org.apache.spark.deploy.yarn.Client, org.apache.spark.deploy.Client, org.apache.spark.deploy.rest.RestSubmissionClient, or the application's own main class. The Client classes are wrappers around userClass, where userClass is the main class of the user's own Spark program.

The signature of prepareSubmitEnvironment:

private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
  : (Seq[String], Seq[String], Map[String, String], String) = {
  1. Determine the cluster manager, that is, which resource manager the program will run on:

val clusterManager: Int = args.master match {
  case m if m.startsWith("yarn") => YARN
  case m if m.startsWith("spark") => STANDALONE
  case m if m.startsWith("mesos") => MESOS
  case m if m.startsWith("local") => LOCAL
  case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
}
  2. Determine the deploy mode of the application:

var deployMode: Int = args.deployMode match {
  case "client" | null => CLIENT
  case "cluster" => CLUSTER
  case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
}
  3. In client mode, childMainClass is simply set to the application's main class:

// In client mode, launch the application main class directly and add the
// application jar and any other dependency jars to the classpath
if (deployMode == CLIENT) {
  childMainClass = args.mainClass
  if (isUserJar(args.primaryResource)) {
    childClasspath += args.primaryResource
  }
  if (args.jars != null) { childClasspath ++= args.jars.split(",") }
  if (args.childArgs != null) { childArgs ++= args.childArgs }
}
  4. In standalone cluster mode, childMainClass is set to org.apache.spark.deploy.rest.RestSubmissionClient or org.apache.spark.deploy.Client:

if (args.isStandaloneCluster) {
  if (args.useRest) {
    childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
    childArgs += (args.primaryResource, args.mainClass)
  } else {
    // In legacy standalone cluster mode, use Client as a wrapper around the user class
    childMainClass = "org.apache.spark.deploy.Client"
    if (args.supervise) { childArgs += "--supervise" }
    Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
    Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
    childArgs += "launch"
    childArgs += (args.master, args.primaryResource, args.mainClass)
  }
  if (args.childArgs != null) {
    childArgs ++= args.childArgs
  }
}
  5. If the job is submitted in yarn-cluster mode, childMainClass is set to org.apache.spark.deploy.yarn.Client:

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
  childMainClass = "org.apache.spark.deploy.yarn.Client"
  // ................
}

SparkSubmit.runMain

  1. The definition of runMain:

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  2. verbose can be enabled on the command line with --verbose or -v; it prints runMain's parameters to the console:

if (verbose) {
  printStream.println(s"Main class:\n$childMainClass")
  printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
  printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
  printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
  printStream.println("\n")
}

For example, submitting in client mode with spark-submit -v --class com.carol.SparkTest --master yarn-client testSpark.jar 100 2 produces output like this:

Main class:
com.carol.SparkTest
Arguments:
100 
2
--name
testSpark
--jar
file:/home/jars/testSpark.jar
--class
com.carol.SparkTest
System properties:
.....
Classpath elements:
.....

If the job is submitted in cluster mode instead, spark-submit -v --class com.carol.SparkTest --master yarn-cluster testSpark.jar 100 2, the output looks like this:

Main class:
org.apache.spark.deploy.yarn.Client
Arguments:
--name
testSpark
--jar
file:/home/jars/testSpark.jar
--class
com.carol.SparkTest
--arg
100
--arg
2
System properties:
.....
Classpath elements:
.....
  3. Load the jars onto the classpath:

for (jar <- childClasspath) {
  addJarToClasspath(jar, loader)
}
  4. Set the Spark properties as system properties (many code paths read configuration from system properties, e.g. SparkConf loads them when it is created):

for ((key, value) <- sysProps) {
  System.setProperty(key, value)
}
  5. Create the Class object for childMainClass:

try {
  mainClass = Utils.classForName(childMainClass)
} catch {
  // .....
}
  6. Obtain the main method object:

val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  7. Invoke the main method:

try {
  mainMethod.invoke(null, childArgs.toArray)
} catch {
  // .....
}

At this point the childMainClass chosen by prepareSubmitEnvironment is finally invoked. To recap, childMainClass can be:

the application's own main class                      // client mode
org.apache.spark.deploy.rest.RestSubmissionClient     // standalone cluster, REST gateway
org.apache.spark.deploy.Client                        // standalone cluster, legacy gateway
org.apache.spark.deploy.yarn.Client                   // yarn-cluster
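In terms of the command line, a few illustrative submissions and the childMainClass they lead to (the application class, jar and host names are placeholders):

spark-submit --master yarn-client  --class com.example.App app.jar    # runs com.example.App directly
spark-submit --master yarn-cluster --class com.example.App app.jar    # runs org.apache.spark.deploy.yarn.Client
spark-submit --master spark://master:6066 --deploy-mode cluster --class com.example.App app.jar
                                                                      # runs RestSubmissionClient, falling back to
                                                                      # org.apache.spark.deploy.Client if the master
                                                                      # is not a REST endpoint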

What comes next is the process in which the Master and Workers start the DriverWrapper process, which in turn launches the application's main class.

