通过Web应用程序启动Spark应用程序的最佳实践?

j9d*_*9dy 27 apache-spark

我想通过Web应用程序向用户公开我的Spark应用程序.

基本上,用户可以决定他想要运行哪个动作并输入一些需要传递给spark应用程序的变量.例如:用户输入几个字段,然后点击一个按钮,用于执行以下操作的"运行sparkApp1与放慢参数MIN_X,MAX_X,MIN_Y,MAX_Y".

应该使用用户给出的参数启动spark应用程序.完成后,可能需要Web应用程序来检索结果(来自hdfs或mongodb)并将其显示给用户.处理时,Web应用程序应显示Spark应用程序的状态.

我的问题:

  • Web应用程序如何启动Spark应用程序?它可能能够从引擎盖下的命令行启动它,但可能有更好的方法来执行此操作.
  • Web应用程序如何访问Spark应用程序的当前状态?从Spark WebUI的REST API获取状态的方式是什么?

我正在运行带有YARN/Mesos(尚不确定)和MongoDB的Spark 1.6.1集群.

T. *_*ęda 40

非常基本的答案:

基本上,您可以使用SparkLauncher类来启动Spark应用程序并添加一些侦听器来监视进度.

不过您可能对Livy server感兴趣,它是Spark作业的RESTful Sever.据我所知,Zeppelin正在使用Livy提交工作并检索状态.

您还可以使用Spark REST接口来检查状态,然后信息将更加精确.这里有一个如何通过REST API提交作业的示例

你有3个选择,答案是 - 自己检查;)这取决于你的项目和要求.两个主要选项:

  • SparkLauncher + Spark REST界面
  • Livy服务器

应该对你有好处,你必须检查在项目中使用哪些更容易和更好

扩展答案

您可以以不同的方式使用应用程序中的Spark,具体取决于您的需求和喜好.

SparkLauncher

SparkLauncher是一个来自spark-launcher工件的类.它用于启动已准备好的Spark作业,就像Spark Submit一样.

典型用法是:

1)使用Spark作业构建项目并将JAR文件复制到所有节点2)从客户端应用程序(即Web应用程序)创建指向准备好的JAR文件的SparkLauncher

SparkAppHandle handle = new SparkLauncher()
    .setSparkHome(SPARK_HOME)
    .setJavaHome(JAVA_HOME)
    .setAppResource(pathToJARFile)
    .setMainClass(MainClassFromJarWithJob)
    .setMaster("MasterAddress
    .startApplication();
    // or: .launch().waitFor()
Run Code Online (Sandbox Code Playgroud)

startApplication创建SparkAppHandle,允许您添加侦听器并停止应用程序.它还提供了可能性getAppId.

SparkLauncher应该与Spark REST API一起使用.您可以查询http://driverNode:4040/api/v1/applications/*ResultFromGetAppId*/jobs,您将获得有关应用程序当前状态的信息.

Spark REST API

还可以通过RESTful API直接提交Spark作业.用法非常类似SparkLauncher,但它以纯RESTful方式完成.

示例请求 - 本文的学分:

curl -X POST http://spark-master-host:6066/v1/submissions/create --header "Content-Type:application/json;charset=UTF-8" --data '{
  "action" : "CreateSubmissionRequest",
  "appArgs" : [ "myAppArgument1" ],
  "appResource" : "hdfs:///filepath/spark-job-1.0.jar",
  "clientSparkVersion" : "1.5.0",
  "environmentVariables" : {
    "SPARK_ENV_LOADED" : "1"
  },
  "mainClass" : "spark.ExampleJobInPreparedJar",
  "sparkProperties" : {
    "spark.jars" : "hdfs:///filepath/spark-job-1.0.jar",
    "spark.driver.supervise" : "false",
    "spark.app.name" : "ExampleJobInPreparedJar",
    "spark.eventLog.enabled": "true",
    "spark.submit.deployMode" : "cluster",
    "spark.master" : "spark://spark-cluster-ip:6066"
  }
}'
Run Code Online (Sandbox Code Playgroud)

此命令将ExampleJobInPreparedJar使用给定的Spark Master 将类中的作业提交到集群.在响应中,您将拥有submissionId字段,这将有助于检查应用程序的状态 - 只需调用另一个服务:curl http://spark-cluster-ip:6066/v1/submissions/status/submissionIdFromResponse.就是这样,仅仅是编码

Livy REST服务器和Spark作业服务器

Livy REST ServerSpark Job Server是RESTful应用程序,允许您通过RESTful Web Service提交作业.这两者与Spark的REST接口之间的一个主要区别是Livy和SJS不需要事先准备好作业并打包到JAR文件.您只是提交将在Spark中执行的代码.

用法很简单.代码来自Livy存储库,但有一些削减以提高可读性

