如何向 Delta Lake 表添加新列?

Com*_*ion 3 apache-spark databricks azure-databricks delta-lake

我正在尝试向在 Azure Blob 存储中存储为增量表的数据添加一个新列。对数据执行的大多数操作都是 upserts,有很多更新和很少的新插入。我写数据的代码目前看起来像这样:

DeltaTable.forPath(spark, deltaPath)
      .as("dest_table")
      .merge(myDF.as("source_table"),
             "dest_table.id = source_table.id")
      .whenNotMatched()
      .insertAll()
      .whenMatched(upsertCond)
      .updateExpr(upsertStat)
      .execute()
Run Code Online (Sandbox Code Playgroud)

这些文档来看,Delta Lake 似乎只支持在insertAll()updateAll()调用上添加新列。但是,我仅在满足某些条件并希望将新列添加到所有现有数据(默认值为null)时才进行更新。

我想出了一个看起来非常笨拙的解决方案,我想知道是否有更优雅的方法。这是我目前提出的解决方案:

// Read in existing data
val myData = spark.read.format("delta").load(deltaPath)
// Register table with Hive metastore
myData.write.format("delta").saveAsTable("input_data")

// Add new column
spark.sql("ALTER TABLE input_data ADD COLUMNS (new_col string)")

// Save as DataFrame and overwrite data on disk
val sqlDF = spark.sql("SELECT * FROM input_data")
sqlDF.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(deltaPath)
Run Code Online (Sandbox Code Playgroud)

Joh*_*tud 13

还可以使用 SQL 命令添加新列,如下所示:

ALTER TABLE dbName.TableName ADD COLUMNS (newColumnName dataType)

UPDATE dbName.TableName SET newColumnName = val;
Run Code Online (Sandbox Code Playgroud)


小智 6

首先更改增量表,然后执行合并操作:

from pyspark.sql.functions import lit

spark.read.format("delta").load('/mnt/delta/cov')\
  .withColumn("Recovered", lit(''))\
  .write\
  .format("delta")\
  .mode("overwrite")\
  .option("overwriteSchema", "true")\
  .save('/mnt/delta/cov')
Run Code Online (Sandbox Code Playgroud)

  • 但通过这种方式...我们并没有进行模式进化:-( (4认同)