如何在不使用 Azure Databricks 中的 Pyspark 缓存数据的情况下查询损坏记录?

Gui*_*abs 5 caching corrupt apache-spark pyspark databricks

我遇到了数据块中记录损坏的问题。我们想要对损坏的记录进行计数,并将损坏的记录保存在特定位置作为增量表。为此,我们正在阅读PERMISSIVE_corrupt_record专栏的使用内容并进行查询。

我们在 Azure Databricks 中将 pyspark 与 Apache Spark 3.0.1 结合使用。

这是我们收到的错误消息:

从 Spark 2.3 开始,当引用的列仅包含内部损坏记录列(默认情况下名为 _corrupt_record)时,不允许从原始 JSON/CSV 文件进行查询。例如:spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()和spark.read.schema(schema).json(file).select("_corrupt_record" )。展示()。

根据此文档,如果要查询列损坏记录,则必须缓存或保存数据。

在此输入图像描述

但我们不想在 ETL 中缓存数据。ETL 用于在同一集群上运行的许多作业,我们可以将 150GB 的大文件作为输入。缓存数据可能会导致集群崩溃。

有没有办法在缓存数据的情况下查询这些损坏的记录?

#1 将数据保存在 blob 存储上可能是另一种选择,但这听起来开销很大。

#2 我们还尝试使用选项BadRecordsPath:将坏记录保存到 BadRecordsPath 并读回以进行计数,但是没有简单的方法可以知道坏记录文件是否已被写入(以及该文件位于哪个分区)书面)。分区看起来像/20210425T102409/bad_records

在这里查看我的其他问题

#3 另一种方法是从许可读取中减去 dropmalformed 读取。例如:

dataframe_with_corrupt = spark.read.format('csv').option("mode", "PERMISSIVE").load(path)
dataframe_without_corrupt = spark.read.format('csv').option("mode", "DROPMALFORMED").load(path)

corrupt_df = dataframe_with_corrupt.exceptAll(dataframe_without_corrupt)
Run Code Online (Sandbox Code Playgroud)

但我不确定它会比缓存占用更少的内存!

任何建议或意见将不胜感激!提前致谢