1. SparkSubmit
# yarn cluster mode: the Driver runs inside the ApplicationMaster container on YARN
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
# yarn client mode: the Driver runs in the local spark-submit JVM
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
Open the spark-submit script: it ultimately launches org.apache.spark.deploy.SparkSubmit via spark-class.
Step into
org.apache.spark.deploy.yarn.YarnClusterApplication
Examine submitApplication
Examine createContainerLaunchContext
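To make the cluster/client split concrete, here is a minimal plain-Scala sketch (hypothetical object and method names; the real logic lives in org.apache.spark.deploy.yarn.Client.createContainerLaunchContext) of how the main class for the AM container command is chosen:

```scala
// Sketch only: simplified stand-in for Client.createContainerLaunchContext.
// "{{JAVA_HOME}}" mimics the placeholder YARN expands on the container host.
object AmCommandSketch {
  def amCommand(isClusterMode: Boolean, userClass: String): Seq[String] = {
    val amClass =
      if (isClusterMode) "org.apache.spark.deploy.yarn.ApplicationMaster"
      else "org.apache.spark.deploy.yarn.ExecutorLauncher"
    Seq("{{JAVA_HOME}}/bin/java", "-server", amClass, "--class", userClass)
  }

  def main(args: Array[String]): Unit =
    println(amCommand(isClusterMode = true, "org.apache.spark.examples.SparkPi").mkString(" "))
}
```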
Summary:
SparkSubmit
-- main
  -- doSubmit
    -- parseArguments  // parse the arguments
      // master    => --master => yarn
      // mainClass => --class  => SparkPi (WordCount)
      -- parse
    -- submit
      -- doRunMain
        -- runMain
          // (childArgs, childClasspath, sparkConf, childMainClass)
          // [Cluster] childMainClass => org.apache.spark.deploy.yarn.YarnClusterApplication
          // [Client]  childMainClass => SparkPi (WordCount)
          -- prepareSubmitEnvironment
          -- mainClass: Class = Utils.classForName(childMainClass)  // Class.forName("xxxxxxx")
          // classOf[SparkApplication].isAssignableFrom(mainClass)
          -- [Cluster] a) mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
          -- [Client]  b) new JavaMainApplication(mainClass)
          -- app.start
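The key branch in runMain is plain reflection. A self-contained sketch (SparkApplicationLike, JavaMainAppLike, and DemoUserApp are stand-ins for Spark's SparkApplication, JavaMainApplication, and the user class):

```scala
// Stand-in for Spark's SparkApplication trait.
trait SparkApplicationLike { def start(args: Array[String]): Unit }

// Stand-in for JavaMainApplication: wraps a plain main() class (client mode).
final class JavaMainAppLike(klass: Class[_]) extends SparkApplicationLike {
  def start(args: Array[String]): Unit = {
    val mainMethod = klass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, args) // static main, so the receiver is null
  }
}

object RunMainSketch {
  def appFor(childMainClass: String): SparkApplicationLike = {
    val mainClass = Class.forName(childMainClass) // Utils.classForName in Spark
    if (classOf[SparkApplicationLike].isAssignableFrom(mainClass))
      // cluster mode: YarnClusterApplication is itself a SparkApplication
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplicationLike]
    else
      new JavaMainAppLike(mainClass) // client mode: the user class (SparkPi)
  }

  def main(args: Array[String]): Unit =
    appFor("DemoUserApp").start(Array("10"))
}

// A fake user class to exercise the client-mode path.
object DemoUserApp {
  def main(args: Array[String]): Unit = println("user main ran with: " + args.mkString(", "))
}
```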
YarnClusterApplication
-- start
  // new ClientArguments
  // --class => userClass => SparkPi (WordCount)
  -- new Client
  -- client.run
    -- submitApplication
      // [Cluster] org.apache.spark.deploy.yarn.ApplicationMaster
      // [Client]  org.apache.spark.deploy.yarn.ExecutorLauncher
      -- createContainerLaunchContext
      -- createApplicationSubmissionContext
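submitApplication itself is a two-step assembly. A sketch with simplified data types (these case classes are stand-ins, not the Hadoop YARN API):

```scala
// Sketch of submitApplication's shape: build the AM container's launch context,
// wrap it in an application submission context, and hand the result to YARN.
object SubmitApplicationSketch {
  final case class ContainerLaunchContext(commands: Seq[String])
  final case class AppSubmissionContext(appName: String, amContainer: ContainerLaunchContext)

  def submitApplication(amCommand: Seq[String]): AppSubmissionContext = {
    val containerContext = ContainerLaunchContext(amCommand)           // createContainerLaunchContext
    val appContext = AppSubmissionContext("SparkPi", containerContext) // createApplicationSubmissionContext
    // real code: yarnClient.submitApplication(appContext), which returns an ApplicationId
    appContext
  }

  def main(args: Array[String]): Unit =
    println(submitApplication(Seq("java", "org.apache.spark.deploy.yarn.ApplicationMaster")))
}
```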
2. ApplicationMaster
One thing worth pointing out here: Scala's pattern matching is truly elegant; see the sketch below.
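For example, ApplicationMasterArguments-style option parsing reads naturally as a match over the argument list. A runnable sketch (simplified; the real parser handles more options, such as --jar and --properties-file):

```scala
object ArgParseSketch {
  final case class AmArgs(userClass: String = "", userArgs: List[String] = Nil)

  // Recursive pattern match over the remaining arguments, in the style of
  // ApplicationMasterArguments.parseArgs.
  @annotation.tailrec
  def parse(args: List[String], acc: AmArgs = AmArgs()): AmArgs = args match {
    case "--class" :: value :: tail => parse(tail, acc.copy(userClass = value))
    case "--arg" :: value :: tail   => parse(tail, acc.copy(userArgs = acc.userArgs :+ value))
    case Nil                        => acc
    case unknown :: _               => sys.error(s"Unknown option: $unknown")
  }

  def main(args: Array[String]): Unit =
    println(parse(List("--class", "org.apache.spark.examples.SparkPi", "--arg", "10")))
}
```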
Starting the Driver
Registering the AM
Allocating resources
Launching Executors on the NodeManagers
Summary
ApplicationMaster
-- main
  // --class => userClass => SparkPi (WordCount)
  -- new ApplicationMasterArguments
  -- master = new ApplicationMaster
  -- master.run()
    -- [Client]  runExecutorLauncher
    -- [Cluster] runDriver
      -- userClassThread = startUserApplication()
        -- ClassLoader.loadClass(args.userClass).getMethod("main")
        -- new Thread().start()
          -- run
            -- mainMethod.invoke(null, userArgs.toArray)  // runs the user's main (WordCount)
              -- new SparkContext()  // creates the Spark environment
      // [blocking] the AM waits here until the user thread has created the SparkContext
      -- val sc = ThreadUtils.awaitResult
      -- registerAM  // register the AM with the ResourceManager
      -- createAllocator  // resource allocation
      -- allocator.allocateResources
        -- handleAllocatedContainers
          -- runAllocatedContainers
            // bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
            -- prepareCommand
      -- resumeDriver  // let the user thread continue and run the Spark job
      -- userClassThread.join()
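The blocking handshake above boils down to a Promise shared between two threads. A minimal sketch (plain Scala; the String promise stands in for Promise[SparkContext], and the real code also parks the user thread until resumeDriver, which is omitted here):

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object RunDriverSketch {
  // Stand-in for ApplicationMaster.sparkContextPromise.
  private val sparkContextPromise = Promise[String]()

  // startUserApplication: run the user's main() on a thread named "Driver".
  def startUserApplication(): Thread = {
    val t = new Thread(() => {
      // In the real flow, new SparkContext() inside the user's main()
      // ends up completing the promise via sparkContextInitialized.
      sparkContextPromise.success("SparkContext")
      println("user job running")
    }, "Driver")
    t.start()
    t
  }

  def main(args: Array[String]): Unit = {
    val userClassThread = startUserApplication()
    // ThreadUtils.awaitResult: the AM blocks until the SparkContext exists.
    val sc = Await.result(sparkContextPromise.future, 100.seconds)
    println(s"got $sc: registerAM / createAllocator / allocateResources happen here")
    userClassThread.join() // wait for the user's Spark job to finish
  }
}
```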
3. ExecutorBackend
Summary
YarnCoarseGrainedExecutorBackend
-- main
  -- CoarseGrainedExecutorBackend.run
    -- val env = SparkEnv.createExecutorEnv
    -- env.rpcEnv.setupEndpoint("Executor")
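As a sketch of what registering the "Executor" endpoint buys (no Spark dependencies; class and message names are simplified stand-ins): once registered, the backend reacts to driver messages via pattern matching, and only creates the actual Executor after RegisteredExecutor arrives.

```scala
sealed trait ExecutorMessage
case object RegisteredExecutor extends ExecutorMessage
final case class LaunchTask(taskId: Long) extends ExecutorMessage

// Stand-in for CoarseGrainedExecutorBackend's receive behavior.
class ExecutorBackendSketch {
  private var executorCreated = false

  def receive(msg: ExecutorMessage): Unit = msg match {
    case RegisteredExecutor =>
      executorCreated = true // real code: executor = new Executor(...)
      println("executor created after the driver confirmed registration")
    case LaunchTask(id) if executorCreated =>
      println(s"decode the task description and run task $id") // real code: executor.launchTask(...)
    case LaunchTask(_) =>
      sys.error("received LaunchTask but the executor is not yet created")
  }
}

object RpcEnvSketch {
  private val endpoints = scala.collection.mutable.Map[String, ExecutorBackendSketch]()

  // Stand-in for env.rpcEnv.setupEndpoint("Executor", backend).
  def setupEndpoint(name: String, e: ExecutorBackendSketch): ExecutorBackendSketch = {
    endpoints(name) = e; e
  }

  def main(args: Array[String]): Unit = {
    val backend = setupEndpoint("Executor", new ExecutorBackendSketch)
    backend.receive(RegisteredExecutor)
    backend.receive(LaunchTask(0L))
  }
}
```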
4. Task
Overall summary
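The notes stop at the Task stage; as a hedged sketch of what happens next (simplified types; in Spark the entry point is Executor.launchTask, which wraps the task in a TaskRunner and hands it to a thread pool):

```scala
import java.util.concurrent.Executors

object TaskLaunchSketch {
  final case class TaskDescription(taskId: Long, name: String)

  // Spark's Executor keeps a cached thread pool of task launch workers.
  private val threadPool = Executors.newCachedThreadPool()

  // Stand-in for Spark's TaskRunner: one Runnable per task.
  final class TaskRunner(task: TaskDescription) extends Runnable {
    override def run(): Unit = println(s"running ${task.name} (tid=${task.taskId})")
  }

  // Stand-in for Executor.launchTask.
  def launchTask(task: TaskDescription): Unit =
    threadPool.execute(new TaskRunner(task))

  def main(args: Array[String]): Unit = {
    launchTask(TaskDescription(0L, "task 0.0 in stage 0.0"))
    threadPool.shutdown()
  }
}
```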