Dil*_*eam 8 sql hadoop apache-spark apache-spark-sql databricks
Apache Spark SQL 是否支持类似于 Oracle 的 MERGE SQL 子句的 MERGE 子句?
MERGE into <table> using (
select * from <table1>
when matched then update...
DELETE WHERE...
when not matched then insert...
)
Run Code Online (Sandbox Code Playgroud)
Que*_*tin 14
Spark 确实支持使用 Delta Lake 作为存储格式的 MERGE 操作。首先要做的是使用delta格式保存表格,以提供对事务功能的支持,并支持使用 spark 进行 DELETE/UPDATE/MERGE 操作
蟒蛇/斯卡拉:
df.write.format("delta").save("/data/events")
查询语句: CREATE TABLE events (eventId long, ...) USING delta
表存在后,您可以运行通常的 SQL Merge 命令:
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
UPDATE SET events.data = updates.data
WHEN NOT MATCHED
THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
Run Code Online (Sandbox Code Playgroud)
该命令也可在 Python/Scala 中使用:
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
Run Code Online (Sandbox Code Playgroud)
要支持 Delta Lake 格式,您还需要将 delta 包作为 Spark 作业中的依赖项:
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_x.xx</artifactId>
<version>xxxx</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
有关更多详细信息,请参阅https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge
| 归档时间: |
|
| 查看次数: |
8290 次 |
| 最近记录: |