Pyspark 使用架构读取 csv、标头检查并存储损坏的记录

Fol*_*ger 10 python csv apache-spark pyspark

我正在尝试使用符合以下条件的 pyspark csv 阅读器:

  • 根据模式中的数据类型读取 csv
  • 检查标题和架构中的列名是否匹配
  • 将损坏的记录存储在新字段中

这是我尝试过的。

file: ab.csv
------
a,b
1,2
3,four
Run Code Online (Sandbox Code Playgroud)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
DDL = "a INTEGER, b INTEGER"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=False,
                    columnNameOfCorruptRecord='broken')
print(df.show())
Run Code Online (Sandbox Code Playgroud)

输出:

+----+----+
|   a|   b|
+----+----+
|   1|   2|
|null|null|
+----+----+
Run Code Online (Sandbox Code Playgroud)

此命令不存储损坏的记录。如果我添加broken到架构并删除标头验证,则该命令会发出警告。

DDL = "a INTEGER, b INTEGER, broken STRING"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=True, 
                    columnNameOfCorruptRecord='broken')
print(df.show())
Run Code Online (Sandbox Code Playgroud)

输出:

WARN  CSVDataSource:66 - Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 2, schema size: 3
CSV file: file:/// ... /ab.csv
+----+----+------+
|   a|   b|broken|
+----+----+------+
|   1|   2|  null|
|null|null|3,four|
+----+----+------+
Run Code Online (Sandbox Code Playgroud)

这是预期的行为还是存在破坏第一个示例的错误?有一个更好的方法吗?

还有一件事。我想处理损坏记录中格式正确的字段以获得这样的数据帧。

+--+----+------+
| a|   b|broken|
+--+----+------+
| 1|   2|  null|
| 3|null|3,four|
+--+----+------+
Run Code Online (Sandbox Code Playgroud)

我应该在阅读后做一个额外的步骤来获得它,还是我错过了一些更宽容的选择。

小智 1

这是正确的默认行为。如果您正在推断架构,它会在输出架构中隐式添加一个columnNameOfCorruptRecord字段,否则您必须在用户定义架构中提供一个名为columnNameOfCorruptRecord的字符串类型字段,或者更改列名称(如损坏)并将相同名称添加到架构中。

正如您提到的,没有选项可以部分处理数据,为此,您需要编写自己的自定义解析器,在 Spark 中扩展 CSVFileFormat。有关所有 csvoptions 的列表,请检查 org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala

  • _如果您正在推断一个架构,它会在输出架构中隐式添加一个columnNameOfCorruptRecord字段。_我刚刚对此进行了测试。它没有添加列。它只是删除了记录并将 None 放入字段中(就像我在第一个示例中所做的那样)。我通过制作一个主要包含整数的较长 ab.csv 文件并降低推断模式的采样率来对其进行测试。`spark.read.csv('ab.csv',header=True,inferSchema=True,enforceSchema=False,columnNameOfCorruptRecord='破碎',samplingRatio=0.1)` (2认同)