首先,我可以说我在写这篇文章时正在学习 DataBricks,所以我想要更简单、更粗糙的解决方案以及更复杂的解决方案。
我正在读取这样的 CSV 文件:
df1 = spark.read.format("csv").option("header", True).load(path_to_csv_file)
Run Code Online (Sandbox Code Playgroud)
然后我将其保存为 Delta Live Table,如下所示:
df1.write.format("delta").save("table_path")
Run Code Online (Sandbox Code Playgroud)
CSV 标题中包含空格和&等字符/,我收到错误:
AnalysisException:在架构的列名称中的“,;{}()\n\t=”中发现无效字符。请通过将表属性“delta.columnMapping.mode”设置为“name”来启用列映射。有关更多详细信息,请参阅https://docs.databricks.com/delta/delta-column-mapping.html 或者您可以使用别名对其进行重命名。
我在该问题上看到的文档解释了如何在使用 创建表后将列映射模式设置为“名称” ALTER TABLE,但没有解释如何在创建时设置它,特别是在使用上面的 DataFrame API 时。有没有办法做到这一点?
有没有更好的方法将 CSV 放入新表中?
更新:
阅读此处和此处的文档,并受到罗伯特回答的启发,我首先尝试了以下操作:
spark.conf.set("spark.databricks.delta.defaults.columnMapping.mode", "name")
Run Code Online (Sandbox Code Playgroud)
仍然没有运气,我遇到了同样的错误。有趣的是,对于初学者来说,将标题中包含空格的 CSV 文件写入 Delta Live Table 是多么困难
我正在使用 Databricks Delta Live Tables,但在向上游插入某些表时遇到一些问题。我知道下面的文字很长,但我试图尽可能清楚地描述我的问题。如果某些部分不清楚,请告诉我。
我有以下表格和流程:
Landing_zone -> 这是一个添加 JSON 文件的文件夹,其中包含插入或更新的记录的数据。Raw_table -> 这是 JSON 文件中的数据,但采用表格格式。该表采用增量格式。除了将 JSON 结构转换为表格结构(我进行了爆炸,然后从 JSON 键创建列)之外,未进行任何转换。Intermediate_table -> 这是 raw_table,但有一些额外的列(取决于其他列值)。
为了从我的着陆区转到原始表,我有以下 Pyspark 代码:
cloudfile = {"cloudFiles.format":"JSON",
"cloudFiles.schemaLocation": sourceschemalocation,
"cloudFiles.inferColumnTypes": True}
@dlt.view('landing_view')
def inc_view():
df = (spark
.readStream
.format('cloudFiles')
.options(**cloudFilesOptions)
.load(filpath_to_landing)
<Some transformations to go from JSON to tabular (explode, ...)>
return df
dlt.create_target_table('raw_table',
table_properties = {'delta.enableChangeDataFeed': 'true'})
dlt.apply_changes(target='raw_table',
source='landing_view',
keys=['id'],
sequence_by='updated_at')
Run Code Online (Sandbox Code Playgroud)
这段代码按预期工作。我运行它,将一个changes.JSON文件添加到登陆区域,重新运行管道,并将更新插入正确地应用于“raw_table”
(但是,每次在 delta 文件夹中创建包含所有数据的新 parquet 文件时,我希望只添加包含插入和更新行的 parquet 文件?并且有关当前版本的一些信息保留在 delta 中日志?不确定这是否与我的问题相关。我已经将“raw_table”的 table_properties 更改为enableChangeDataFeed = true。“intermediate_table”的 readStream 有选项(readChangeFeed,“true”))。
然后我有以下代码从“raw_table”转到“intermediate_table”: …
databricks delta-lake databricks-autoloader delta-live-tables
使用 DLT 时,我们可以使用 STREAMING LIVE TABLE 或 LIVE TABLE 创建实时表,如文档中所述:
创建或刷新{流式直播表| 实时表 } 表名
两种语法有什么区别?
这个问题非常简单。似乎在 DLT 中,您可以定义输出表名称,如下所示:
@dlt.table(name="my_table_name")
def my_pipeline():
...
Run Code Online (Sandbox Code Playgroud)
这会写入 hive_metastore 目录,但如何针对不同的目录自定义它?
pyspark databricks delta-live-tables databricks-unity-catalog
到底如何创建一个高效且可重用的 Databricks 工作流程来将原始 SQL 数据库转储到 Delta Lake 中。这里的一些混淆是为了实现以下目的的最佳方法:
人们可能会想象以下过程:
table_names = spark.read.jdbc(url=jdbcUrl, table="information_schema.tables",
properties=connectionProperties) \
.filter("table_schema = 'public'") \
.select("table_name") \
.rdd.flatMap(lambda x: x) \
.collect()
for table in table_names:
...
Run Code Online (Sandbox Code Playgroud)
像 Airbyte 和其他公司这样的第三方供应商提供了这项服务——并不是因为它确实应该如此难以实施。但更有可能的是,由于 Databricks DLT/Delta Lake 方面此通用流程的文档或参考实现乏善可陈。
令人满意的答案将是(I)对 OP 中包含的(错误?)假设的一些背景/验证,(II)此工作流程缺少的代码,以及(III)对提出的 3 点的答案/澄清。
apache-spark pyspark databricks delta-lake delta-live-tables
我正在尝试使用增量实时表从 eventhub 创建流,但我在安装库时遇到问题。是否可以使用 sh /pip 使用 Delta Live 表安装 maven 库?
我想安装 com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.17
我正在尝试在我的 databricks delta live table 笔记本中导入另一个模块或包,但收到一条错误消息,指出不支持 %run 或任何魔法命令。只是想知道是否还有其他方法来导入模块或包。
如果我需要在 Metastore 中的两个不同数据库中发布两个表,是否需要创建两个不同的 DLT 管道?我问这个是因为我看到在管道设置中,我只能指定 1 个目标。