标签: delta-lake

在 databricks 中加载增量表特定分区的最佳实践是什么?

我想知道加载增量表特定分区的最佳方法是什么?选项 2 是否在过滤之前加载所有表?

选项1 :

df = spark.read.format("delta").option('basePath','/mnt/raw/mytable/')\
   .load('/mnt/raw/mytable/ingestdate=20210703')
Run Code Online (Sandbox Code Playgroud)

(这里需要basePath选项吗?)

选项2:

df = spark.read.format("delta").load('/mnt/raw/mytable/')
df = df.filter(col('ingestdate')=='20210703')
Run Code Online (Sandbox Code Playgroud)

提前谢谢了 !

partitioning apache-spark pyspark azure-databricks delta-lake

6
推荐指数
1
解决办法
9306
查看次数

异常:org.apache.spark.sql.delta.ConcurrentAppendException:文件通过并发更新添加到表的根目录

我有一个简单的 Spark 作业,将数据流式传输到 Delta 表。该表非常小并且没有分区。

创建了许多小镶木地板文件。

按照文档(https://docs.delta.io/1.0.0/best-practices.html)中的建议,我添加了每天运行一次的压缩作业。

    val path = "..."
    val numFiles = 16
    
    spark.read
     .format("delta")
     .load(path)
     .repartition(numFiles)
     .write
     .option("dataChange", "false")
     .format("delta")
     .mode("overwrite")
     .save(path)
Run Code Online (Sandbox Code Playgroud)

每次压缩作业运行时,流作业都会出现以下异常:

org.apache.spark.sql.delta.ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
Run Code Online (Sandbox Code Playgroud)

我尝试将以下配置参数添加到流作业中:

spark.databricks.delta.retryWriteConflict.enabled = true  # would be false by default
spark.databricks.delta.retryWriteConflict.limit = 3  # optionally limit the maximum amout of retries
Run Code Online (Sandbox Code Playgroud)

这没有帮助。

知道如何解决这个问题吗?

parquet spark-streaming databricks delta-lake

6
推荐指数
1
解决办法
1万
查看次数

Koalas/pyspark 找不到数据源:delta

当我尝试在本地使用 koalas.DataFrame.to_delta() 将 Koalas DataFrame 直接写入增量表时,出现以下 Pyspark 异常:
java.lang.ClassNotFoundException: Failed to find data source: delta
编辑:忽略下面,直接调用 Pyspark 也会出现问题。

如果我将 Koalas DataFrame 转换为 Spark DataFrame 然后写入 delta,我似乎没有问题。是否存在 Koalas 不知道但 Pyspark 知道的底层库?看起来很奇怪,因为我认为在幕后使用相同的 Pyspark 模块...我应该注意到 Koalas to_delta() 方法似乎确实在 Databricks 上工作,这表明我的本地设置缺少与 Delta 相关的库。

失败的考拉代码:

kdf = ks.DataFrame({'eid': [1, 2, 3],
                        'contigName': ['chr1', 'chr2', 'chr3'],
                        'phen1': [0.123, 0.456, 0.789],
                        'phen2': [0.987, 0.654, 0.321]})
kdf.to_delta(path='tmp/test.delta', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)

编辑:不让考拉 Spark 到 Delta 代码:

kdf = ks.DataFrame({'eid': [1, 2, 3],
                        'contigName': ['chr1', 'chr2', 'chr3'],
                        'phen1': [0.123, 0.456, 0.789], …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark databricks delta-lake spark-koalas

6
推荐指数
1
解决办法
1万
查看次数

DeltaLake 合并具有空值的列

我正在使用 DeltaLake API 使用下面的代码更新表中的行

DeltaTable.forPath(sparkSession, cleanDataPath)
          .as("target")
          .merge(df.as("source"), "target.desk_mirror_name = source.desk_mirror_name AND target.price = source.price AND target.valuationdate = source.valuationdate AND target.valuationversion = source.valuationversion")
          .whenMatched()
          .updateAll()
          .whenNotMatched()
          .insertAll()
          .execute();
Run Code Online (Sandbox Code Playgroud)

这应该匹配源表和目标表之间的所有列,除了列valuationtag

合并之前,目标表如下所示

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|          Sample|499.97|   2021-06-10|    210611170317|210611170317|
|          Sample|967.93|   2021-06-10|    210611170317|210611170317|
|          Sample| 500.0|   2021-06-10|    210611170317|210611170317|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)

源表(应更新目标表)如下

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|          Sample|499.97|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample| 500.0|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample|967.93|   2021-06-10|    210611170317|    OFFICIAL|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)

valuationtag更改为OFFICIAL。有了这个,更新后的表是

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+ …
Run Code Online (Sandbox Code Playgroud)

merge apache-spark apache-spark-sql delta-lake

6
推荐指数
1
解决办法
4888
查看次数

Databricks - 覆盖架构

我多次在 Databricks 中更新增量表时遇到问题,第一次覆盖架构失败,但第二次成功。我的问题的解决方案是简单地再次运行它,但此时我无法重现。如果再次发生这种情况,我会回来并发布确切的错误消息,但这本质上是架构不匹配错误。还有其他人遇到过类似的问题吗?

overwriteSchema = True
DF.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", overwriteSchema) \
.partitionBy(datefield) \
.saveAsTable(deltatable)
Run Code Online (Sandbox Code Playgroud)

pyspark databricks azure-databricks delta-lake

6
推荐指数
1
解决办法
3万
查看次数

Trigger.AvailableNow 用于 PySpark (Databricks) 中的 Delta 源流查询

Databricks文档中的所有示例均采用 Scala 语言。无法从 PySpark 找到如何使用此触发器类型。是否有等效的 API 或解决方法?

pyspark databricks spark-structured-streaming delta-lake

6
推荐指数
1
解决办法
1万
查看次数

Databricks DELTA CTAS 与使用 %sql 的 LOCATION

DELTA不具有CREATE TABLE LIKE。它确实有CTAS

我只想复制表的定义LOCATION,但还要指定.

例如,这不起作用:

CREATE TABLE IF NOT EXISTS NEW_CUSTOMER_FEED 
AS SELECT * from NEW_CUSTOMER_FEED WHERE 1 = 0 
LOCATION '/atRest/data'
Run Code Online (Sandbox Code Playgroud)

我缺少什么?

databricks delta-lake databricks-sql

6
推荐指数
1
解决办法
3811
查看次数

三角洲湖上的蜂巢表

我是 Spark 和 Delta Lake 的新手。我正在增量表之上创建配置单元表。我有必要的罐子 delta-core-shaded- assembly_2.11-0.1.0.jar、hive-delta_2.11-0.1.0.jar;在蜂巢类路径中。设置以下属性。

SET hive.input.format=io.delta.hive.HiveInputFormat;
SET hive.tez.input.format=io.delta.hive.HiveInputFormat;
Run Code Online (Sandbox Code Playgroud)

但是在创建表时

CREATE EXTERNAL TABLE hive_table(col1 INT, col2 STRING) STORED BY 'io.delta.hive.DeltaStorageHandler' LOCATION '/path/to/my/delta/table'

得到以下异常:

FAILED:执行错误,从 org.apache.hadoop.hive.ql.exec.DDLTask 返回代码 1。org.apache.spark.network.util.JavaUtils.byteStringAs(Ljava/lang/String;Lorg/apache/spark/network/util/ByteUnit;)J

两个表的模式匹配。堆栈详细信息:Spark:2.4.4 Hive:1.2.1

任何帮助深表感谢。提前致谢。

hive apache-spark delta-lake

5
推荐指数
1
解决办法
3671
查看次数

Delta Lake 表元数据

Delta Lake 在哪里存储表元数据信息。我在我的独立机器上使用 Spark 2.6(不是 Databricks)。我的假设是,如果我重新启动 Spark,在 Delta Lake Spark 中创建的表将被删除(从 Jupyter Notebook 尝试)。但事实并非如此。

delta-lake

5
推荐指数
1
解决办法
8578
查看次数

将 Spark 流作业中创建的增量表迁移到新位置的最佳方法是什么?

我有两个 pyspark 流工作:

  1. streaming_job_a从 kafka 读取,将包含一列中的原始数据和另一列中的时间戳的数据帧写入 s3 中的位置,并使用位置A创建非托管增量表table_aA
  2. streaming_job_b从增量表中读取table_a,将原始数据提取到单独的列中,写入到Bs3 中的位置,并创建非托管增量表table_b

如果我想更改这两个作业使用的位置和表名称,如何以保留数据、不会导致检查点问题并且花费最少时间的方式进行更改?必须保留这两个表,因为其他团队会读取这两个表。理想情况下,最终结果如下所示:

  1. streaming_job_a从 kafka 读取,写入A_news3 中的位置并创建增量表table_a_new
  2. streaming_job_b从 delta table 读取table_a_new,写入B_news3 中的位置,并创建 delta table table_b_new

我知道我可以从旧位置读取并写入新位置,如下所示:

incoming_df = spark.readStream.format("delta").table("table_a")

writer_df = (
    incoming_df
    .writeStream.format("delta")
    .option("checkpointLocation", "A_new/_checkpoints")
    .option("path", "A_new")
    .trigger(once=True)
)

writer_df.start()
Run Code Online (Sandbox Code Playgroud)

然后创建新表:

spark.sql("create table table_a_new using delta location 'A_new'")
Run Code Online (Sandbox Code Playgroud)

然后执行类似的操作,但在这种方法中,我担心在迁移发生时丢失streaming_job_b写入位置的新数据。一般来说,我对 Spark Streaming 还很陌生,所以非常感谢任何建议!Astreaming_job_b

amazon-s3 apache-spark pyspark spark-structured-streaming delta-lake

5
推荐指数
0
解决办法
518
查看次数