我在 Databricks 上运行查询:
DROP TABLE IF EXISTS dublicates_hotels;
CREATE TABLE IF NOT EXISTS dublicates_hotels
...
Run Code Online (Sandbox Code Playgroud)
我试图理解为什么我收到以下错误:
SQL 语句错误: AnalysisException:无法创建表('
default.dublicates_hotels')。关联位置 ('dbfs:/user/hive/warehouse/dublicates_hotels') 不为空,但它不是 Delta 表
我已经找到了解决它的方法(通过手动删除它):
dbutils.fs.rm('.../dublicates_hotels',recurse=True)
Run Code Online (Sandbox Code Playgroud)
但我不明白为什么它仍然保留桌子?即使我创建了一个新集群(终止了前一个集群),并且我正在附加一个新集群来运行此查询。
任何人都可以帮助我理解这一点吗?
我正在寻找一种在 python 中写回增量表而不使用 pyspark 的方法。我知道有一个名为 deltalake/ delta-lake-reader 的库,可用于读取 delta 表并将其转换为 pandas 数据帧。
目标是写回打开的增量表
输入代码如下所示:
from deltalake import DeltaTable
dt = DeltaTable('path/file')
df = dt.to_pandas()
Run Code Online (Sandbox Code Playgroud)
那么有没有办法让这样的东西从 pandas 数据帧写回增量表:
df = pandadf.to_delta()
DeltaTable.write(df, 'path/file')
Run Code Online (Sandbox Code Playgroud)
谢谢您的帮助!
我正在阅读有关 LakeFS 的文档,现在还不清楚什么是 LakeFS 的合并甚至合并冲突。
假设我使用 Apache Hudi 对单个表提供 ACID 支持。我想引入多表 ACID 支持,为此我想将 LakeFS 与 Hudi 一起使用。
如果我理解正确的话,lakeFS 是一个与数据无关的解决方案,对数据本身一无所知。LakeFS 仅建立边界(版本控制)并以某种方式调节对数据的并发访问。
所以合理的问题是——如果 LakeFS 与数据无关,它如何支持合并操作?合并本身对 LakeFS 意味着什么?那里有可能发生合并冲突吗?
需要一种优雅的方式将 Delta Lake 回滚到以前的版本。
我目前的方法如下:
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, testFolder)
spark.read.format("delta")
.option("versionAsOf", 0)
.load(testFolder)
.write
.mode("overwrite")
.format("delta")
.save(testFolder)
Run Code Online (Sandbox Code Playgroud)
但这很丑陋,因为需要重写整个数据集。似乎一些元更新就足够了,不需要数据 I/O。有人知道更好的方法吗?
我尝试将“small_radio_json.json”加载到 Delta Lake 表。在此代码之后我将创建表。
我尝试创建 Delta 表,但收到错误“写入 Delta 表时检测到架构不匹配”。可能与分区有关 events.write.format("delta").mode("overwrite").partitionBy("artist").save("/delta/events/")
如何修复或修改代码。
//https://learn.microsoft.com/en-us/azure/azure-databricks/databricks-extract-load-sql-data-warehouse
//https://learn.microsoft.com/en-us/azure/databricks/_static/notebooks/delta/quickstart-scala.html
//Session configuration
val appID = "123558b9-3525-4c62-8c48-d3d7e2c16a6a"
val secret = "123[xEPjpOIBJtBS-W9B9Zsv7h9IF:qw"
val tenantID = "12344839-0afa-4fae-a34a-326c42112bca"
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-
id>/oauth2/token")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
//Account Information
val storageAccountName = "mydatalake"
val fileSystemName = "fileshare1"
spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName +
".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net",
"" + appID + "")
spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName +
".dfs.core.windows.net", "" + secret …Run Code Online (Sandbox Code Playgroud) databricks文档描述了如何对增量表进行合并。
SQL 中的语法
MERGE INTO [db_name.]target_table [AS target_alias]
USING [db_name.]source_table [<time_travel_version>] [AS source_alias]
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
Run Code Online (Sandbox Code Playgroud)
可以使用。有 python 等效项可用吗?
在加载之前,我需要从 delta-lake 表中删除某些数据。我可以从增量表中删除数据(如果存在),但当表不存在时会失败。
下面的 Databricks scala 代码
// create delete statement
val del_ID = "Check_ID =" + "123"
// get delta table from path where data exists
val deltaTable = DeltaTable.forPath(spark, path)
// delete data from delta table
deltaTable.delete(del_ID)
Run Code Online (Sandbox Code Playgroud)
仅当该路径上存在增量数据时,上述代码才有效,否则将失败。
有人可以分享一种方法,如果增量数据存在,则执行删除语句,否则删除语句将被忽略?
S3 Lake Formation 控制表和 Databricks Delta 表之间的主要区别是什么?他们看起来很相似。
我最近开始发现 Databricks 并面临需要删除增量表的某个列的情况。当我使用 PostgreSQL 时,它就像
ALTER TABLE main.metrics_table
DROP COLUMN metric_1;
Run Code Online (Sandbox Code Playgroud)
我正在查看有关 DELETE 的Databricks文档,但它仅涵盖DELETE the rows that match a predicate.
我还找到了关于 DROP 数据库、DROP 函数和 DROP 表的文档,但绝对没有关于如何从增量表中删除列的内容。我在这里缺少什么?是否有从增量表中删除列的标准方法?
我正在探索将表加入到自身时 Spark 的行为。我正在使用数据块。
我的虚拟场景是:
将外部表读取为数据帧 A(底层文件采用 delta 格式)
将数据框 B 定义为仅选择某些列的数据框 A
在 column1 和 column2 上连接数据框 A 和 B
(是的,这没有多大意义,我只是在尝试了解 Spark 的底层机制)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
Run Code Online (Sandbox Code Playgroud)
我的第一次尝试是按原样运行代码(尝试 1)。然后我尝试重新分区和缓存(尝试 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
Run Code Online (Sandbox Code Playgroud)
最后,我重新分区、排序和缓存
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5"))))) …Run Code Online (Sandbox Code Playgroud) delta-lake ×10
databricks ×6
apache-spark ×3
scala ×2
amazon-s3 ×1
apache-hudi ×1
bigdata ×1
data-lake ×1
dataframe ×1
lakefs ×1
merge ×1
pandas ×1
pyspark ×1
python ×1
rollback ×1
sql ×1