使用 Databricks 中的 PySpark 在 Azure DataLake 中使用 partitionBy 和覆盖策略

Dat*_*ice 7 python azure apache-spark apache-spark-sql databricks

我在 Azure 环境中有一个简单的 ETL 过程

blob 存储 > 数据工厂 > 原始数据 > 数据块 > 数据湖策划 > 数据仓库(主 ETL)。

这个项目的数据集不是很大(大约 100 万行 20 列给予或接受)但是我想将它们作为 Parquet 文件在我的数据湖中正确分区。

目前我运行一些简单的逻辑来确定每个文件应该在我的湖中的哪个位置基于业务日历。

文件模糊地看起来像这样

Year Week Data
2019 01   XXX
2019 02   XXX
Run Code Online (Sandbox Code Playgroud)

然后我将给定的文件分区为以下格式,替换存在的数据并为新数据创建新文件夹。

curated ---
           dataset --
                     Year 2019 
                              - Week 01 - file.pq + metadata
                              - Week 02 - file.pq + metadata
                              - Week 03 - file.pq + datadata #(pre existing file)
Run Code Online (Sandbox Code Playgroud)

元数据是成功和自动生成的提交

为此,我在 Pyspark 2.4.3 中使用以下查询

pyspark_dataframe.write.mode('overwrite')\
                         .partitionBy('Year','Week').parquet('\curated\dataset')
Run Code Online (Sandbox Code Playgroud)

现在,如果我单独使用此命令,它将覆盖目标分区中的任何现有数据

所以Week 03会丢失。

usingspark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")似乎可以解决问题,只覆盖目标文件,但我想知道这是否是处理数据湖中文件的最佳方式?

我还发现很难找到有关上述功能的任何文档。

我的第一直觉是循环遍历单个镶木地板并手动写入每个分区,这虽然给了我更好的控制,但循环会很慢。

我的下一个想法是将每个分区写入一个/tmp文件夹并移动每个镶木地板文件,然后根据需要使用上面的查询替换文件/创建文件。然后/tmp在创建某种元数据日志的同时清除文件夹。

有没有更好的方法/方法?

任何指导将不胜感激。

这里的最终目标是为所有“策划”数据提供一个干净和安全的区域,同时拥有镶木地板文件的日志,我可以将其读入数据仓库以进行进一步的 ETL。

mur*_*ash 14

我看到您在 azure 堆栈中使用数据块。我认为最可行和最推荐的方法是使用databricks中的新delta Lake 项目

它为对象存储(如 s3 或 azure 数据湖存储)提供了各种更新插入、合并和酸交易的选项。它基本上提供了数据仓库向数据湖提供的管理、安全、隔离和更新插入/合并。对于一个管道,由于其功能和灵活性,苹果实际上将其数据仓库替换为仅在增量数据块上运行。对于您的用例和许多其他使用镶木地板的用例,将 'parquet' 替换为 'delta'只是一个简单的更改,以便使用其功能(如果您有数据块)。三角洲基本上是一个自然演进实木复合地板和databricks通过提供附加的功能和以及开源它做了很多工作。

对于您的情况,我建议您尝试使用delta 中提供的replaceWhere选项。在进行此有针对性的更新之前,目标表的格式必须为delta

取而代之的是:

dataset.repartition(1).write.mode('overwrite')\
                         .partitionBy('Year','Week').parquet('\curataed\dataset')
Run Code Online (Sandbox Code Playgroud)

https://docs.databricks.com/delta/delta-batch.html

“你可以有选择性地覆盖只要该数据匹配谓词分列

你可以试试这个:

dataset.write.repartition(1)\
       .format("delta")\
       .mode("overwrite")\
       .partitionBy('Year','Week')\
       .option("replaceWhere", "Year == '2019' AND Week >='01' AND Week <='02'")\ #to avoid overwriting Week3
       .save("\curataed\dataset")
Run Code Online (Sandbox Code Playgroud)

此外,如果您希望将分区设置为 1,为什么不使用coalesce(1),因为它可以避免完全洗牌。

https://mungingdata.com/delta-lake/updating-partitions-with-replacewhere/

'当您必须运行计算成本高的算法时replaceWhere特别有用,但仅限于某些分区' '

因此,我个人认为使用 replacewhere 手动指定覆盖将更有针对性和计算效率,然后仅依赖于: spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

Databricks 对 delta 表进行了优化,通过 bin 打包和 z 排序使其成为镶木地板的更快、更有效的选择(因此是自然进化):

来自链接:https : //docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html

  • WHERE(装箱)

