How to merge a spark dataframe with hive table on Databricks Deltalake?

Met*_*ata 2 apache-spark databricks delta-lake

I have a dataframe as below:

val data = Seq(("James", "Sales", 34, "Developer"), ("Michael", "Sales", 56, "Architect"), ("Robert", "Sales", 30, "Manager"), ("Maria", "Finance", 24, "Consultant"))
val df1 = data.toDF("name","dept","id", "role")
df1.printSchema()
root
|-- name: string (nullable = true)
|-- dept: string (nullable = true)
|-- id: integer (nullable = true)
|-- role: string (nullable = true)

I have a Hive table with the same columns and the exact same schema:

val df2 = spark.sql("select * from db.table")

From the incoming dataframe df1, I have two new records and two updated records.

val df2 = spark.sql("select * from db.table where name in ('James', 'Michael')")
df2.show()
+-------+-------+---+----------+
|   name|   dept| id|      role|
+-------+-------+---+----------+
|  James|  Sales| 34| Associate|
|Michael|  Sales| 56|    Junior|
+-------+-------+---+----------+

The keys in use here are: dept and id.

In one of my previous projects, we would join the incoming dataframe with the relevant partition of our Hive table into a staging table, then simply run EXCHANGE PARTITION to swap the existing Hive partition with the staging table containing the merged data.
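For context, the staging-table swap described above can be sketched in Hive DDL roughly like this (the staging table and partition values are illustrative assumptions, not from the original setup):

```sql
-- Merged data for one partition has been written into db.staging_sales
-- (e.g. by joining the incoming dataframe with the existing partition).

-- Atomically swap the merged partition into the target table:
-- Hive moves the matching partition from the source table into the target.
ALTER TABLE db.table EXCHANGE PARTITION (dept='Sales') WITH TABLE db.staging_sales;
```

Note that EXCHANGE PARTITION is a Hive-metastore operation and does not apply to Delta tables.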

We are using the Databricks distribution of Spark. Our Hive table is built on Databricks Delta Lake and has millions of rows. Is there any other way I can merge my incoming dataframe df1 with my Hive table? If so, how can I achieve it without a performance hit?

Ale*_*Ott 5

As Tim mentioned, if your target table is already on Delta, then you only need to use the MERGE INTO SQL command or the corresponding Scala API (see the documentation on Delta Merge). You need something like this:

import io.delta.tables._
import org.apache.spark.sql.functions._

DeltaTable.forName(spark, "db.table")
  .as("target")
  .merge(
    df1.as("updates"),
    "target.dept = updates.dept and target.id = updates.id")
  .whenMatched
  .updateAll()
  .whenNotMatched
  .insertAll()
  .execute()
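Equivalently, after registering df1 as a temporary view, the same merge can be expressed in SQL (the view name "updates" is an illustrative assumption):

```sql
-- First run in Scala: df1.createOrReplaceTempView("updates")
MERGE INTO db.table AS target
USING updates
ON target.dept = updates.dept AND target.id = updates.id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
```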

Unmatched data is inserted as-is, while matched data is placed into rewritten files that contain the original records. Usually this rewrite is the main performance hit, and you may want to reduce the file size so that less data is rewritten (see documentation) - in newer versions, the table can be configured so that the Databricks Spark engine will automatically find the optimal file size to reduce rewrite time without affecting read patterns (see documentation).
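The file-size tuning mentioned above can be set via Delta table properties; a sketch, assuming the property names documented by Databricks are available in your runtime version:

```sql
-- Let Databricks auto-tune file sizes for merge-heavy tables
ALTER TABLE db.table SET TBLPROPERTIES (delta.tuneFileSizesForRewrites = true);

-- Or pin an explicit target file size in bytes (here ~32 MB)
ALTER TABLE db.table SET TBLPROPERTIES (delta.targetFileSize = '33554432');
```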