UPSERT 拼花 Pyspark

Ita*_*itt 4 etl amazon-s3 parquet pyspark

我在 s3 中有带有以下分区的镶木地板文件:年/月/日期/some_id 使用 Spark (PySpark),我想在过去14 天每天都使用 UPSERT - 我想替换 s3 中的现有数据(一个每个分区的镶木地板文件),但不删除 14 天之前的天数。我尝试了两种保存模式: 追加- 不好,因为它只是添加了另一个文件。 覆盖- 删除过去的数据和其他分区的数据。

有什么方法或最佳实践可以克服这个问题吗?我应该在每次运行中读取 s3 中的所有数据,然后再写回吗?也许重命名文件以便append将替换 s3 中的当前文件?

非常感谢!

vil*_*oro 5

我通常会做类似的事情。就我而言,我执行 ETL 并将一天的数据附加到镶木地板文件中:

关键是处理要写入的数据(在我的情况下是实际日期),确保按date列进行分区并覆盖当前日期的所有数据。

这将保留所有旧数据。举个例子:

(
    sdf
    .write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("date")
    .option("replaceWhere", "2020-01-27")
    .save(uri)
)
Run Code Online (Sandbox Code Playgroud)

您还可以查看delta.io,它是parquet格式的扩展,它提供了一些有趣的功能,例如ACID事务。