1)案例1:提交作业,放在本地机器上

// creating client
LivyClient client = new LivyClientBuilder()
  .setURI(new URI(livyUrl))
  .build();

try {
  // sending and submitting JAR file
  client.uploadJar(new File(piJar)).get();
  // PiJob is a class that implements Livy's Job
  double pi = client.submit(new PiJob(samples)).get();
} finally {
  client.stop(true);
}
Run Code Online (Sandbox Code Playgroud)

2)案例2:动态创建和执行作业

// example in Python. Data contains code in Scala, that will be executed in Spark
data = {
  'code': textwrap.dedent("""\
    val NUM_SAMPLES = 100000;
    val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
      val x = Math.random();
      val y = Math.random();
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _);
    println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
    """)
}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json()) 
Run Code Online (Sandbox Code Playgroud)

如您所见,可以使用预编译的作业和对Spark的即席查询.

水圈雾

另一个Spark即服务应用程序.Mist非常简单,与Livy和Spark Job Server类似.

用法非常相似

1)创建工作文件:

import io.hydrosphere.mist.MistJob

object MyCoolMistJob extends MistJob {
    def doStuff(parameters: Map[String, Any]): Map[String, Any] = {
        val rdd = context.parallelize()
        ...
        return result.asInstance[Map[String, Any]]
    }
} 
Run Code Online (Sandbox Code Playgroud)

2)将作业文件打包到JAR中3)向Mist发送请求:

curl --header "Content-Type: application/json" -X POST http://mist_http_host:mist_http_port/jobs --data '{"path": "/path_to_jar/mist_examples.jar", "className": "SimpleContext$", "parameters": {"digits": [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]}, "namespace": "foo"}'
Run Code Online (Sandbox Code Playgroud)

我可以在Mist中看到的一个强大功能是,它通过MQTT为流媒体作业提供了开箱即用的支持.

Apache Toree

创建Apache Toree是为了实现Spark的简单交互式分析.它不需要构建任何JAR.它通过IPython协议工作,但不仅支持Python.

目前文档侧重于Jupyter笔记本支持,但也有REST风格的API.

比较和结论

我列出了几个选项:

  1. SparkLauncher
  2. Spark REST API
  3. Livy REST服务器和Spark作业服务器
  4. 水圈雾
  5. Apache Toree

所有这些都适用于不同的用例.我可以区分几个类别:

  1. 需要带有作业的JAR文件的工具:Spark Launcher,Spark REST API
  2. 交互式和预包装工作的工具:Livy,SJS,Mist
  3. 专注于交互式分析的工具:Toree(但是可能会对预先打包的作业提供一些支持;此时不会发布任何文档)

SparkLauncher非常简单,是Spark项目的一部分.您正在使用普通代码编写作业配置,因此它比JSON对象更容易构建.

对于完全RESTful风格的提交,请考虑Spark REST API,Livy,SJS和Mist.其中三个是稳定的项目,有一些生产用例.REST API还要求预先打包作业,而Livy和SJS则不需要.但是请记住,Spark REST API在每个Spark发行版中都是默认的,而Livy/SJS则不是.我对Mist了解不多,但是 - 过了一会儿 - 它应该是整合所有类型的Spark工作的非常好的工具.

Toree专注于互动工作.它仍在孵化中,但即使是现在你也可以检查它的可能性.

当有内置REST API时,为什么要使用自定义的附加REST服务?像Livy这样的SaaS是Spark的一个切入点.它管理Spark上下文,并且只在一个节点上,而不是在群集之外的其他地方.它们还支持交互式分析.Apache Zeppelin使用Livy将用户代码提交给Spark


Max*_*ind 7

这里举一个SparkLauncherT.Gawęda的例子:

SparkAppHandle handle = new SparkLauncher()
    .setSparkHome(SPARK_HOME)
    .setJavaHome(JAVA_HOME)
    .setAppResource(SPARK_JOB_JAR_PATH)
    .setMainClass(SPARK_JOB_MAIN_CLASS)
    .addAppArgs("arg1", "arg2")
    .setMaster("yarn-cluster")
    .setConf("spark.dynamicAllocation.enabled", "true")
    .startApplication();
Run Code Online (Sandbox Code Playgroud)

在这里,您可以找到一个Java Web应用程序的示例,其中Spark作业捆绑在一个项目中.通过SparkLauncher您可以获得SparkAppHandle可用于获取有关工作状态的信息.如果您需要进度状态,可以使用Spark rest-api:

http://driverHost:4040/api/v1/applications/[app-id]/jobs
Run Code Online (Sandbox Code Playgroud)

您需要的唯一依赖SparkLauncher:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-launcher_2.10</artifactId>
    <version>2.0.1</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)