如何在Spark Streaming中将RDD转换为DataFrame,而不仅仅是Spark

Lob*_*rrr 5 scala apache-spark spark-streaming rdd

我怎样才能转换RDDDataFramein Spark Streaming,而不仅仅是Spark

我看到了这个例子,但它需要SparkContext.

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()
Run Code Online (Sandbox Code Playgroud)

就我而言,我有StreamingContext.我应该SparkContext在里面创造foreach吗?它看起来太疯狂......那么,如何处理这个问题呢?我的最终目标(如果它可能有用)是使用保存DataFrame在Amazon S3中rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json");,RDD如果不将其转换为DataFrame(我知道),这是不可能的.

myDstream.foreachRDD { rdd =>
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._
    rdd.toDF()
}
Run Code Online (Sandbox Code Playgroud)

Sha*_*kar 2

sqlContext在外部创建foreachRDD,一旦使用 转换rdd为DF sqlContext,就可以写入S3。

例如:

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
myDstream.foreachRDD { rdd =>

    val df = rdd.toDF()
    df.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json")
}
Run Code Online (Sandbox Code Playgroud)

更新:

sqlContext甚至您可以在内部创建foreachRDD将在驱动程序上执行的内容。

  • 尝试 `val ssc = new StreamingContext(sc, Seconds(refreshingIntervalSeconds.toInt))` (2认同)