How to run a simple Spark application from the Eclipse/IntelliJ IDE?

blu*_*sky 11 java eclipse hadoop scala apache-spark

To simplify developing my MapReduce jobs before actually deploying them to Hadoop, I test them with a simple map-reduce harness I wrote:

object mapreduce {
  import scala.collection.JavaConversions._

  val intermediate = new java.util.HashMap[String, java.util.List[Int]]
                                                  //> intermediate  : java.util.HashMap[String,java.util.List[Int]] = {}
  val result = new java.util.ArrayList[Int]       //> result  : java.util.ArrayList[Int] = []

  def emitIntermediate(key: String, value: Int) {
    if (!intermediate.containsKey(key)) {
      intermediate.put(key, new java.util.ArrayList)
    }
    intermediate.get(key).add(value)
  }                                               //> emitIntermediate: (key: String, value: Int)Unit

  def emit(value: Int) {
    println("value is " + value)
    result.add(value)
  }                                               //> emit: (value: Int)Unit

  def execute(data: java.util.List[String], mapper: String => Unit, reducer: (String, java.util.List[Int]) => Unit) {

    for (line <- data) {
      mapper(line)
    }

    for (keyVal <- intermediate) {
      reducer(keyVal._1, intermediate.get(keyVal._1))
    }

    for (item <- result) {
      println(item)
    }
  }                                               //> execute: (data: java.util.List[String], mapper: String => Unit, reducer: (St
                                                  //| ring, java.util.List[Int]) => Unit)Unit

  def mapper(record: String) {
    var jsonAttributes = com.nebhale.jsonpath.JsonPath.read("$", record, classOf[java.util.ArrayList[String]])
    println("jsonAttributes are " + jsonAttributes)
    var key = jsonAttributes.get(0)
    var value = jsonAttributes.get(1)

    println("key is " + key)
    var delims = "[ ]+";
    var words = value.split(delims);
    for (w <- words) {
      emitIntermediate(w, 1)
    }
  }                                               //> mapper: (record: String)Unit

  def reducer(key: String, listOfValues: java.util.List[Int]) = {
    var total = 0
    for (value <- listOfValues) {
      total += value;
    }

    emit(total)
  }                                               //> reducer: (key: String, listOfValues: java.util.List[Int])Unit
  var dataToProcess = new java.util.ArrayList[String]
                                                  //> dataToProcess  : java.util.ArrayList[String] = []
  dataToProcess.add("[\"test1\" , \"test1 here is another test1 test1 \"]")
                                                  //> res0: Boolean = true
  dataToProcess.add("[\"test2\" , \"test2 here is another test2 test1 \"]")
                                                  //> res1: Boolean = true

  execute(dataToProcess, mapper, reducer)         //> jsonAttributes are [test1, test1 here is another test1 test1 ]
                                                  //| key is test1
                                                  //| jsonAttributes are [test2, test2 here is another test2 test1 ]
                                                  //| key is test2
                                                  //| value is 2
                                                  //| value is 2
                                                  //| value is 4
                                                  //| value is 2
                                                  //| value is 2
                                                  //| 2
                                                  //| 2
                                                  //| 4
                                                  //| 2
                                                  //| 2


  for (keyValue <- intermediate) {
      println(keyValue._1 + "->"+keyValue._2.size)//> another->2
                                                  //| is->2
                                                  //| test1->4
                                                  //| here->2
                                                  //| test2->2
   }


}

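This lets me run MapReduce tasks in the Eclipse IDE on Windows before deploying them to an actual Hadoop cluster. I would like to do something similar for Spark, that is, write and test Spark code from within Eclipse before deploying it to a Spark cluster. Is this possible with Spark? Since Spark runs on top of Hadoop, does that mean I cannot run Spark without also having Hadoop installed? In other words, can I run code like the following using just the Spark libraries?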

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

Taken from https://spark.apache.org/docs/0.9.0/quick-start.html#a-standalone-app-in-scala

If so, which Spark libraries do I need to include in my project?

Klu*_*ßer 2

Add the following to your build.sbt: libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1", and make sure your scalaVersion is set (e.g. scalaVersion := "2.10.3").
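Put together, a build.sbt along these lines might look like the sketch below. The project name and version are assumptions, chosen only so that the packaged jar would match the simple-project_2.10-1.0.jar path in the question; the dependency and Scala version are the ones given in this answer.

```scala
// build.sbt: a minimal sketch, not a definitive build definition.
// name and version are placeholders (assumed, not from the answer).
name := "simple-project"

version := "1.0"

// Spark 0.9.x artifacts are published for Scala 2.10.
scalaVersion := "2.10.3"

// %% appends the Scala binary version, resolving to spark-core_2.10.
libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"
```

With this in place, an IDE project generated from or imported via sbt should have the Spark classes on its classpath.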

Also, if you are only running the program locally, you can skip the last two arguments to SparkContext, like this: val sc = new SparkContext("local", "Simple App")
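As a rough illustration of that local setup (a sketch, not the answerer's code), the word count from the question could be expressed against Spark roughly as follows. It assumes spark-core 0.9.x is on the classpath; the names LocalWordCount and countWords are made up for this example, and the input lines are the two sample sentences from the question.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // implicits that add reduceByKey to RDDs of pairs

// Hypothetical example object; not part of Spark or of the original post.
object LocalWordCount {

  // Splits each line on runs of spaces and counts occurrences of every word.
  def countWords(sc: SparkContext, lines: Seq[String]): Map[String, Int] =
    sc.parallelize(lines)
      .flatMap(line => line.trim.split("[ ]+")) // "map" phase: one record per word
      .map(word => (word, 1))
      .reduceByKey(_ + _)                       // "reduce" phase: sum the 1s per word
      .collect()
      .toMap

  def main(args: Array[String]): Unit = {
    // "local" runs Spark inside this JVM, so it can be launched from the IDE.
    val sc = new SparkContext("local", "Local Word Count")
    try {
      val counts = countWords(sc, Seq(
        "test1 here is another test1 test1",
        "test2 here is another test2 test1"))
      counts.foreach { case (word, n) => println(word + "->" + n) }
    } finally {
      sc.stop() // release the local Spark context
    }
  }
}
```

Running main as an ordinary Scala application should print one line per word, with the same counts as the worksheet in the question (test1 four times, every other word twice); the order of the lines is not guaranteed.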

Finally, Spark can run on top of Hadoop, but it can also run in standalone mode. See: https://spark.apache.org/docs/0.9.1/spark-standalone.html

  • Can this be used directly? Doesn't it need to be packaged as a jar and submitted with spark-submit? (5 upvotes)