标签: delta-live-tables

Databricks Delta Live 表:流式和增量式之间的差异

CREATE STREAMING LIVE TABLE和之间有区别吗CREATE INCREMENTAL LIVE TABLE?文档是混合的:例如,在这里STREAMING使用,而这里使用。我已经测试了两者,到目前为止我还没有注意到任何差异。INCREMENTAL

databricks delta-live-tables

10
推荐指数
1
解决办法
6663
查看次数

DataBricks:在 Python 中将 CSV 数据提取到 Delta Live 表会触发“表名称中的无效字符”错误 - 如何设置列映射模式?

首先,我可以说我在写这篇文章时正在学习 DataBricks,所以我想要更简单、更粗糙的解决方案以及更复杂的解决方案。

我正在读取这样的 CSV 文件:

df1 = spark.read.format("csv").option("header", True).load(path_to_csv_file)
Run Code Online (Sandbox Code Playgroud)

然后我将其保存为 Delta Live Table,如下所示:

df1.write.format("delta").save("table_path")
Run Code Online (Sandbox Code Playgroud)

CSV 标题中包含空格和&等字符/,我收到错误:

AnalysisException:在架构的列名称中的“,;{}()\n\t=”中发现无效字符。请通过将表属性“delta.columnMapping.mode”设置为“name”来启用列映射。有关更多详细信息,请参阅https://docs.databricks.com/delta/delta-column-mapping.html 或者您可以使用别名对其进行重命名。

我在该问题上看到的文档解释了如何在使用 创建表后将列映射模式设置为“名称” ALTER TABLE,但没有解释如何在创建时设置它,特别是在使用上面的 DataFrame API 时。有没有办法做到这一点?

有没有更好的方法将 CSV 放入新表中?


更新:

阅读此处此处的文档,并受到罗伯特回答的启发,我首先尝试了以下操作:

spark.conf.set("spark.databricks.delta.defaults.columnMapping.mode", "name")
Run Code Online (Sandbox Code Playgroud)

仍然没有运气,我遇到了同样的错误。有趣的是,对于初学者来说,将标题中包含空格的 CSV 文件写入 Delta Live Table 是多么困难

pyspark databricks delta-live-tables

8
推荐指数
1
解决办法
8082
查看次数

Databricks Delta Live 表 - 应用增量表中的更改

我正在使用 Databricks Delta Live Tables,但在向上游插入某些表时遇到一些问题。我知道下面的文字很长,但我试图尽可能清楚地描述我的问题。如果某些部分不清楚,请告诉我。

我有以下表格和流程:

Landing_zone -> 这是一个添加 JSON 文件的文件夹,其中包含插入或更新的记录的数据。Raw_table -> 这是 JSON 文件中的数据,但采用表格格式。该表采用增量格式。除了将 JSON 结构转换为表格结构(我进行了爆炸,然后从 JSON 键创建列)之外,未进行任何转换。Intermediate_table -> 这是 raw_table,但有一些额外的列(取决于其他列值)。

为了从我的着陆区转到原始表,我有以下 Pyspark 代码:

cloudfile = {"cloudFiles.format":"JSON", 
                       "cloudFiles.schemaLocation": sourceschemalocation, 
                       "cloudFiles.inferColumnTypes": True}

