标签: delta-lake

如何从 Databricks Delta 表中删除一列?

我最近开始发现 Databricks 并面临需要删除增量表的某个列的情况。当我使用 PostgreSQL 时,它就像

ALTER TABLE main.metrics_table 
DROP COLUMN metric_1;
Run Code Online (Sandbox Code Playgroud)

我正在查看有关 DELETE 的Databricks文档,但它仅涵盖DELETE the rows that match a predicate.

我还找到了关于 DROP 数据库、DROP 函数和 DROP 表的文档,但绝对没有关于如何从增量表中删除列的内容。我在这里缺少什么?是否有从增量表中删除列的标准方法?

sql apache-spark apache-spark-sql databricks delta-lake

10
推荐指数
4
解决办法
1万
查看次数

Databricks - 如何更改现有 Delta 表的分区?

我在 Databricks delta 中有一个表,它由transaction_date. 我想将分区列更改为view_date. 我尝试删除该表,然后使用PARTITIONED BY (view_date).

然而,我的尝试失败了,因为实际文件驻留在 S3 中,即使我删除了一个 hive 表,分区也保持不变。有没有办法更改现有 Delta 表的分区?或者唯一的解决方案是删除实际数据并使用新指示的分区列重新加载它?

databricks delta-lake

9
推荐指数
2
解决办法
5801
查看次数

如何删除Delta表中的重复项?

有一个函数可以从 Delta 表中删除数据:

deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete(col("date") < "2017-01-01")
Run Code Online (Sandbox Code Playgroud)

但有没有办法以某种方式删除重复项?就像 deltaTable.dropDuplicates()...

我不想将整个表作为数据帧读取,删除重复项,然后再次将其重写到存储中

duplicates apache-spark delta-lake

9
推荐指数
2
解决办法
2万
查看次数

从 Pyspark 中的数据帧插入或更新增量表

我当前有一个 pyspark 数据框,我最初使用下面的代码创建了一个增量表 -

df.write.format("delta").saveAsTable("events")
Run Code Online (Sandbox Code Playgroud)

现在,由于上面的数据框根据我的要求每天填充数据,因此为了将新记录附加到增量表中,我使用了以下语法 -

df.write.format("delta").mode("append").saveAsTable("events")
Run Code Online (Sandbox Code Playgroud)

现在我在数据块和集群中完成了这一切。我想知道如何在 python 中编写通用 pyspark 代码,如果增量表不存在,则创建增量表,如果增量表存在,则追加记录。我想做这件事,因为如果我将我的 python 包给某人,他们不会在其环境中具有相同的增量表,因此应该从代码动态创建它。

apache-spark pyspark delta-lake

9
推荐指数
1
解决办法
4万
查看次数

增量表合并多列

我有一个表,其主键为多个列,因此我需要对多个列执行合并逻辑


DeltaTable.forPath(spark, "path")
  .as("data")
  .merge(
    finalDf1.as("updates"),
    "data.column1 = updates.column1 AND data.column2 = updates.column2 AND data.column3 = updates.column3 AND data.column4 = updates.column4 AND data.column5 = updates.column5")
  .whenMatched
  .updateAll()
  .whenNotMatched
  .insertAll()
  .execute()

Run Code Online (Sandbox Code Playgroud)

当我检查数据计数时,它没有按预期更新。

有人可以帮我解决这个问题吗?

databricks azure-databricks delta-lake

9
推荐指数
1
解决办法
9517
查看次数

如何配置 Spark 在 join 或 groupby 后调整输出分区的数量?

我知道你可以设置spark.sql.shuffle.partitionsspark.sql.adaptive.advisoryPartitionSizeInBytes。前者不适用于自适应查询执行,而后者由于某种原因仅适用于第一次洗牌,之后它仅使用默认的分区数量,即#cores。

有没有办法配置AQE来调整分区数量,使每个分区不超过100MB?

apache-spark apache-spark-sql pyspark databricks delta-lake

9
推荐指数
1
解决办法
1103
查看次数

AWS Glue 可以抓取 Delta Lake 表数据吗?

根据Databricks的文章,可以将 delta Lake 与 AWS Glue 集成。但是,我不确定是否也可以在 Databricks 平台之外进行。有人这样做过吗?另外,是否可以使用 Glue 爬虫添加与 Delta Lake 相关的元数据?

amazon-s3 apache-spark aws-glue delta-lake

8
推荐指数
1
解决办法
2740
查看次数

如何在 Spark 2.4.4 中使用增量创建表?

这是 Spark 2.4.4 和 Delta Lake 0.5.0。

我正在尝试使用 delta 数据源创建一个表,但似乎我遗漏了一些东西。尽管该CREATE TABLE USING delta命令工作正常,但表目录既没有创建也没有insertInto工作。

以下CREATE TABLE USING delta工作正常,但insertInto失败了。

scala> sql("""
create table t5
USING delta
LOCATION '/tmp/delta'
""").show

scala> spark.catalog.listTables.where('name === "t5").show
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|  t5| default|       null| EXTERNAL|      false|
+----+--------+-----------+---------+-----------+

scala> spark.range(5).write.option("mergeSchema", true).insertInto("t5")
org.apache.spark.sql.AnalysisException: `default`.`t5` requires that the data to be inserted have the same number of columns as the target table: target table has 0 column(s) but the inserted data …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql delta-lake

8
推荐指数
3
解决办法
7099
查看次数

PySpark:使用选定的列或分区优化 Delta 的读取/加载

我正在尝试将 Delta 的数据加载到 pyspark 数据框中。

path_to_data = 's3://mybucket/daily_data/'
df = spark.read.format("delta").load(path_to_data)
Run Code Online (Sandbox Code Playgroud)

现在基础数据按日期分区为

s3://mybucket/daily_data/
    dt=2020-06-12
    dt=2020-06-13
    ...
    dt=2020-06-22
Run Code Online (Sandbox Code Playgroud)

有没有办法优化 Dataframe 的读取,给出:

  1. 只需要特定的日期范围
  2. 只需要列的子集

目前我尝试的方法是:

s3://mybucket/daily_data/
    dt=2020-06-12
    dt=2020-06-13
    ...
    dt=2020-06-22
Run Code Online (Sandbox Code Playgroud)

在上述状态下,Spark是否需要加载整个数据,根据日期范围过滤数据,然后过滤所需的列?由于数据已经分区,因此可以在 pyspark read 中进行任何优化来加载数据吗?

线上的一些东西:

df.registerTempTable("my_table")
new_df = spark.sql("select col1,col2 from my_table where dt_col > '2020-06-20' ")
# dt_col is column in dataframe of timestamp dtype.
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark delta-lake

8
推荐指数
1
解决办法
5万
查看次数

没有 Databricks 运行时的 Delta Lake

可以使用 Delta Lake 而不依赖于 Databricks Runtime 吗?(我的意思是,是否可以仅在本地使用 delta-lake 和 HDFS 和 Spark?)如果不能,您能否从技术角度详细说明为什么会这样?

hdfs apache-spark databricks delta-lake

7
推荐指数
2
解决办法
7026
查看次数