标签: delta-lake

Databricks - 不为空,但它不是 Delta 表

我在 Databricks 上运行查询:

DROP TABLE IF EXISTS dublicates_hotels;
CREATE TABLE IF NOT EXISTS dublicates_hotels
...
Run Code Online (Sandbox Code Playgroud)

我试图理解为什么我收到以下错误:

SQL 语句错误: AnalysisException:无法创建表(' default. dublicates_hotels')。关联位置 ('dbfs:/user/hive/warehouse/dublicates_hotels') 不为空,但它不是 Delta 表

我已经找到了解决它的方法(通过手动删除它):

dbutils.fs.rm('.../dublicates_hotels',recurse=True)
Run Code Online (Sandbox Code Playgroud)

但我不明白为什么它仍然保留桌子?即使我创建了一个新集群(终止了前一个集群),并且我正在附加一个新集群来运行此查询。

任何人都可以帮助我理解这一点吗?

apache-spark-sql databricks delta-lake

14
推荐指数
3
解决办法
3万
查看次数

如何在不使用 Pyspark 的情况下在 Python 中写入增量表/增量格式?

我正在寻找一种在 python 中写回增量表而不使用 pyspark 的方法。我知道有一个名为 deltalake/ delta-lake-reader 的库,可用于读取 delta 表并将其转换为 pandas 数据帧。

目标是写回打开的增量表

输入代码如下所示:

from deltalake import DeltaTable
dt = DeltaTable('path/file')
df = dt.to_pandas()
Run Code Online (Sandbox Code Playgroud)

那么有没有办法让这样的东西从 pandas 数据帧写回增量表:

df = pandadf.to_delta()
DeltaTable.write(df, 'path/file')
Run Code Online (Sandbox Code Playgroud)

谢谢您的帮助!

python dataframe pandas delta-lake

13
推荐指数
1
解决办法
1万
查看次数

LakeFS、Hudi、Delta Lake合并以及合并冲突

我正在阅读有关 LakeFS 的文档,现在还不清楚什么是 LakeFS 的合并甚至合并冲突。

假设我使用 Apache Hudi 对单个表提供 ACID 支持。我想引入多表 ACID 支持,为此我想将 LakeFS 与 Hudi 一起使用。

如果我理解正确的话,lakeFS 是一个与数据无关的解决方案,对数据本身一无所知。LakeFS 仅建立边界(版本控制)并以某种方式调节对数据的并发访问。

所以合理的问题是——如果 LakeFS 与数据无关,它如何支持合并操作?合并本身对 LakeFS 意味着什么?那里有可能发生合并冲突吗?

data-lake delta-lake apache-hudi lakefs data-lakehouse

13
推荐指数
1
解决办法
509
查看次数

Delta Lake 回滚

需要一种优雅的方式将 Delta Lake 回滚到以前的版本。

我目前的方法如下:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, testFolder)

spark.read.format("delta")
  .option("versionAsOf", 0)
  .load(testFolder)
  .write
  .mode("overwrite")
  .format("delta")
  .save(testFolder)
Run Code Online (Sandbox Code Playgroud)

但这很丑陋,因为需要重写整个数据集。似乎一些元更新就足够了,不需要数据 I/O。有人知道更好的方法吗?

rollback apache-spark databricks delta-lake

11
推荐指数
2
解决办法
4218
查看次数

写入 Delta 表时检测到架构不匹配 - Azure Databricks

我尝试将“small_radio_json.json”加载到 Delta Lake 表。在此代码之后我将创建表。

我尝试创建 Delta 表,但收到错误“写入 Delta 表时检测到架构不匹配”。可能与分区有关 events.write.format("delta").mode("overwrite").partitionBy("artist").save("/delta/events/")

