如何附加到 HDFS 中的同一文件(spark 2.11)

and*_*ani 4 apache-spark spark-streaming apache-spark-sql

我正在尝试使用 SparkStreaming 将流数据存储到 HDFS 中,但它不断在新文件中创建,而不是附加到一个文件或多个文件中

如果它不断创建n个文件,我觉得效率不会很高

HDFS文件系统 在此输入图像描述

代码

lines.foreachRDD(f => {
  if (!f.isEmpty()) {
    val df = f.toDF().coalesce(1)
    df.write.mode(SaveMode.Append).json("hdfs://localhost:9000/MT9")
  }
 })
Run Code Online (Sandbox Code Playgroud)

在我的 pom 中,我使用各自的依赖项:

  • 火花核心_2.11
  • Spark-SQL_2.11
  • 火花流_2.11
  • 火花流-kafka-0-10_2.11

小智 6

正如您Append在 Spark 中已经意识到的那样,意味着写入现有目录而不是追加到文件。

这是有意且期望的行为(想想如果进程在“追加”过程中失败会发生什么,即使格式和文件系统允许)。

如果有必要,合并文件等操作应由单独的进程应用,以确保正确性和容错性。不幸的是,这需要完整的副本,出于明显的原因,这在批次之间是不需要的。

  • 您可以通过此链接:https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/SaveMode.html 附加模式意味着将 DataFrame 保存到数据源时,如果数据/表已经存在,则 DataFrame 的内容将被附加到现有数据中。 (2认同)