我有一个简单的 Spark 作业,将数据流式传输到 Delta 表。该表非常小并且没有分区。
创建了许多小镶木地板文件。
按照文档(https://docs.delta.io/1.0.0/best-practices.html)中的建议,我添加了每天运行一次的压缩作业。
val path = "..."
val numFiles = 16
spark.read
.format("delta")
.load(path)
.repartition(numFiles)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.save(path)
Run Code Online (Sandbox Code Playgroud)
每次压缩作业运行时,流作业都会出现以下异常:
org.apache.spark.sql.delta.ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
Run Code Online (Sandbox Code Playgroud)
我尝试将以下配置参数添加到流作业中:
spark.databricks.delta.retryWriteConflict.enabled = true # would be false by default
spark.databricks.delta.retryWriteConflict.limit = 3 # optionally limit the maximum amout of retries
Run Code Online (Sandbox Code Playgroud)
这没有帮助。
知道如何解决这个问题吗?