Los*_*ssa 9 duplicates apache-spark delta-lake
有一个函数可以从 Delta 表中删除数据:
deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete(col("date") < "2017-01-01")
Run Code Online (Sandbox Code Playgroud)
但有没有办法以某种方式删除重复项?就像 deltaTable.dropDuplicates()...
我不想将整个表作为数据帧读取,删除重复项,然后再次将其重写到存储中
如果您有主键(如 UUID),并且重复项基于特定的列组合(例如 Col1、Col2、Col3),那么您可以使用 ROW_NUMBER() 方法来获取重复项的 UUID 列表要删除的行。附带说明一下,Delta 表目前没有 ROWID 或序列来自动生成主键。
如果您的重复项基于某个复合键(例如,Col2、Col4、Col7),则用于删除重复项的 ROW_NUMBER() 技巧将不起作用:它将删除该行的所有副本。对于这种情况,您可以结合使用 Delta MERGE 和 Delta Time Travel(版本控制)功能来消除 Delta 表中的重复项。以下是消除完全重复项(所有相应字段具有相同值的行)的步骤:
以下是我在 Azure Databricks 上测试的一个 Python 示例,其中的 Delta 表存储在 ADLS 中。
from delta.tables import *
# Step 1
dfTemp = (
spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY Col2, Col4, Col7 ORDER BY Col9 DESC) rn FROM delta.`abfss://adls-account@adls-store.dfs.core.windows.net/my-path/my-delta-table`")
).filter(f.col('rn') > 1).drop('rn').distinct()
# Step 2
deltaTemp = DeltaTable.forPath(spark, "abfss://adls-account@adls-store.dfs.core.windows.net/my-path/my-delta-table")
deltaTemp.alias("main").merge(
dfTemp.alias("nodups"),
"main.Col2 = nodups.Col2 AND main.Col4 = nodups.Col4 AND main.Col7 = nodups.Col7").whenMatchedDelete().execute()
Run Code Online (Sandbox Code Playgroud)
现在,获取我们在上面的步骤 2 中执行合并之前的 Delta 表的版本号。我在 Azure Databricks 笔记本中以 SQL 形式运行此命令:
%sql
-- Step 3
DESCRIBE HISTORY delta.`abfss://adls-account@adls-store.dfs.core.windows.net/my-path/my-delta-table`
Run Code Online (Sandbox Code Playgroud)
假设我当前的 Delta 表版本是 50,之前的版本(包含重复项)是 49。您现在可以执行以下剩余步骤:
# Step 4
dfTemp = (
spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY Col2, Col4, Col7 ORDER BY Col9 DESC) rn FROM delta.`abfss://adls-account@adls-store.dfs.core.windows.net/my-path/my-delta-table` VERSION AS OF 49")
).filter(f.col('rn') > 1).drop('rn').distinct()
# Step 5
deltaTemp = DeltaTable.forPath(spark, "abfss://adls-account@adls-store.dfs.core.windows.net/my-path/my-delta-table")
deltaTemp.alias("main").merge(
dfTemp.alias("nodups"),
"main.Col2 = nodups.Col2 AND main.Col4 = nodups.Col4 AND main.Col7 = nodups.Col7").whenNotMatchedInsertAll().execute()
Run Code Online (Sandbox Code Playgroud)
当您必须处理部分重复项(主键相同,但其他一些相应字段具有不同值的行)时,您可能最终会编写逻辑来“标记”要保留哪一个重复行。逻辑将完全取决于您的特定用例。
是的,您可以直接从增量表中删除重复项。使用合并命令。以下命令将仅保留最新的记录,其余冗余数据将被删除。
MERGE into [deltatable] as target
USING ( select *, ROW_NUMBER() OVER (Partition By [primary keys] Order By [date] desc) as rn from [deltatable]) t1 qualify rn> 1 ) as source
ON [merge primary keys and date column between source and target]
WHEN MATCHED THEN DELETE
Run Code Online (Sandbox Code Playgroud)
还有其他多种方法,但它们很耗时。
例子:
MERGE into delta_table as target
USING ( select *, ROW_NUMBER() OVER (Partition By pk1 Order By date1 desc) as rn from delta_table ) t1 qualify rn> 1 ) as source
ON source.pk1 = target.pk1 and source.date1 = target.date1
WHEN MATCHED THEN DELETE
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
24191 次 |
| 最近记录: |