如何在不覆盖的情况下将 Spark Streaming 输出写入 HDFS

JSR*_*R29 2 apache-kafka spark-streaming

经过一些处理后,我有一个 DStream[String , ArrayList[String]] ,所以当我使用 saveAsTextFile 将它写入 hdfs 并在每批后覆盖数据时,如何通过附加到以前的结果来写入新结果

output.foreachRDD(r => {
  r.saveAsTextFile(path)
})
Run Code Online (Sandbox Code Playgroud)

编辑 ::如果有人可以帮助我将输出转换为 avro 格式,然后附加到 HDFS

maa*_*asg 5

saveAsTextFile不支持追加。如果使用固定文件名调用,则每次都会覆盖它。我们可以saveAsTextFile(path+timestamp)每次都保存到一个新文件。这是它的基本功能DStream.saveAsTextFiles(path)

支持的一种易于访问的格式append是 Parquet。我们首先将数据 RDD 转换为DataFrameor Dataset,然后我们可以从在该抽象之上提供的写支持中受益。

case class DataStructure(field1,..., fieldn)

... streaming setup, dstream declaration, ...

val structuredOutput = outputDStream.map(record => mapFunctionRecordToDataStructure)
structuredOutput.foreachRDD(rdd => 
  import sparkSession.implicits._
  val df = rdd.toDF()
  df.write.format("parquet").mode("append").save(s"$workDir/$targetFile")

})
Run Code Online (Sandbox Code Playgroud)

请注意,附加到 Parquet 文件会随着时间的推移变得更加昂贵,因此不时旋转目标文件仍然是必需的。