@dlt.view('landing_view')
def inc_view():
    df = (spark
             .readStream
             .format('cloudFiles')
             .options(**cloudFilesOptions)
             .load(filpath_to_landing)
     <Some transformations to go from JSON to tabular (explode, ...)>
     return df

dlt.create_target_table('raw_table', 
                        table_properties = {'delta.enableChangeDataFeed': 'true'})
  
dlt.apply_changes(target='raw_table',
                  source='landing_view',
                  keys=['id'],
                  sequence_by='updated_at')
Run Code Online (Sandbox Code Playgroud)

这段代码按预期工作。我运行它,将一个changes.JSON文件添加到登陆区域,重新运行管道,并将更新插入正确地应用于“raw_table”

(但是,每次在 delta 文件夹中创建包含所有数据的新 parquet 文件时,我希望只添加包含插入和更新行的 parquet 文件?并且有关当前版本的一些信息保留在 delta 中日志?不确定这是否与我的问题相关。我已经将“raw_table”的 table_properties 更改为enableChangeDataFeed = true。“intermediate_table”的 readStream 有选项(readChangeFeed,“true”))。

然后我有以下代码从“raw_table”转到“intermediate_table”: …

databricks delta-lake databricks-autoloader delta-live-tables

7
推荐指数
1
解决办法
7742
查看次数

LIVE TABLE 和 Streaming LIVE TABLE 之间的区别

使用 DLT 时,我们可以使用 STREAMING LIVE TABLE 或 LIVE TABLE 创建实时表,如文档中所述:

创建或刷新{流式直播表| 实时表 } 表名

两种语法有什么区别?

databricks delta-live-tables

7
推荐指数
1
解决办法
8736
查看次数

模块“dlt”没有属性“表”-databricks 和 delta live 表

我是 databricks 和 delta live 表的新手。我在 python 中创建增量实时表时遇到问题。

在此输入图像描述

在此输入图像描述

在此输入图像描述

如何从文件存储中的 json 文件创建增量实时表?

python databricks delta-lake delta-live-tables

6
推荐指数
1
解决办法
5445
查看次数

如何将 DLT 目标表定向到 Unity Catalog Metastore

这个问题非常简单。似乎在 DLT 中,您可以定义输出表名称,如下所示:

@dlt.table(name="my_table_name")
def my_pipeline():
  ...
Run Code Online (Sandbox Code Playgroud)

这会写入 hive_metastore 目录,但如何针对不同的目录自定义它?

pyspark databricks delta-live-tables databricks-unity-catalog

6
推荐指数
1
解决办法
2529
查看次数

如何从 Postgres RDB 到 Databricks Lakehouse Delta Lake?

到底如何创建一个高效且可重用的 Databricks 工作流程来将原始 SQL 数据库转储到 Delta Lake 中。这里的一些混淆是为了实现以下目的的最佳方法:

  • 处理模式中的偏差(数据库表中的列)=> 对存储的表进行简单的覆盖可以吗?
  • 捕获数据变化(CDC)并高效合并现有数据;身份证上说。这对于关系数据库是否仍然相关?
  • Delta Live Table (DLT) 格式适合这个吗?

人们可能会想象以下过程:

  1. 迭代公共表 information_schema:
table_names = spark.read.jdbc(url=jdbcUrl, table="information_schema.tables",
                               properties=connectionProperties) \
                               .filter("table_schema = 'public'") \
                               .select("table_name") \
                               .rdd.flatMap(lambda x: x) \
                               .collect()

for table in table_names:
    ...
Run Code Online (Sandbox Code Playgroud)
  1. 然后对于每个表:
  • (A) 创建一个新的 Delta Lake 表,如果它不存在(或者可能在架构方面已经过时),否则;
  • (B) 将新数据/更新数据合并到 Delta Lake 中。

像 Airbyte 和其他公司这样的第三方供应商提供了这项服务——并不是因为它确实应该如此难以实施。但更有可能的是,由于 Databricks DLT/Delta Lake 方面此通用流程的文档或参考实现乏善可陈。

令人满意的答案将是(I)对 OP 中包含的(错误?)假设的一些背景/验证,(II)此工作流程缺少的代码,以及(III)对提出的 3 点的答案/澄清。

apache-spark pyspark databricks delta-lake delta-live-tables

6
推荐指数
1
解决办法
677
查看次数

带有 EventHub 的达美实时表

我正在尝试使用增量实时表从 eventhub 创建流,但我在安装库时遇到问题。是否可以使用 sh /pip 使用 Delta Live 表安装 maven 库?

我想安装 com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.17

https://learn.microsoft.com/pl-pl/azure/databricks/spark/latest/structed-streaming/streaming-event-hubs

azure-eventhub pyspark databricks delta-live-tables

5
推荐指数
1
解决办法
1974
查看次数

如何在 Databricks 增量实时表中导入另一个模块或包

我正在尝试在我的 databricks delta live table 笔记本中导入另一个模块或包,但收到一条错误消息,指出不支持 %run 或任何魔法命令。只是想知道是否还有其他方法来导入模块或包。

apache-spark pyspark databricks delta-live-tables

5
推荐指数
1
解决办法
1977
查看次数

Databricks 中的 Delta 实时表只能采用一个目标

如果我需要在 Metastore 中的两个不同数据库中发布两个表,是否需要创建两个不同的 DLT 管道?我问这个是因为我看到在管道设置中,我只能指定 1 个目标。

databricks delta-live-tables

5
推荐指数
1
解决办法
1586
查看次数