Noé*_*che 5 sql performance pyspark databricks delta-lake
我正在将 PySpark 数据帧合并到 Delta 表中。输出增量按 DATE 分区。以下查询需要30 秒才能运行:
query = DeltaTable.forPath(spark, PATH_TO_THE_TABLE).alias(
"actual"
).merge(
spark_df.alias("sdf"),
"actual.DATE >= current_date() - INTERVAL 1 DAYS
AND (actual.feat1 = sdf.feat1)
AND (actual.TIME = sdf.TIME)
AND (actual.feat2 = sdf.feat2) "
,
).whenNotMatchedInsertAll()
Run Code Online (Sandbox Code Playgroud)
将 Delta 的内容复制到另一个位置后,使用代替 时,上述查询速度提高了 60 倍(即在同一集群上需要0.5 秒) 。这是复制增量的命令:NEW_PATHPATH_TO_THE_TABLE
(spark.read.format("delta").load(PATH_TO_THE_TABLE).write.format(
"delta"
)
.mode("overwrite").partitionBy(["DATE"]).save(NEW_PATH))
Run Code Online (Sandbox Code Playgroud)
如何使第一个增量的查询速度与新增量的查询速度一样快?我知道 Delta 有一个版本控制系统,我怀疑这就是它花费如此多时间的原因。我尝试清理 Delta 表(这将查询时间降低到 20 秒),但距离 0.5 秒还很远。
堆:
我看到您进行了比较,actual.DATE >= current_date()因为这是查询中最重要的部分,请尝试定期运行 ZORDER 按 thta 排序增量:
OPTIMIZE actual ZORDER BY (actual.DATE)
Run Code Online (Sandbox Code Playgroud)
您还可以尝试完全真空德尔塔:
VACUUM actual RETAIN 0 HOURS
Run Code Online (Sandbox Code Playgroud)
为此,您需要设置 s park.databricks.delta.retentionDurationCheck.enabled false。
如果您不想要增量的好处(事务、并发写入、时间旅行历史记录等),您可以只使用镶木地板。