Update only changed rows in a PySpark Delta table (Databricks)

har*_*ini 3 merge pyspark databricks delta-lake

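I need to update only the rows in an existing table that have changed compared to a newly created DataFrame. Right now I subtract to get the changed rows, but I am not sure how to merge them into the existing table.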

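# new_df is the newly created DataFrame; existing_table stands in for the real table name
old_df = spark.sql("select * from existing_table")
# rows of new_df that have no identical row in old_df (new or changed rows)
diff = new_df.subtract(old_df)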

Now each row of the diff DataFrame has to be inserted (if it is a new row) or used to update the existing record.

(deltaTable.alias("full_df").merge(
    merge_df.alias("append_df"),
    "full_df.col1 = append_df.col1 OR full_df.col2 = append_df.col2")
  .whenNotMatchedInsertAll() 
  .execute()
)

This does not update existing records (case: the col2 value changed; col1 did not).

mar*_*rat 7

.whenMatchedUpdateAll() accepts a condition, which can be used to leave unchanged rows untouched:

(
  deltaTable
  .alias("full_df")
  .merge(
    merge_df.alias("append_df"),
    "full_df.col1 = append_df.col1 OR full_df.col2 = append_df.col2") 
  .whenNotMatchedInsertAll()
  .whenMatchedUpdateAll("full_df.col1 != append_df.col1 OR full_df.col2 != append_df.col2")
  .execute()
)
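
One caveat with a != comparison: in Spark SQL it evaluates to NULL when either side is NULL, so a value that changes to or from NULL would not trigger the update. Below is a minimal sketch of a null-safe variant using Spark's <=> operator. It assumes col1 is the key and col2 is the only value column, which the question does not confirm; the helper names key_condition, changed_condition and upsert_changed are hypothetical and not part of the Delta API.

```python
def key_condition(key_cols, target="full_df", source="append_df"):
    """Build the merge (join) predicate: every key column must be equal."""
    if not key_cols:
        raise ValueError("at least one key column is required")
    return " AND ".join(f"{target}.{c} = {source}.{c}" for c in key_cols)


def changed_condition(value_cols, target="full_df", source="append_df"):
    """Build a predicate that is true when any value column differs.

    <=> is Spark SQL's null-safe equality, so NOT (a <=> b) is true when
    exactly one side is NULL, whereas a != b would evaluate to NULL.
    """
    if not value_cols:
        raise ValueError("at least one value column is required")
    return " OR ".join(
        f"NOT ({target}.{c} <=> {source}.{c})" for c in value_cols
    )


def upsert_changed(delta_table, updates_df, key_cols, value_cols):
    """Insert new rows and update only the matched rows whose values changed.

    delta_table is expected to be a delta.tables.DeltaTable and updates_df a
    Spark DataFrame; both are supplied by the caller.
    """
    (
        delta_table.alias("full_df")
        .merge(updates_df.alias("append_df"), key_condition(key_cols))
        .whenMatchedUpdateAll(condition=changed_condition(value_cols))
        .whenNotMatchedInsertAll()
        .execute()
    )
```

With the names from the question, this would be called as upsert_changed(deltaTable, merge_df, ["col1"], ["col2"]). Matching on a key with AND instead of OR across columns is a design choice of this sketch, made so that each source row matches at most one target row.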