如何修复或修改代码。

    //https://learn.microsoft.com/en-us/azure/azure-databricks/databricks-extract-load-sql-data-warehouse
    //https://learn.microsoft.com/en-us/azure/databricks/_static/notebooks/delta/quickstart-scala.html
    
    //Session configuration
    val appID = "123558b9-3525-4c62-8c48-d3d7e2c16a6a"
    val secret = "123[xEPjpOIBJtBS-W9B9Zsv7h9IF:qw"
    val tenantID = "12344839-0afa-4fae-a34a-326c42112bca"

    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", 
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
   spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant- 
   id>/oauth2/token")
   spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")

   //Account Information
    val storageAccountName = "mydatalake"
   val fileSystemName = "fileshare1"

    spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + 
    ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", 
    "" + appID + "")
    spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + 
    ".dfs.core.windows.net", "" + secret …
Run Code Online (Sandbox Code Playgroud)

scala azure-databricks delta-lake

11
推荐指数
2
解决办法
4万
查看次数

Databricks Delta Lake 的 MERGE INTO 的 pyspark 等效项是什么?

databricks文档描述了如何对增量表进行合并。

SQL 中的语法

MERGE INTO [db_name.]target_table [AS target_alias]
USING [db_name.]source_table [<time_travel_version>] [AS source_alias]
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]
Run Code Online (Sandbox Code Playgroud)

可以使用。有 python 等效项可用吗?

merge databricks delta-lake

11
推荐指数
1
解决办法
2万
查看次数

检查数据块中的路径上是否存在增量表

在加载之前,我需要从 delta-lake 表中删除某些数据。我可以从增量表中删除数据(如果存在),但当表不存在时会失败。

下面的 Databricks scala 代码

// create delete statement
val del_ID = "Check_ID =" + "123"

// get delta table from path where data exists
val deltaTable = DeltaTable.forPath(spark, path)

// delete data from delta table
deltaTable.delete(del_ID)
Run Code Online (Sandbox Code Playgroud)

仅当该路径上存在增量数据时,上述代码才有效,否则将失败。

有人可以分享一种方法,如果增量数据存在,则执行删除语句,否则删除语句将被忽略?

scala databricks delta-lake

11
推荐指数
2
解决办法
3万
查看次数

S3 Lake Formation 控制表和 Databricks Delta 表之间的主要区别是什么?

S3 Lake Formation 控制表和 Databricks Delta 表之间的主要区别是什么?他们看起来很相似。

amazon-s3 databricks delta-lake aws-lake-formation

11
推荐指数
1
解决办法
3779
查看次数

如何从 Databricks Delta 表中删除一列?

我最近开始发现 Databricks 并面临需要删除增量表的某个列的情况。当我使用 PostgreSQL 时,它就像

ALTER TABLE main.metrics_table 
DROP COLUMN metric_1;
Run Code Online (Sandbox Code Playgroud)

我正在查看有关 DELETE 的Databricks文档,但它仅涵盖DELETE the rows that match a predicate.

我还找到了关于 DROP 数据库、DROP 函数和 DROP 表的文档,但绝对没有关于如何从增量表中删除列的内容。我在这里缺少什么?是否有从增量表中删除列的标准方法?

sql apache-spark apache-spark-sql databricks delta-lake

10
推荐指数
4
解决办法
1万
查看次数

Apache Spark:重新分区、排序和缓存对连接的影响

我正在探索将表加入到自身时 Spark 的行为。我正在使用数据块。

我的虚拟场景是:

  1. 将外部表读取为数据帧 A(底层文件采用 delta 格式)

  2. 将数据框 B 定义为仅选择某些列的数据框 A

  3. 在 column1 和 column2 上连接数据框 A 和 B

(是的,这没有多大意义,我只是在尝试了解 Spark 的底层机制)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))

b = a.select("column1", "column2", "columnA")

c= a.join(b, how="left", on = ["column1", "column2"])
Run Code Online (Sandbox Code Playgroud)

我的第一次尝试是按原样运行代码(尝试 1)。然后我尝试重新分区和缓存(尝试 2)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
Run Code Online (Sandbox Code Playgroud)

最后,我重新分区、排序和缓存

 a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5"))))) …
Run Code Online (Sandbox Code Playgroud)

bigdata apache-spark pyspark azure-databricks delta-lake

10
推荐指数
1
解决办法
499
查看次数