从 Pyspark 中的数据帧插入或更新增量表

Tus*_*aar 9 apache-spark pyspark delta-lake

我当前有一个 pyspark 数据框,我最初使用下面的代码创建了一个增量表 -

df.write.format("delta").saveAsTable("events")
Run Code Online (Sandbox Code Playgroud)

现在,由于上面的数据框根据我的要求每天填充数据,因此为了将新记录附加到增量表中,我使用了以下语法 -

df.write.format("delta").mode("append").saveAsTable("events")
Run Code Online (Sandbox Code Playgroud)

现在我在数据块和集群中完成了这一切。我想知道如何在 python 中编写通用 pyspark 代码,如果增量表不存在,则创建增量表,如果增量表存在,则追加记录。我想做这件事,因为如果我将我的 python 包给某人,他们不会在其环境中具有相同的增量表,因此应该从代码动态创建它。

Ale*_*Ott 5

如果您还没有 Delta 表,那么当您使用该append模式时将会创建它。因此,您不需要编写任何特殊代码来处理表尚不存在和退出的情况。

PS 仅当您执行合并到表中而不是追加时,您才需要这样的代码。在这种情况下,代码将如下所示:

if table_exists:
  do_merge
else:
  df.write....
Run Code Online (Sandbox Code Playgroud)

PS 这是该模式的通用实现