如何覆盖在 Spark 中读取 DataFrame 的 parquet 文件

cph*_*sto 9 python metadata apache-spark parquet

这是我面临的问题的缩影,我遇到了错误。让我尝试在这里重现它。

我将 a 保存DataFrame为 a parquet,但是当我重新加载DataFramefromparquet文件并再次将其保存为 时parquet,出现错误。

valuesCol = [('Male','2019-09-06'),('Female','2019-09-06'),('Male','2019-09-07')]
df = spark.createDataFrame(valuesCol,['sex','date'])
# Save as parquet
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

# Load it back
df = spark.read.format('parquet').load('.../temp')
df = df.where(col('sex')=='Male')
# Save it back - This produces ERROR   
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
Run Code Online (Sandbox Code Playgroud)

错误信息-

执行器 22): java.io.FileNotFoundException: 请求的文件 maprfs:///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet 不存在。底层文件可能已更新。您可以通过在 SQL 中运行“REFRESH TABLE tableName”命令或重新创建所涉及的数据集/数据帧来显式使 Spark 中的缓存失效。

另一个 SO问题解决了这个问题。建议的解决方案是像下面的代码一样刷新表格,但这没有帮助。问题在于元数据的刷新。我不知道如何刷新它。

df.createOrReplaceTempView('table_view')
spark.catalog.refreshTable('table_view')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
Run Code Online (Sandbox Code Playgroud)

此问题的解决方法:解决此问题的一种不太优雅的方法是使用不同的名称保存DataFrameas文件,然后删除原始文件,最后将此文件重命名为旧名称。parquetparquetparquet

# Workaround
import os
import shutil

# Load it back
df = spark.read.format('parquet').load('.../temp')

# Save it back as temp1, as opposed to original temp      
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp1')

# Delete the original parquet file
shutil.rmtree('.../temp')

# Renaming the parquet folder.
os.rename('.../temp1','.../temp')
Run Code Online (Sandbox Code Playgroud)

但是,问题是某些 DataFrame 非常大,这可能不是处理它的最佳方法。更不用说重命名是否会导致元数据出现一些问题,我不确定。

小智 8

此错误的一种解决方案是缓存,对 df 执行操作(例如:)df.show(),然后以“覆盖”模式保存镶木地板文件。

在Python中:

save_mode = "overwrite"
df = spark.read.parquet("path_to_parquet")

....... make your transformation to the df which is new_df

new_df.cache()
new_df.show()

new_df.write.format("parquet")\
                .mode(save_mode)\
                .save("path_to_parquet")
Run Code Online (Sandbox Code Playgroud)


Gel*_*ion 3

当数据从缓存中取出时,它似乎工作正常。

val df = spark.read.format("parquet").load("temp").cache()
Run Code Online (Sandbox Code Playgroud)

cache是一个惰性操作,不会触发任何计算,我们必须添加一些虚拟操作。

println(df.count()) //count over parquet files should be very fast  
Run Code Online (Sandbox Code Playgroud)

现在它应该可以工作:

df.repartition(1).write.mode(SaveMode.Overwrite).parquet("temp")
Run Code Online (Sandbox Code Playgroud)

  • 所以,这就是“.cache()”起作用的原因,因为“df”不是指向原始的“parquet”文件,而是指向磁盘上的位置,对吗? (2认同)