小编Vla*_*mir的帖子

异常:org.apache.spark.sql.delta.ConcurrentAppendException:文件通过并发更新添加到表的根目录

我有一个简单的 Spark 作业,将数据流式传输到 Delta 表。该表非常小并且没有分区。

创建了许多小镶木地板文件。

按照文档(https://docs.delta.io/1.0.0/best-practices.html)中的建议,我添加了每天运行一次的压缩作业。

    val path = "..."
    val numFiles = 16
    
    spark.read
     .format("delta")
     .load(path)
     .repartition(numFiles)
     .write
     .option("dataChange", "false")
     .format("delta")
     .mode("overwrite")
     .save(path)
Run Code Online (Sandbox Code Playgroud)

每次压缩作业运行时,流作业都会出现以下异常:

org.apache.spark.sql.delta.ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
Run Code Online (Sandbox Code Playgroud)

我尝试将以下配置参数添加到流作业中:

spark.databricks.delta.retryWriteConflict.enabled = true  # would be false by default
spark.databricks.delta.retryWriteConflict.limit = 3  # optionally limit the maximum amout of retries
Run Code Online (Sandbox Code Playgroud)

这没有帮助。

知道如何解决这个问题吗?

parquet spark-streaming databricks delta-lake

6
推荐指数
1
解决办法
1万
查看次数

标签 统计

databricks ×1

delta-lake ×1

parquet ×1

spark-streaming ×1