Fol*_*ger 10 python csv apache-spark pyspark
我正在尝试使用符合以下条件的 pyspark csv 阅读器:
这是我尝试过的。
file: ab.csv
------
a,b
1,2
3,four
Run Code Online (Sandbox Code Playgroud)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
DDL = "a INTEGER, b INTEGER"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=False,
columnNameOfCorruptRecord='broken')
print(df.show())
Run Code Online (Sandbox Code Playgroud)
输出:
+----+----+
| a| b|
+----+----+
| 1| 2|
|null|null|
+----+----+
Run Code Online (Sandbox Code Playgroud)
此命令不存储损坏的记录。如果我添加broken到架构并删除标头验证,则该命令会发出警告。
DDL = "a INTEGER, b INTEGER, broken STRING"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=True,
columnNameOfCorruptRecord='broken')
print(df.show())
Run Code Online (Sandbox Code Playgroud)
输出:
WARN CSVDataSource:66 - Number of column in CSV header is not equal to number of fields in the schema:
Header length: 2, schema size: 3
CSV file: file:/// ... /ab.csv
+----+----+------+
| a| b|broken|
+----+----+------+
| 1| 2| null|
|null|null|3,four|
+----+----+------+
Run Code Online (Sandbox Code Playgroud)
这是预期的行为还是存在破坏第一个示例的错误?有一个更好的方法吗?
还有一件事。我想处理损坏记录中格式正确的字段以获得这样的数据帧。
+--+----+------+
| a| b|broken|
+--+----+------+
| 1| 2| null|
| 3|null|3,four|
+--+----+------+
Run Code Online (Sandbox Code Playgroud)
我应该在阅读后做一个额外的步骤来获得它,还是我错过了一些更宽容的选择。
小智 1
这是正确的默认行为。如果您正在推断架构,它会在输出架构中隐式添加一个columnNameOfCorruptRecord字段,否则您必须在用户定义架构中提供一个名为columnNameOfCorruptRecord的字符串类型字段,或者更改列名称(如损坏)并将相同名称添加到架构中。
正如您提到的,没有选项可以部分处理数据,为此,您需要编写自己的自定义解析器,在 Spark 中扩展 CSVFileFormat。有关所有 csvoptions 的列表,请检查 org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
| 归档时间: |
|
| 查看次数: |
4769 次 |
| 最近记录: |