spark delta 覆盖特定分区

SWD*_*per 4 delta apache-spark

所以我有一个数据框,其中有一列 file_date。对于给定的运行,数据帧只有一个唯一的 file_date 的数据。例如,在一次运行中,假设有大约 100 条记录,文件日期为 2020_01_21。

我正在使用以下内容编写此数据

(df
 .repartition(1)
 .write
 .format("delta")
 .partitionBy("FILE_DATE")
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .option("replaceWhere","FILE_DATE=" + run_for_file_date)
 .mode("overwrite")
 .save("/mnt/starsdetails/starsGrantedDetails/"))
Run Code Online (Sandbox Code Playgroud)

我的要求是为每个 FILE_DATE 创建一个文件夹/分区,因为很有可能重新运行特定 file_date 的数据并且必须覆盖特定 file_date 的数据。不幸的是,在上面的代码中,如果我不放置“replaceWhere”选项,它也只会覆盖其他分区的数据,但是如果我写了上面的内容,数据似乎正确地覆盖了特定分区,但是每次写入完成时,我收到以下错误。

请注意,我还在写入之前设置了以下 spark 配置:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
Run Code Online (Sandbox Code Playgroud)

但我仍然收到以下错误:

AnalysisException: "Data written out does not match replaceWhere 'FILE_DATE=2020-01-19'.\nInvalid data would be written to partitions FILE_DATE=2020-01-20.;"
Run Code Online (Sandbox Code Playgroud)

你能帮忙吗?

Ali*_*san 8

使用replaceWhere覆盖增量分区时需要注意几件事。您的数据帧必须在写入分区之前进行过滤,例如我们有数据帧DF

在此处输入图片说明

当我们将此数据帧写入增量表时,必须过滤数据帧分区范围,这意味着我们应该只有在 replaceWhere 条件范围内的分区列值。

 DF.write.format("delta").mode("overwrite").option("replaceWhere",  "date >= '2020-12-14' AND date <= '2020-12-15' ").save( "Your location")
Run Code Online (Sandbox Code Playgroud)

如果我们使用条件 date < '2020-12-15' 而不是 date <= '2020-12-15' 它会给我们错误:

在此处输入图片说明

另一件事是引用“2020-12-15”中所需的分区列值,否则可能会出错。

还有拉请求开放三角洲覆盖分区spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")这里https://github.com/delta-io/delta/pull/371不知道他们正在计划推出它。