新网创想网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章主要讲解了“deploy目录下SparkSubmit类的用法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“deploy目录下SparkSubmit类的用法”吧!
公司主营业务:网站设计制作、成都网站制作、移动网站开发等业务。帮助企业客户真正实现互联网宣传,提高企业的竞争能力。创新互联公司是一支青春激扬、勤奋敬业、活力青春激扬、勤奋敬业、活力澎湃、和谐高效的团队。公司秉承以“开放、自由、严谨、自律”为核心的企业文化,感谢他们对我们的高要求,感谢他们从不同领域给我们带来的挑战,让我们激情的团队有机会用头脑与智慧不断的给客户带来惊喜。创新互联公司推出青浦免费做网站回馈大家。
之前说的各种脚本:spark-submit,spark-class也好,还是launcher工程也好,主要工作是准备各种环境、依赖包、JVM参数等运行环境。实际的提交主要还是Spark Code中的deploy下的SparkSubmit类来负责的。
deploy目录下的SparkSubmit类,前面提到过,主要入口方法是runMain。
我们先看看其他方法吧。
1、prepareSubmitEnvironment
这个方法准备提交的环境和参数。
先判断集群管理方式(cluster manager):yarn、meros、k8s,standalone。部署方式(deploy mode ): client还是cluster。
后面要根据这些信息设置不同的Backend和Wapper类等。
提交模式这一段真不好讲,因为它包含了太多种类的部署环境了,个性化较强,要慢慢看了。
cluster方式只看两种:yarn cluster和standalone cluster。把yarn和standalone两个搞懂了,其他的也就很好理解了。
这个方法返回一个四元组:
@return a 4-tuple:
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a map of system properties, and
* (4) the main class for the child
核心代码
if (deployMode == CLIENT) { childMainClass = args.mainClass if (localPrimaryResource != null && isUserJar(localPrimaryResource)) { childClasspath += localPrimaryResource } if (localJars != null) { childClasspath ++= localJars.split(",") } } // Add the main application jar and any added jars to classpath in case YARN client // requires these jars. // This assumes both primaryResource and user jars are local jars, or already downloaded // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be // added to the classpath of YARN client. if (isYarnCluster) { if (isUserJar(args.primaryResource)) { childClasspath += args.primaryResource } if (args.jars != null) { childClasspath ++= args.jars.split(",") } } if (deployMode == CLIENT) { if (args.childArgs != null) { childArgs ++= args.childArgs } } if (args.isStandaloneCluster) { if (args.useRest) { childMainClass = REST_CLUSTER_SUBMIT_CLASS childArgs += (args.primaryResource, args.mainClass) } else { // In legacy standalone cluster mode, use Client as a wrapper around the user class childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS if (args.supervise) { childArgs += "--supervise" } Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) } Option(args.driverCores).foreach { c => childArgs += ("--cores", c) } childArgs += "launch" childArgs += (args.master, args.primaryResource, args.mainClass) } if (args.childArgs != null) { childArgs ++= args.childArgs } } // In yarn-cluster mode, use yarn.Client as a wrapper around the user class if (isYarnCluster) { childMainClass = YARN_CLUSTER_SUBMIT_CLASS if (args.isPython) { childArgs += ("--primary-py-file", args.primaryResource) childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") } else if (args.isR) { val mainFile = new Path(args.primaryResource).getName childArgs += ("--primary-r-file", mainFile) childArgs += ("--class", "org.apache.spark.deploy.RRunner") } else { if (args.primaryResource != SparkLauncher.NO_RESOURCE) { childArgs += ("--jar", args.primaryResource) } childArgs += ("--class", args.mainClass) } if (args.childArgs != null) { args.childArgs.foreach { arg => childArgs += ("--arg", arg) } } }
上面这段代码非常核心,非常重要。它定义了不同的集群模式不同的部署方式下,应用使用什么类来包装我们的spark程序,好适应不同的集群环境下的提交流程。
我们就多花点时间来分析一下这段代码。
先看看ChildMainClass:
standaloneCluster下:REST_CLUSTER_SUBMIT_CLASS=classOf[RestSubmissionClientApp].getName()
yarnCluster下:YARN_CLUSTER_SUBMIT_CLASS=org.apache.spark.deploy.yarn.YarnClusterApplication
standalone client模式下:STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
2、runMain
上一步获得四元组之后,就是runMain的流程了。
核心代码先上:
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = { val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args) val loader = getSubmitClassLoader(sparkConf) for (jar <- childClasspath) { addJarToClasspath(jar, loader) } var mainClass: Class[_] = null try { mainClass = Utils.classForName(childMainClass) } catch { } val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) { mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication] } else { new JavaMainApplication(mainClass) } try { app.start(childArgs.toArray, sparkConf) } catch { case t: Throwable => throw findCause(t) } }
搞清了prepareSubmitEnvironment的流程,runMain也就很简单了,它就是启动ChildMainClass(是SparkApplication的子类),然后执行start方法。
如果不是cluster模式而是client模式,那么ChildMainClass就是args.mainClass。这点需要注意下,这时候ChildMainClass就会用JavaMainApplication来包装了:
new JavaMainApplication(mainClass);
后面的内容就是看看RestSubmissionClientApp和org.apache.spark.deploy.yarn.YarnClusterApplication的实现逻辑了。
感谢各位的阅读,以上就是“deploy目录下SparkSubmit类的用法”的内容了,经过本文的学习后,相信大家对deploy目录下SparkSubmit类的用法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!