sam*_*mba 3 scala apache-spark databricks delta-lake
我想更改 Databricks Delta 表的列名。
所以我做了以下事情:
// Read old table data
val old_data_DF = spark.read.format("delta")
.load("dbfs:/mnt/main/sales")
// Created a new DF with a renamed column
val new_data_DF = old_data_DF
.withColumnRenamed("column_a", "metric1")
.select("*")
// Dropped and recereated the Delta files location
dbutils.fs.rm("dbfs:/mnt/main/sales", true)
dbutils.fs.mkdirs("dbfs:/mnt/main/sales")
// Trying to write the new DF to the location
new_data_DF.write
.format("delta")
.partitionBy("sale_date_partition")
.save("dbfs:/mnt/main/sales")
Run Code Online (Sandbox Code Playgroud)
在这里,我在写入 Delta 时的最后一步出现错误:
java.io.FileNotFoundException: dbfs:/mnt/main/sales/sale_date_partition=2019-04-29/part-00000-769.c000.snappy.parquet
A file referenced in the transaction log cannot be found. This occurs when data has been manually deleted from the file system rather than using the table `DELETE` statement
Run Code Online (Sandbox Code Playgroud)
显然数据被删除了,很可能我在上面的逻辑中遗漏了一些东西。现在唯一包含数据的地方是new_data_DF. 写入一个位置dbfs:/mnt/main/sales_tmp也失败
我应该怎么做才能将数据从new_data_DFDelta 位置写入?
您可以通过以下方式做到这一点。
// Read old table data
val old_data_DF = spark.read.format("delta")
.load("dbfs:/mnt/main/sales")
// Created a new DF with a renamed column
val new_data_DF = old_data_DF
.withColumnRenamed("column_a", "metric1")
.select("*")
// Trying to write the new DF to the location
new_data_DF.write
.format("delta")
.mode("overwrite") // this would overwrite the whole data files
.option("overwriteSchema", "true") //this is the key line.
.partitionBy("sale_date_partition")
.save("dbfs:/mnt/main/sales")
Run Code Online (Sandbox Code Playgroud)
OverWriteSchema 选项将使用我们在转换期间更新的最新架构创建新的物理文件。
通常,避免rm在 Delta 表上使用是一个好主意。Delta 的事务日志在大多数情况下可以防止最终的一致性问题,但是,当您在很短的时间内删除和重新创建表时,不同版本的事务日志可能会出现和消失。
相反,我建议使用 Delta 提供的事务原语。例如,要覆盖表中的数据,您可以:
df.write.format("delta").mode("overwrite").save("/delta/events")
Run Code Online (Sandbox Code Playgroud)
如果您有一个已经损坏的表,您可以使用FSCK修复它。
| 归档时间: |
|
| 查看次数: |
12573 次 |
| 最近记录: |