如何覆盖spark中的输出目录

Vij*_*uri 91 apache-spark

我有一个火花流应用程序,它可以生成每分钟的数据集.我需要保存/覆盖已处理数据的结果.

当我试图覆盖数据集org.apache.hadoop.mapred.FileAlreadyExistsException时停止执行.

我设置了Spark属性set("spark.files.overwrite","true"),但没有运气.

如何覆盖或预先删除spark中的文件?

sam*_*est 93

更新:建议使用Dataframes,加上类似的东西... .write.mode(SaveMode.Overwrite) ....

对于旧版本试试

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)
Run Code Online (Sandbox Code Playgroud)

在1.1.0中,您可以使用带有--conf标志的spark-submit脚本设置co​​nf设置.

警告(旧版本):根据@piggybox,Spark中存在一个错误,它只会覆盖编写文件所需的part-文件,任何其他文件都将被删除.

  • 对于`Spark 1.4`:`df.write.mode(SaveMode.Overwrite).parquet(path)` (28认同)
  • 你也可以使用`df.write.mode(mode:String).parquet(path)`where mode:String可以是:"overwrite","append","ignore","error". (6认同)
  • 一个隐藏的问题:与@ pzecevic的解决方案相比,通过HDFS消除整个文件夹,在这种方法中,Spark只会覆盖输出文件夹中具有相同文件名的部分文件.这大部分时间都有效,但是如果还有其他内容,例如来自文件夹中另一个Spark/Hadoop作业的额外部分文件,则不会覆盖这些文件. (3认同)

Cur*_*ycu 28

自从df.save(path, source, mode)被弃用以来,(http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame)

使用df.write.format(source).mode("overwrite").save(path)
df.write是DataFrameWriter的地方

'source'可以是("com.databricks.spark.avro"|"镶木地板"|"json")

  • `source` 也可以是 `csv` (2认同)

pze*_*vic 26

该参数的文档说明spark.files.overwrite:"是否覆盖SparkContext.addFile()目标文件存在时添加的文件及其内容与源文件的内容不匹配." 所以它对saveAsTextFiles方法没有影响.

您可以在保存文件之前执行此操作:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }
Run Code Online (Sandbox Code Playgroud)

Aas在此解释:http: //apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. HTML

  • 怎么样的pyspark? (29认同)

dnl*_*rky 22

pyspark.sql.DataFrame.save文档(当前为1.3.1),您可以指定mode='overwrite'保存DataFrame的时间:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)

我已经验证这甚至会删除剩余的分区文件.因此,如果您最初说过10个分区/文件,但随后使用仅具有6个分区的DataFrame覆盖了该文件夹,则生成的文件夹将具有6个分区/文件.

有关模式选项的更多信息,请参阅Spark SQL文档.

  • 真实而有用,谢谢,但DataFrame特定的解决方案 - "spark.hadoop.validateOutputSpecs"将适用于所有Spark API. (2认同)

akn*_*akn 6

df.write.mode('overwrite').parquet("/output/folder/path")如果您想使用python覆盖实木复合地板文件,则可以使用。这是火花1.6.2。API在更高版本中可能会有所不同