Ash*_*iya 6 apache-spark databricks delta-lake
在我们的数据管道中,我们从数据源中提取 CDC 事件,并将这些更改以 AVRO 格式写入“增量数据”文件夹中。
然后,我们定期运行 Spark 作业,将这些“增量数据”与我们当前版本的“快照表”(ORC 格式)合并,以获得最新版本的上游快照。
在此合并逻辑期间:
1)我们将“增量数据”加载为DataFrame df1
2)将当前的“快照表”加载为DataFrame df2
3) 合并 df1 和 df2 去重复 ID 并获取最新版本的行(使用 update_timestamp 列)
此逻辑将“增量数据”和当前“快照表”的全部数据加载到 Spark 内存中,该内存可能非常巨大,具体取决于数据库。
我注意到在 Delta Lake 中,使用以下代码完成类似的操作:
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
Run Code Online (Sandbox Code Playgroud)
在这里,“updatesDF”可以被认为是来自 CDC 源的“增量数据”。
我的问题:
1)合并/更新插入内部如何工作?它是否将整个“updatedDF”和“/data/events/”加载到 Spark 内存中?
2)如果不是,它是否应用类似于 Apache Hudi 的增量更改?
3)在重复数据删除期间,此 upsert 逻辑如何知道获取记录的最新版本?因为我没有看到任何指定“更新时间戳”列的设置?
1) How does merge/upsert internally works? Does it load entire "updatedDF" and
"/data/events/" into Spark memory?
Run Code Online (Sandbox Code Playgroud)
不,Spark 不需要将需要更新的整个 Delta DF 加载到内存中。否则它就无法扩展。它采用的方法与 Spark 所做的其他作业非常相似 - 如果数据集足够大(或者您在云中创建显式分区),则整个表会透明地分为多个分区。然后,每个分区都会分配一个任务来组成您的merge工作。任务可以在不同的 Spark 执行器等上运行。
2) If not, does it apply the delta changes something similar to Apache Hudi ?
Run Code Online (Sandbox Code Playgroud)
我听说过 Apache Hudi,但还没看过。在内部,Delta看起来像版本化的镶木地板文件。对表的更改存储为有序的原子单元,称为提交。当您保存表时 - 查看它有哪些文件 - 它将包含 000000.json、000001.json 等文件,并且每个文件都会引用子目录中底层镶木地板文件的一组操作。例如,000000.json 将表示此版本及时引用 parquet 文件 001 和 002,而 000001.json 将表示此版本及时不应引用这两个较旧的 parquet 文件,而仅使用 parquet 文件 003。
3) During deduplication how this upsert logic knows to take the latest version of a record?
Because I don't see any setting to specify the "update timestamp" column?
Run Code Online (Sandbox Code Playgroud)
默认情况下,它引用最新的变更集。时间戳是 Delta 版本控制实现方式的内部实现。您可以通过语法引用旧快照AS OF- 请参阅
https://docs.databricks.com/delta/delta-batch.html#syntax
| 归档时间: |
|
| 查看次数: |
3351 次 |
| 最近记录: |