我最近开始发现 Databricks 并面临需要删除增量表的某个列的情况。当我使用 PostgreSQL 时,它就像
ALTER TABLE main.metrics_table
DROP COLUMN metric_1;
Run Code Online (Sandbox Code Playgroud)
我正在查看有关 DELETE 的Databricks文档,但它仅涵盖DELETE the rows that match a predicate.
我还找到了关于 DROP 数据库、DROP 函数和 DROP 表的文档,但绝对没有关于如何从增量表中删除列的内容。我在这里缺少什么?是否有从增量表中删除列的标准方法?
我在 Databricks delta 中有一个表,它由transaction_date. 我想将分区列更改为view_date. 我尝试删除该表,然后使用PARTITIONED BY (view_date).
然而,我的尝试失败了,因为实际文件驻留在 S3 中,即使我删除了一个 hive 表,分区也保持不变。有没有办法更改现有 Delta 表的分区?或者唯一的解决方案是删除实际数据并使用新指示的分区列重新加载它?
有一个函数可以从 Delta 表中删除数据:
deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete(col("date") < "2017-01-01")
Run Code Online (Sandbox Code Playgroud)
但有没有办法以某种方式删除重复项?就像 deltaTable.dropDuplicates()...
我不想将整个表作为数据帧读取,删除重复项,然后再次将其重写到存储中
我当前有一个 pyspark 数据框,我最初使用下面的代码创建了一个增量表 -
df.write.format("delta").saveAsTable("events")
Run Code Online (Sandbox Code Playgroud)
现在,由于上面的数据框根据我的要求每天填充数据,因此为了将新记录附加到增量表中,我使用了以下语法 -
df.write.format("delta").mode("append").saveAsTable("events")
Run Code Online (Sandbox Code Playgroud)
现在我在数据块和集群中完成了这一切。我想知道如何在 python 中编写通用 pyspark 代码,如果增量表不存在,则创建增量表,如果增量表存在,则追加记录。我想做这件事,因为如果我将我的 python 包给某人,他们不会在其环境中具有相同的增量表,因此应该从代码动态创建它。
我有一个表,其主键为多个列,因此我需要对多个列执行合并逻辑
DeltaTable.forPath(spark, "path")
.as("data")
.merge(
finalDf1.as("updates"),
"data.column1 = updates.column1 AND data.column2 = updates.column2 AND data.column3 = updates.column3 AND data.column4 = updates.column4 AND data.column5 = updates.column5")
.whenMatched
.updateAll()
.whenNotMatched
.insertAll()
.execute()
Run Code Online (Sandbox Code Playgroud)
当我检查数据计数时,它没有按预期更新。
有人可以帮我解决这个问题吗?
我知道你可以设置spark.sql.shuffle.partitions和spark.sql.adaptive.advisoryPartitionSizeInBytes。前者不适用于自适应查询执行,而后者由于某种原因仅适用于第一次洗牌,之后它仅使用默认的分区数量,即#cores。
有没有办法配置AQE来调整分区数量,使每个分区不超过100MB?
根据Databricks的文章,可以将 delta Lake 与 AWS Glue 集成。但是,我不确定是否也可以在 Databricks 平台之外进行。有人这样做过吗?另外,是否可以使用 Glue 爬虫添加与 Delta Lake 相关的元数据?
这是 Spark 2.4.4 和 Delta Lake 0.5.0。
我正在尝试使用 delta 数据源创建一个表,但似乎我遗漏了一些东西。尽管该CREATE TABLE USING delta命令工作正常,但表目录既没有创建也没有insertInto工作。
以下CREATE TABLE USING delta工作正常,但insertInto失败了。
scala> sql("""
create table t5
USING delta
LOCATION '/tmp/delta'
""").show
scala> spark.catalog.listTables.where('name === "t5").show
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
| t5| default| null| EXTERNAL| false|
+----+--------+-----------+---------+-----------+
scala> spark.range(5).write.option("mergeSchema", true).insertInto("t5")
org.apache.spark.sql.AnalysisException: `default`.`t5` requires that the data to be inserted have the same number of columns as the target table: target table has 0 column(s) but the inserted data …Run Code Online (Sandbox Code Playgroud) 我正在尝试将 Delta 的数据加载到 pyspark 数据框中。
path_to_data = 's3://mybucket/daily_data/'
df = spark.read.format("delta").load(path_to_data)
Run Code Online (Sandbox Code Playgroud)
现在基础数据按日期分区为
s3://mybucket/daily_data/
dt=2020-06-12
dt=2020-06-13
...
dt=2020-06-22
Run Code Online (Sandbox Code Playgroud)
有没有办法优化 Dataframe 的读取,给出:
目前我尝试的方法是:
s3://mybucket/daily_data/
dt=2020-06-12
dt=2020-06-13
...
dt=2020-06-22
Run Code Online (Sandbox Code Playgroud)
在上述状态下,Spark是否需要加载整个数据,根据日期范围过滤数据,然后过滤所需的列?由于数据已经分区,因此可以在 pyspark read 中进行任何优化来加载数据吗?
线上的一些东西:
df.registerTempTable("my_table")
new_df = spark.sql("select col1,col2 from my_table where dt_col > '2020-06-20' ")
# dt_col is column in dataframe of timestamp dtype.
Run Code Online (Sandbox Code Playgroud) 可以使用 Delta Lake 而不依赖于 Databricks Runtime 吗?(我的意思是,是否可以仅在本地使用 delta-lake 和 HDFS 和 Spark?)如果不能,您能否从技术角度详细说明为什么会这样?
delta-lake ×10
apache-spark ×8
databricks ×5
pyspark ×3
amazon-s3 ×1
aws-glue ×1
duplicates ×1
hdfs ×1
python ×1
sql ×1