如何比较两个版本的 delta 表以获得类似于 CDC 的更改?

Jac*_*ski 6 delta-lake

如果我想使用 delta time-travel 来比较两个版本以获得类似于 CDC 的更改,该怎么做?

我可以看到两个选项:

  1. 在 SQL 中,您有 EXCEPT/MINUS 查询,您可以将所有数据与另一个表进行比较。我假设您也可以使用它,对吗?但是,如果您比较的版本越来越大并且您总是需要将所有版本与最新版本的所有行进行比较,那么速度是否足够快?

  2. Delta 是否对每行进行某种散列并且可以非常快地完成,或者这对于 delta 来说是否非常耗时?


发现松弛

Jac*_*ski 6

您可以计算表的两个版本的差异,但正如您猜测的那样,这样做的成本很高。当增量表有除追加之外的变化时,计算实际差异也很棘手。

通常当人们问到这个问题时,他们试图设计自己的系统,让他们对从增量到某处的数据进行一次处理;火花流 + Delta 源已经存在来做到这一点

如果你想自己写,你可以直接读取事务日志(协议规范在https://github.com/delta-io/delta/blob/master/PROTOCOL.md)并使用版本中的操作在您计算的两者之间找出哪些文件有更改要读取


请注意,增量表的版本被缓存(由 Spark 保存),因此比较不同的数据集应该相当便宜。

val v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/t2")
val v1 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/t2")
// v0 and v1 are persisted - see Storage tab in web UI
Run Code Online (Sandbox Code Playgroud)

获得那些 v0 和 v1 并不昂贵;比较两者既昂贵又棘手。如果表是仅附加的,则它是 (v1 - v0);如果它有 upserts 那么你也必须处理 (v0 - v1),如果它有元数据或协议更改,它会变得更加棘手。

当你自己完成所有这些逻辑时,它与重新实现 DeltaSource 非常相似。


然后你可以考虑以下几点:

val log = DeltaLog.forTable(spark, "/tmp/delta/t2")
val v0 = log.getSnapshotAt(0)
val actionsAtV0 = v0.state

val v1 = log.getSnapshotAt(1)
val actionsAtV1 = v1.state
Run Code Online (Sandbox Code Playgroud)

actionsAtV0actionsAtV1是将增量表分别带到版本 0 和 1 的所有操作,可以将其视为增量表的 CDC。

这基本上是读取事务日志,除了使用一些 Delta 的内部 API 使其更容易。