如何在scala的spark输出文件中添加partitionBy列名作为前缀

5 scala multipleoutputs apache-spark hadoop2 spark-dataframe

我对这个问题做了很多研究,但没有找到令人满意的答案。我必须重命名来自 spark 的输出文件。

目前我在 S3 中输出我的 spark 数据帧,然后我再次读取它,然后再次重命名和复制。问题是我的 spark 作业需要 16 分钟才能完成,但从 S3 读取然后在 S3 中重新命名和写入又需要 15 分钟。

有什么办法可以重命名我的输出文件..我没问题 part-00000

这就是我保存数据框的方式

dfMainOutputFinalWithoutNull.repartition(50).write.partitionBy("DataPartition", "PartitionYear")
      .format("csv")
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .option("nullValue", "")
      .option("delimiter", "\t")
      .option("quote", "\u0000")
      .option("header", "true")
      .option("codec", "bzip2")
      .save(outputFileURL)
Run Code Online (Sandbox Code Playgroud)

在这种情况下,任何想法如何使用 hadoop 文件格式?

目前我正在这样做,如下所示

val finalFileName = finalPrefix + DataPartitionName + "." + YearPartition + "." + intFileCounter + "." + fileVersion + currentTime + fileExtention
      val dest = new Path(mainFileURL + "/" + finalFileName)
      fs.rename(urlStatus.getPath, dest)
Run Code Online (Sandbox Code Playgroud)

问题是我有 50GB 的输出数据,它创建了非常多的文件,重命名这么多文件需要很长时间。

成本明智也很昂贵,因为我的 EMR 运行时间更长,再次复制数据需要额外费用。