Vik*_*s J 3 apache-spark databricks spark-structured-streaming delta-lake
我在位置“/mnt/events-bronze”有一个青铜级三角洲湖表(events_bronze),数据从kafka流式传输到该表。现在我希望能够从该表进行流式传输并使用“foreachBatch”更新到银表(events_silver”。这可以使用青铜表作为源来实现。但是,在初始运行期间,由于 events_silver 不存在,我不断收到错误,说 Delta 表不存在,这是显而易见的。那么我该如何创建与 events_bronze 具有相同结构的 events_silver 呢?我找不到 DDL 来执行相同的操作。
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
DeltaTable.forPath(spark, "/mnt/events-silver").as("silver")
.merge(
microBatchOutputDF.as("bronze"),
"silver.id=bronze.id")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
events_bronze
.writeStream
.trigger(Trigger.ProcessingTime("120 seconds"))
.format("delta")
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Run Code Online (Sandbox Code Playgroud)
在初始运行期间,问题是没有为路径“/mnt/events-silver”定义 Delta Lake 表。我不确定如何在第一次运行时创建与“/mnt/events-bronze”具有相同结构的它。
在开始流写入/合并之前,检查表是否已经存在。如果没有,请使用空数据框和模式(events_bronze)创建一个
val exists = DeltaTable.isDeltaTable("/mnt/events-silver")
if (!exists) {
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], <schema of events_bronze>)
emptyDF
.write
.format("delta")
.mode(SaveMode.Overwrite)
.save("/mnt/events-silver")
}
Run Code Online (Sandbox Code Playgroud)
表(Delta Lake 元数据)将在开始时仅创建一次,并且如果它不存在的话。如果作业重新启动等,它将出现并跳过表创建
从 Delta Lake 1.0.0 版本开始,添加了DeltaTable.createIfNotExists()方法(不断发展的 API)。
在您的示例中DeltaTable.forPath(spark, "/mnt/events-silver")可以替换为:
DeltaTable.createIfNotExists(spark)
.location("/mnt/events-silver")
.addColumns(microBatchOutputDF.schema)
.execute
Run Code Online (Sandbox Code Playgroud)
您必须小心,不要提供.option("checkpointLocation", "/mnt/events-silver/_checkpoint")checkpointLocation 是 DeltaTable 位置内的子目录的位置。这将导致 _checkpoint 目录在 DeltaTable 之前创建,并且在尝试创建 DeltaTable 时会抛出异常。
| 归档时间: |
|
| 查看次数: |
10670 次 |
| 最近记录: |