小编Pra*_*ade的帖子

如何将火花流应用程序的输出写入单个文件

我正在使用火花流从 Kafka 读取数据并传递到 py 文件进行预测。它返回预测以及原始数据。它将原始数据及其预测保存到文件中,但是它为每个 RDD 创建了一个文件。我需要一个由收集到的所有数据组成的单个文件,直到我停止将程序保存到单个文件中。

我试过 writeStream 它甚至不创建单个文件。我尝试使用 append 将它保存到 parquet,但它创建了多个文件,每个 RDD 为 1。我试图用追加模式写多个文件作为输出。下面的代码创建一个文件夹 output.csv 并将所有文件输入其中。

 def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder()
      .appName("consumer")
      .master("local[*]")
      .getOrCreate()

    val scc = new StreamingContext(ss.sparkContext, Seconds(2))


    val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer"-> 
"org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer"> 
"org.apache.kafka.common.serialization.StringDeserializer",
        "group.id"-> "group5" // clients can take
      )
mappedData.foreachRDD(
      x =>
    x.map(y =>       
ss.sparkContext.makeRDD(List(y)).pipe(pyPath).toDF().repartition(1)
.write.format("csv").mode("append").option("truncate","false")
.save("output.csv")
          )
    )
scc.start()
scc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

我只需要获取 1 个文件,其中包含流式传输时收集的所有语句。

任何帮助将不胜感激,谢谢您的期待。

streaming apache-spark spark-streaming apache-spark-sql csv-write-stream

2
推荐指数
1
解决办法
2910
查看次数