har*_*ini 3 merge pyspark databricks delta-lake
与创建的数据帧相比,需要仅更新现有表中发生更改的行。所以现在,我确实减去并获取更改的行,但不确定如何合并到现有表中。
old_df = spark.sql("select * from existing table")
diff = new_df.subtract(old_df)
Run Code Online (Sandbox Code Playgroud)
现在必须插入 diff 数据框(如果是新行)或更新现有记录
(deltaTable.alias("full_df").merge(
merge_df.alias("append_df"),
"full_df.col1 = append_df.col1 OR full_df.col2 =append_df.col2")
.whenNotMatchedInsertAll()
.execute()
)
Run Code Online (Sandbox Code Playgroud)
这不是更新现有记录(情况:col2 值已更改;col1 未更改)
.whenMatchedUpdateAll()接受可用于保留未更改行的条件:
(
deltaTable
.alias("full_df")
.merge(
merge_df.alias("append_df"),
"full_df.col1 = append_df.col1 OR full_df.col2 = append_df.col2")
.whenNotMatchedInsertAll()
.whenMatchedUpdateAll("full_df.col1 != append_df.col1 OR full_df.col2 != append_df.col2")
.execute()
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
9194 次 |
| 最近记录: |