Delta Lake 创建表,其结构与其他表相同

Vik*_*s J 3 apache-spark databricks spark-structured-streaming delta-lake

我在位置“/mnt/events-bronze”有一个青铜级三角洲湖表(events_bronze),数据从kafka流式传输到该表。现在我希望能够从该表进行流式传输并使用“foreachBatch”更新到银表(events_silver”。这可以使用青铜表作为源来实现。但是,在初始运行期间,由于 events_silver 不存在,我不断收到错误,说 Delta 表不存在,这是显而易见的。那么我该如何创建与 events_bronze 具有相同结构的 events_silver 呢?我找不到 DDL 来执行相同的操作。

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  DeltaTable.forPath(spark, "/mnt/events-silver").as("silver")
    .merge(
      microBatchOutputDF.as("bronze"),
      "silver.id=bronze.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
 events_bronze
      .writeStream
      .trigger(Trigger.ProcessingTime("120 seconds"))
      .format("delta")
      .foreachBatch(upsertToDelta _)
      .outputMode("update")
      .start()
Run Code Online (Sandbox Code Playgroud)

在初始运行期间,问题是没有为路径“/mnt/events-silver”定义 Delta Lake 表。我不确定如何在第一次运行时创建与“/mnt/events-bronze”具有相同结构的它。

Swa*_*ule 7

在开始流写入/合并之前,检查表是否已经存在。如果没有,请使用空数据框和模式(events_bronze)创建一个

  val exists = DeltaTable.isDeltaTable("/mnt/events-silver")

  if (!exists) {
    val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], <schema of events_bronze>)
    emptyDF
      .write
      .format("delta")
      .mode(SaveMode.Overwrite)
      .save("/mnt/events-silver")
  }
Run Code Online (Sandbox Code Playgroud)

表(Delta Lake 元数据)将在开始时仅创建一次,并且如果它不存在的话。如果作业重新启动等,它将出现并跳过表创建


Kyl*_*man 7

从 Delta Lake 1.0.0 版本开始,添加了DeltaTable.createIfNotExists()方法(不断发展的 API)。

在您的示例中DeltaTable.forPath(spark, "/mnt/events-silver")可以替换为:

DeltaTable.createIfNotExists(spark)
  .location("/mnt/events-silver")
  .addColumns(microBatchOutputDF.schema)
  .execute
Run Code Online (Sandbox Code Playgroud)

您必须小心,不要提供.option("checkpointLocation", "/mnt/events-silver/_checkpoint")checkpointLocation 是 DeltaTable 位置内的子目录的位置。这将导致 _checkpoint 目录在 DeltaTable 之前创建,并且在尝试创建 DeltaTable 时会抛出异常。