Tus*_*aar 9 apache-spark pyspark delta-lake
我当前有一个 pyspark 数据框,我最初使用下面的代码创建了一个增量表 -
df.write.format("delta").saveAsTable("events")
Run Code Online (Sandbox Code Playgroud)
现在,由于上面的数据框根据我的要求每天填充数据,因此为了将新记录附加到增量表中,我使用了以下语法 -
df.write.format("delta").mode("append").saveAsTable("events")
Run Code Online (Sandbox Code Playgroud)
现在我在数据块和集群中完成了这一切。我想知道如何在 python 中编写通用 pyspark 代码,如果增量表不存在,则创建增量表,如果增量表存在,则追加记录。我想做这件事,因为如果我将我的 python 包给某人,他们不会在其环境中具有相同的增量表,因此应该从代码动态创建它。
如果您还没有 Delta 表,那么当您使用该append模式时将会创建它。因此,您不需要编写任何特殊代码来处理表尚不存在和退出的情况。
PS 仅当您执行合并到表中而不是追加时,您才需要这样的代码。在这种情况下,代码将如下所示:
if table_exists:
do_merge
else:
df.write....
Run Code Online (Sandbox Code Playgroud)
PS 这是该模式的通用实现
| 归档时间: |
|
| 查看次数: |
39487 次 |
| 最近记录: |