cov*_*efe 4 scala apache-spark spark-streaming spark-dataframe
我正在尝试从本地计算机 (OSX) 上的文件夹中流式传输 CSV 文件。我将 SparkSession 和 StreamingContext 一起使用,如下所示:
val sc: SparkContext = createSparkContext(sparkContextName)
val sparkSess = SparkSession.builder().config(sc.getConf).getOrCreate()
val ssc = new StreamingContext(sparkSess.sparkContext, Seconds(time))
val csvSchema = new StructType().add("field_name",StringType)
val inputDF = sparkSess.readStream.format("org.apache.spark.csv").schema(csvSchema).csv("file:///Users/userName/Documents/Notes/MoreNotes/tmpFolder/")
Run Code Online (Sandbox Code Playgroud)
如果我ssc.start()在此之后运行,则会收到此错误:
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
Run Code Online (Sandbox Code Playgroud)
相反,如果我尝试这样开始SparkSession:
inputDF.writeStream.format("console").start()
Run Code Online (Sandbox Code Playgroud)
我得到:
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
Run Code Online (Sandbox Code Playgroud)
显然,我不明白如何SparkSession以及StreamingContext应该一起工作。如果我摆脱了SparkSession,StreamingContext只有textFileStream我需要强加一个 CSV 模式。希望得到有关如何使其工作的任何澄清。
Mun*_*nna 10
您不能同时进行 spark 会话和 spark 上下文。随着 Spark 2.0.0 的发布,开发人员可以使用一个新的抽象——Spark Session——它可以像以前可用的 Spark Context 一样被实例化和调用。
您仍然可以从 spark 会话构建器访问 spark 上下文:
val sparkSess = SparkSession.builder().appName("My App").getOrCreate()
val sc = sparkSess.sparkContext
val ssc = new StreamingContext(sc, Seconds(time))
Run Code Online (Sandbox Code Playgroud)
导致您的工作失败的另一件事是您正在执行转换并且没有调用任何操作。最后应该调用一些动作,例如 inputDF.show()