Nothing makes you go bald faster than reading source code
Environment
- This time we're using Spark version 3.0.0.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.12</artifactId>
<version>3.0.0</version>
<scope>provided</scope>
</dependency>
1. This is the script we use to submit the job
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar
2. Let's look at the code of the spark-submit script
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# The key line
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
You can see that the script simply execs the spark-class script, passing it org.apache.spark.deploy.SparkSubmit plus all of the original arguments ("$@").
3. Keep tracing into the spark-class script
# Below is part of the source
# At the very end of the script you can see that the value of CMD is executed
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
# So where does CMD come from?
# The code below assembles the arguments
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
if [ "$CMD_START_FLAG" == "true" ]; then
CMD+=("$ARG")
else
if [ "$ARG" == $'\0' ]; then
# After NULL character is consumed, change the delimiter and consume command string.
DELIM=''
CMD_START_FLAG="true"
elif [ "$ARG" != "" ]; then
echo "$ARG"
fi
fi
done < <(build_command "$@")
# The loop above reads the output of the build_command function
# -Xmx128m should look familiar: what gets assembled here is a Java launch command
# Running the class org.apache.spark.launcher.Main produces the final launch command
build_command() {
"$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
# Once the launch command has been assembled, exec "${CMD[@]}" runs it
After a fair amount of juggling, org.apache.spark.launcher.Main produces the launch command (in client mode it looks roughly like java -cp <classpath> org.apache.spark.deploy.SparkSubmit <your options>), and that command starts org.apache.spark.deploy.SparkSubmit.
4. Next up: org.apache.spark.deploy.SparkSubmit
- First, find the main method
override def main(args: Array[String]): Unit = {
//An anonymous inner class is declared here
val submit = new SparkSubmit() {
.......
}
//Call the anonymous inner class's method
submit.doSubmit(args)
}
- Next, let's see what the anonymous inner class of SparkSubmit does:
- The overridden parseArguments is invoked from the parent class's doSubmit.
- When doSubmit is called here, it in turn calls the parent class's doSubmit.
val submit = new SparkSubmit() {
self =>
//parseArguments is overridden here; it will be called from the parent class's doSubmit
override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
new SparkSubmitArguments(args) {
override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
override protected def logError(msg: => String): Unit = self.logError(msg)
}
}
override protected def logInfo(msg: => String): Unit = printMessage(msg)
override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")
//When doSubmit is called, it delegates to the parent class's doSubmit
override def doSubmit(args: Array[String]): Unit = {
try {
super.doSubmit(args)
} catch {
case e: SparkUserAppException =>
exitFn(e.exitCode)
}
}
}
def doSubmit(args: Array[String]): Unit = {
val uninitLog = initializeLogIfNecessary(true, silent = true)
// parseArguments calls the method overridden in the anonymous inner class above
val appArgs = parseArguments(args)
if (appArgs.verbose) {
logInfo(appArgs.toString)
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
- So what does parseArguments give us? It creates a SparkSubmitArguments object that is used to parse the arguments.
- Let's look at the key parts of the SparkSubmitArguments source.
// Since Scala runs all the code in the class body (several hundred lines), we won't walk through it line by line
// Jump straight to line 108
parse(args.asJava)
// This method mainly parses arguments such as --class, --master and so on
// handle is the important part; the rest is just string parsing
// parse() is defined in the Java parent class SparkSubmitOptionParser
protected final void parse(List<String> args) {
  Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

  int idx = 0;
  for (idx = 0; idx < args.size(); idx++) {
    String arg = args.get(idx);
    String value = null;

    Matcher m = eqSeparatedOpt.matcher(arg);
    if (m.matches()) {
      arg = m.group(1);
      value = m.group(2);
    }

    // Look for options that take a value.
    String name = findCliOption(arg, opts);
    if (name != null) {
      if (value == null) {
        if (idx == args.size() - 1) {
          throw new IllegalArgumentException(
              String.format("Missing argument for option '%s'.", arg));
        }
        idx++;
        value = args.get(idx);
      }
      // The key method: handle
      if (!handle(name, value)) {
        break;
      }
      continue;
    }

    // Look for a switch (an option without a value).
    name = findCliOption(arg, switches);
    if (name != null) {
      if (!handle(name, null)) {
        break;
      }
      continue;
    }

    if (!handleUnknown(arg)) {
      break;
    }
  }

  if (idx < args.size()) {
    idx++;
  }
  handleExtraArgs(args.subList(idx, args.size()));
}
//handle is defined in the parent class and overridden in `SparkSubmitArguments`
//Parent class code
protected boolean handle(String opt, String value) {
//A method that just throws like this is meant to be overridden; go straight to the subclass
throw new UnsupportedOperationException();
}
//Subclass code, in `SparkSubmitArguments`
override protected def handle(opt: String, value: String): Boolean = {
opt match {
case NAME =>
name = value
//--master
case MASTER =>
master = value
//--class
case CLASS =>
mainClass = value
case DEPLOY_MODE =>
if (value != "client" && value != "cluster") {
error("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value
//Some code omitted here; there are many more options, check the source for the full list
case _ =>
error(s"Unexpected argument '$opt'.")
}
action != SparkSubmitAction.PRINT_VERSION
}
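This parent/child split is a classic template-method setup: the parent class owns the parsing loop and hands every recognized option to an overridable handle callback. Below is a minimal, standalone sketch of the same idiom; the names (OptionParser, SubmitArgs) are made up for illustration and are not Spark's.
// Minimal sketch of the parse/handle callback pattern (hypothetical names)
abstract class OptionParser {
  // The parent owns the loop and delegates every "--key value" pair to handle()
  final def parse(args: Seq[String]): Unit = {
    var idx = 0
    var keepGoing = true
    while (keepGoing && idx + 1 < args.length) {
      // A false return from handle() stops parsing, just like in Spark's parser
      keepGoing = handle(args(idx), args(idx + 1))
      idx += 2
    }
  }

  // Subclasses decide what each option means
  protected def handle(opt: String, value: String): Boolean
}

class SubmitArgs extends OptionParser {
  var master: String = _
  var mainClass: String = _

  override protected def handle(opt: String, value: String): Boolean = opt match {
    case "--master" => master = value; true
    case "--class"  => mainClass = value; true
    case other      => println(s"Unexpected argument '$other'"); false
  }
}

// Usage:
//   val a = new SubmitArgs
//   a.parse(Seq("--master", "yarn", "--class", "org.apache.spark.examples.SparkPi"))
//   a.master  // "yarn"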
- Now back to the rest of doSubmit
def doSubmit(args: Array[String]): Unit = {
val uninitLog = initializeLogIfNecessary(true, silent = true)
// We covered above what parseArguments does
val appArgs = parseArguments(args)
if (appArgs.verbose) {
logInfo(appArgs.toString)
}
// Continuing on
// Pattern matching is used here
// The default is SUBMIT (why? see SparkSubmitArguments, line 227: action = Option(action).getOrElse(SUBMIT))
// It can also be set explicitly via arguments
appArgs.action match {
//Next we go into the submit method
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
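As a side note on why SUBMIT ends up being the default: action stays unset unless one of the kill/status options changes it, and Option(action).getOrElse(SUBMIT) fills in the default. Here is a tiny self-contained illustration of that defaulting plus the match, using my own simplified Action enumeration rather than the real SparkSubmitAction.
// Toy illustration of the default-action logic (simplified, not Spark's real enum)
object DefaultActionDemo extends App {
  object Action extends Enumeration {
    val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
  }

  // action stays null unless something set it while parsing the options...
  var action: Action.Value = null
  // ...so getOrElse makes SUBMIT the default, mirroring
  // `action = Option(action).getOrElse(SUBMIT)` in SparkSubmitArguments
  action = Option(action).getOrElse(Action.SUBMIT)

  action match {
    case Action.SUBMIT         => println("submit(appArgs, uninitLog)")
    case Action.KILL           => println("kill(appArgs)")
    case Action.REQUEST_STATUS => println("requestStatus(appArgs)")
    case Action.PRINT_VERSION  => println("printVersion()")
  }
}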
5. submit(appArgs, uninitLog)
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
def doRunMain(): Unit = {
if (args.proxyUser != null) {
//We didn't pass --proxy-user, so we take the else branch; code omitted... check it yourself
} else {
runMain(args, uninitLog)
}
}
if (args.isStandaloneCluster && args.useRest) {
//Standalone submission path
//code omitted...
} else {
//We're using YARN here, so we take the else branch
//which calls doRunMain defined above
doRunMain()
}
}
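For completeness, here is roughly what the omitted proxy-user branch does: it wraps the same runMain call in a Hadoop UserGroupInformation.doAs so the job is submitted as the proxied user. This is a simplified fragment (error handling dropped) meant to sit inside doRunMain; check the real source for the details.
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

// Simplified: run the same runMain as the user given by --proxy-user
val proxyUser = UserGroupInformation.createProxyUser(
  args.proxyUser, UserGroupInformation.getCurrentUser())
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
  override def run(): Unit = runMain(args, uninitLog)
})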
Stepping into the runMain method; only the key code is kept here, check the rest yourself.
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
//After the next line runs we get childMainClass="org.apache.spark.deploy.yarn.YarnClusterApplication"
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
......
//Here the class loader for the submit environment is set up (used when talking to YARN)
val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
......
//Here we load the Class object for org.apache.spark.deploy.yarn.YarnClusterApplication
var mainClass: Class[_] = null
try {
mainClass = Utils.classForName(childMainClass)
} catch {
......
}
//Check whether mainClass implements SparkApplication
//YarnClusterApplication does implement SparkApplication
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
//So we take this branch and create a YarnClusterApplication instance
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}
.........
try {
//Start it
app.start(childArgs.toArray, sparkConf)
} catch {
case t: Throwable =>
throw findCause(t)
}
}
//This method is several hundred lines long; parts are omitted
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
.......
//Key point: YARN_CLUSTER_SUBMIT_CLASS
//private[deploy] val YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
//Take the constant YARN_CLUSTER_SUBMIT_CLASS
//which gives childMainClass="org.apache.spark.deploy.yarn.YarnClusterApplication"
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
......
}
.......
(childArgs, childClasspath, sparkConf, childMainClass)
}
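The mechanism in runMain is plain JVM reflection: load the class by name, check whether it implements SparkApplication, then instantiate it through its no-arg constructor and call start. Below is a stripped-down, self-contained sketch of that pattern; the trait and class names (Application, YarnApp) are hypothetical stand-ins, not Spark's.
// Standalone sketch of the reflection pattern used by runMain (hypothetical names)
trait Application {
  def start(args: Array[String]): Unit
}

class YarnApp extends Application {
  override def start(args: Array[String]): Unit =
    println(s"started with ${args.mkString(" ")}")
}

object ReflectiveLaunch extends App {
  // Stands in for childMainClass, which Spark resolves in prepareSubmitEnvironment
  val mainClassName = classOf[YarnApp].getName
  val mainClass: Class[_] = Class.forName(mainClassName)

  // Same idea as `classOf[SparkApplication].isAssignableFrom(mainClass)`
  val app: Application =
    if (classOf[Application].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[Application]
    } else {
      throw new IllegalArgumentException(s"$mainClassName is not an Application")
    }

  app.start(Array("--master", "yarn"))
}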
6. Next: the start method of YarnClusterApplication
That's all I can grind through for now; part 2 is coming soon...