'优化匹配给定分区谓词的行子集。仅支持涉及分区键属性的过滤器。

  • ZORDER BY

'在同一组文件中并置列信息。Delta Lake 数据跳过算法使用共定位来显着减少需要读取的数据量。

  • 通过索引、统计和自动缓存支持更快地执行查询

  • 具有丰富模式验证和事务保证的数据可靠性

  • 简化的数据管道,具有灵活的 UPSERT支持和单一数据源上的统一结构化流 + 批处理

您还可以查看开源项目的完整文档:https : //docs.delta.io/latest/index.html

.. 我还想说我不为 databricks/delta 湖工作。我刚刚看到他们的改进和功能使我在工作中受益。

更新:

问题的要点“替换现有数据并为新数据创建新文件夹”,并以高度可扩展和有效的方式进行。

在 parquet 中使用动态分区覆盖可以完成这项工作,但是我觉得该方法的自然演变是使用增量表合并操作,这些操作基本上是为了“将来自 Spark DataFrames 的数据集成到 Delta Lake”而创建的。它们为您提供了额外的功能和优化,可以根据希望如何发生合并数据并在表上保留所有操作的日志,以便您可以在需要时回滚版本。

Delta Lake python api(用于合并):https : //docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder

数据块优化:https ://kb.databricks.com/delta/delta-merge-into.html#discussion

使用单个合并操作,您可以指定条件合并,在这种情况下,它可以是年份和周以及 id 的组合,然后如果记录匹配(意味着它们存在于您的 spark 数据框和增量表中,第 1 周和第 2 周),使用 spark 数据框中的数据更新它们,并保持其他记录不变:

#you can also add additional condition if the records match, but not required
.whenMatchedUpdateAll(condition=None)
Run Code Online (Sandbox Code Playgroud)

在某些情况下,如果没有匹配项,那么您可能想要插入并创建新的行和分区,为此您可以使用:

.whenNotMatchedInsertAll(condition=None)
Run Code Online (Sandbox Code Playgroud)

您可以使用 。converttodelta操作https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.convertToDelta,将您的镶木地板表转换为增量表,以便您可以使用接口。

'您现在可以将 Parquet 表就地转换为 Delta Lake 表,而无需重写任何数据。这对于转换非常大的 Parquet 表非常有用,而将其重写为 Delta 表的成本很高。此外,这个过程是可逆的'

您的合并案例替换存在的数据并在不存在时创建新记录)可能如下所示:

(未测试,语法参考examples+api)

%python  
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`\curataed\dataset`")

deltaTable.alias("target").merge(dataset, "target.Year= dataset.Year  AND target.Week = dataset.Week") \
  .whenMatchedUpdateAll()\
  .whenNotMatchedInsertAll()\
  .execute()
Run Code Online (Sandbox Code Playgroud)

如果增量表正确分区(年,周)并且您正确使用 whenmatched 子句,则这些操作将得到高度优化,并且在您的情况下可能需要几秒钟。它还为您提供一致性、原子性和数据完整性以及回滚选项。

提供的更多功能是,您可以指定要在匹配时更新的列集(如果您只需要更新某些列)。您还可以启用spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true"),以便 delta 使用最少的目标分区来执行合并(更新、删除、创建)。

总的来说,我认为使用这种方法是一种执行有针对性的更新的非常新颖和创新的方式,因为它可以让您更好地控制它,同时保持操作高效。使用带有动态分区覆盖模式的镶木地板也可以正常工作,但是,delta 湖功能为您的数据湖带来无与伦比的数据质量

我的建议: 我会说,现在,使用动态分区覆盖模式木地板的文件做你的更新,你可以尝试,并试图用的databricks优化使用增量合并上只是一个表spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true"),并.whenMatchedUpdateAll()和比较两者的性能(您的文件很小,所以我认为这不会有很大的不同)。合并文章的 databricks 分区修剪优化于 2 月发布,因此它确实是新的,并且可能会改变产生的开销增量合并操作的游戏规则(因为在幕后他们只是创建新文件,但分区修剪可以加快速度)

python、scala、sql 中合并示例:https : //docs.databricks.com/delta/delta-update.html#merge-examples

https://databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html

  • 我发现一个可能的解决方案可能是使用 delta merge 和 databricks 优化,是的,我的第一直觉是只使用 parquet 的动态覆盖,而 databricks delta(merge) 可能是未来的可行解决方案。这绝对是一个新的、未知的领域,因此我添加了我的推荐并更新了我的帖子。 (2认同)