Sha*_*ath 1 scala hdfs dataframe apache-spark
我正在尝试从 hdfs 位置读取 parquet 文件,进行一些转换并覆盖同一位置中的文件。我必须覆盖同一位置的文件,因为我必须多次运行相同的代码。
这是我写的代码
val df = spark.read.option("header", "true").option("inferSchema", "true").parquet("hdfs://master:8020/persist/local/")
//after applying some transformations lets say the final dataframe is transDF which I want to overwrite at the same location.
transDF.write.mode("overwrite").parquet("hdfs://master:8020/persist/local/")
Run Code Online (Sandbox Code Playgroud)
现在的问题是在从给定位置读取镶木地板文件之前,由于某种原因,spark 我相信它会因为覆盖模式而删除给定位置的文件。因此,在执行代码时,我收到以下错误。
File does not exist: hdfs://master:8020/persist/local/part-00000-e73c4dfd-d008-4007-8274-d445bdea3fc8-c000.snappy.parquet
Run Code Online (Sandbox Code Playgroud)
关于如何解决这个问题有什么建议吗?谢谢。
简单的答案是你不能覆盖你正在阅读的内容。这背后的原因是覆盖需要删除所有内容,但是,由于 Spark 是并行工作的,因此某些部分当时可能仍在读取。此外,即使所有内容都已读取,spark 也需要原始文件来重新计算失败的任务。
由于您需要多次迭代的输入,因此我只需将输入和输出的名称作为执行一次迭代的函数的参数,并仅在写入成功后删除前一次迭代。