小编Fol*_*ger的帖子

Pyspark 使用架构读取 csv、标头检查并存储损坏的记录

我正在尝试使用符合以下条件的 pyspark csv 阅读器:

  • 根据模式中的数据类型读取 csv
  • 检查标题和架构中的列名是否匹配
  • 将损坏的记录存储在新字段中

这是我尝试过的。

file: ab.csv
------
a,b
1,2
3,four
Run Code Online (Sandbox Code Playgroud)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
DDL = "a INTEGER, b INTEGER"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=False,
                    columnNameOfCorruptRecord='broken')
print(df.show())
Run Code Online (Sandbox Code Playgroud)

输出:

+----+----+
|   a|   b|
+----+----+
|   1|   2|
|null|null|
+----+----+
Run Code Online (Sandbox Code Playgroud)

此命令不存储损坏的记录。如果我添加broken到架构并删除标头验证,则该命令会发出警告。

DDL = "a INTEGER, b INTEGER, broken STRING"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=True, 
                    columnNameOfCorruptRecord='broken')
print(df.show())
Run Code Online (Sandbox Code Playgroud)

输出:

WARN  CSVDataSource:66 - Number of column in CSV header is not equal to …
Run Code Online (Sandbox Code Playgroud)

python csv apache-spark pyspark

10
推荐指数
1
解决办法
4769
查看次数

标签 统计

apache-spark ×1

csv ×1

pyspark ×1

python ×1