I have a Spark job written in Python which behaves oddly when it checks its data for errors. A simplified version is below:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, DoubleType
from pyspark.sql.functions import col, lit
spark = SparkSession.builder.master("local[3]").appName("pyspark-unittest").getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
# Both fields are declared non-nullable; malformed rows should land in ErrorField
SCHEMA = StructType([
    StructField("headerDouble", DoubleType(), False),
    StructField("ErrorField", StringType(), False)
])
# PERMISSIVE mode keeps malformed rows, storing the raw record in ErrorField
dataframe = (
    spark.read
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "ErrorField")
    .schema(SCHEMA).csv("./x.csv")
)
total_row_count = dataframe.count()
print("total_row_count = " + str(total_row_count))
errors = dataframe.filter(col("ErrorField").isNotNull())
errors.show()
error_count = errors.count()
print("errors count = " + str(error_count))
The csv it is reading is simple:
headerDouble
wrong
The relevant output is:
total_row_count = 1
+------------+----------+
|headerDouble|ErrorField|
+------------+----------+
| null| wrong|
+------------+----------+
errors …
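For what it's worth, the Spark 2.3 migration notes describe exactly this pattern: queries over raw CSV/JSON files whose referenced columns include only the internal corrupt-record column are unreliable (and in 2.3+ raise an AnalysisException), and the suggested workaround is to cache or save the parsed result before filtering. A minimal sketch of that workaround, reusing the SCHEMA and ./x.csv from the question:

# Sketch of the documented workaround: cache the parsed DataFrame so the
# corrupt-record column is materialized before count() re-plans the query.
# Assumes the same SparkSession, SCHEMA, and ./x.csv as above.
cached = (
    spark.read
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "ErrorField")
    .schema(SCHEMA).csv("./x.csv")
    .cache()  # materialize parsed rows, including ErrorField
)
errors = cached.filter(col("ErrorField").isNotNull())
print("errors count = " + str(errors.count()))  # counts the cached rows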