如何删除Delta表中的重复项?

Los*_*ssa 9 duplicates apache-spark delta-lake

有一个函数可以从 Delta 表中删除数据:

deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete(col("date") < "2017-01-01")
Run Code Online (Sandbox Code Playgroud)

但有没有办法以某种方式删除重复项?就像 deltaTable.dropDuplicates()...

我不想将整个表作为数据帧读取,删除重复项,然后再次将其重写到存储中

Ole*_*egK 8

如果您有主键(如 UUID),并且重复项基于特定的列组合(例如 Col1、Col2、Col3),那么您可以使用 ROW_NUMBER() 方法来获取重复项的 UUID 列表要删除的行。附带说明一下,Delta 表目前没有 ROWID 或序列来自动生成主键。

如果您的重复项基于某个复合键(例如,Col2、Col4、Col7),则用于删除重复项的 ROW_NUMBER() 技巧将不起作用:它将删除该行的所有副本。对于这种情况,您可以结合使用 Delta MERGE 和 Delta Time Travel(版本控制)功能来消除 Delta 表中的重复项。以下是消除完全重复项(所有相应字段具有相同值的行)的步骤:

  1. 获取一个数据框,其中包含在 Delta 表中具有重复项的不同行。ROW_NUMBER() 函数将在这里为您提供帮助。
  2. 使用 MERGE 操作和 WHEN MATCHED DELETE 删除这些行。请注意,如果同一行有 4 个副本,它将删除所有 4 个副本。我们将在第 5 步中重新添加非重复项。
  3. 使用DESCRIBE HISTORY命令获取当前Delta表之前的版本号。该版本仍然具有所有重复项,而当前版本则没有。
  4. 重复步骤 1,但这次使用 VERSION AS OF 选项来获取包含我们在步骤 2 中删除的不同行的数据帧。
  5. 使用 MEREGE 操作和 WHEN NOT MATCHED INSERT ALL 添加过去具有重复项的不同行。

以下是我在 Azure Databricks 上测试的一个 Python 示例,其中的 Delta 表存储在 ADLS 中。

from delta.tables import *

# Step 1
dfTemp = (
  spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY Col2, Col4, Col7 ORDER BY Col9 DESC) rn FROM delta.`abfss://adls-account@adls-store.dfs.core.windows.net/my-path/my-delta-table`")
           ).filter(f.col('rn') > 1).drop('rn').distinct()

# Step 2
deltaTemp = DeltaTable.forPath(spark, "abfss://adls-account@adls-store.dfs.core.windows.net/my-path/my-delta-table")

deltaTemp.alias("main").merge(
    dfTemp.alias("nodups"),
    "main.Col2 = nodups.Col2 AND main.Col4 = nodups.Col4 AND main.Col7 = nodups.Col7").whenMatchedDelete().execute()
Run Code Online (Sandbox Code Playgroud)

现在,获取我们在上面的步骤 2 中执行合并之前的 Delta 表的版本号。我在 Azure Databricks 笔记本中以 SQL 形式运行此命令:

%sql

-- Step 3
DESCRIBE HISTORY delta.`abfss://adls-account@adls-store.dfs.core.windows.net/my-path/my-delta-table`
Run Code Online (Sandbox Code Playgroud)

假设我当前的 Delta 表版本是 50,之前的版本(包含重复项)是 49。您现在可以执行以下剩余步骤:

# Step 4
dfTemp = (
  spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY Col2, Col4, Col7 ORDER BY Col9 DESC) rn FROM delta.`abfss://adls-account@adls-store.dfs.core.windows.net/my-path/my-delta-table` VERSION AS OF 49")
           ).filter(f.col('rn') > 1).drop('rn').distinct()

# Step 5
deltaTemp = DeltaTable.forPath(spark, "abfss://adls-account@adls-store.dfs.core.windows.net/my-path/my-delta-table")

deltaTemp.alias("main").merge(
    dfTemp.alias("nodups"),
    "main.Col2 = nodups.Col2 AND main.Col4 = nodups.Col4 AND main.Col7 = nodups.Col7").whenNotMatchedInsertAll().execute()
Run Code Online (Sandbox Code Playgroud)

当您必须处理部分重复项(主键相同,但其他一些相应字段具有不同值的行)时,您可能最终会编写逻辑来“标记”要保留哪一个重复行。逻辑将完全取决于您的特定用例。


vee*_*dar 5

是的,您可以直接从增量表中删除重复项。使用合并命令。以下命令将仅保留最新的记录,其余冗余数据将被删除。

MERGE into [deltatable] as target
USING ( select *, ROW_NUMBER() OVER (Partition By [primary keys] Order By [date] desc) as rn  from [deltatable]) t1 qualify rn> 1 ) as source
ON [merge primary keys and date column between source and target]
WHEN MATCHED THEN DELETE
Run Code Online (Sandbox Code Playgroud)

还有其他多种方法,但它们很耗时。

例子:

MERGE into delta_table as target
USING ( select *, ROW_NUMBER() OVER (Partition By pk1 Order By date1 desc) as rn  from delta_table ) t1 qualify rn> 1 ) as source
ON source.pk1 = target.pk1 and source.date1 = target.date1
WHEN MATCHED THEN DELETE
Run Code Online (Sandbox Code Playgroud)