Post by Ric*_*ith

Spark's .count() function disagrees with the contents of the dataframe when filtering on the corrupt record field

I have a Spark job, written in Python, that behaves strangely when checking its data for errors. A simplified version is below:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, DoubleType
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[3]").appName("pyspark-unittest").getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")


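# Schema for the CSV: ErrorField doubles as the corrupt-record column (named below).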
SCHEMA = StructType([
    StructField("headerDouble", DoubleType(), False),
    StructField("ErrorField", StringType(), False)
])

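# PERMISSIVE mode keeps malformed rows; the raw record text lands in ErrorField.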
dataframe = (
    spark.read
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "ErrorField")
    .schema(SCHEMA).csv("./x.csv")
)

total_row_count = dataframe.count()
print("total_row_count = " + str(total_row_count))

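# Malformed rows are those where the corrupt-record column was populated.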
errors = dataframe.filter(col("ErrorField").isNotNull())
errors.show()
error_count = errors.count()
print("errors count = " + str(error_count))

The CSV it is reading is simple:

headerDouble
wrong
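
To make this reproducible without a pre-existing file, here is a minimal sketch that writes the same CSV first (assuming the ./x.csv path used above):

# Create the two-line CSV the job reads: a header plus one value
# that cannot be parsed as a double.
with open("./x.csv", "w") as f:
    f.write("headerDouble\nwrong\n")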

The relevant output is:

total_row_count = 1
+------------+----------+
|headerDouble|ErrorField|
+------------+----------+
|        null|     wrong|
+------------+----------+

errors …
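
One hedged diagnostic, not a confirmed explanation: the Spark documentation on corrupt-record handling suggests caching the parsed result before querying the corrupt-record column, since that column is only populated while the file is actually being parsed. Continuing the script above, recounting after a cache() would show whether lazy re-parsing is behind the mismatch:

# Hypothetical probe: cache the parsed rows so every action sees
# the same materialized data, then recount the error rows.
cached = dataframe.cache()
cached_errors = cached.filter(col("ErrorField").isNotNull())
print("errors count after cache = " + str(cached_errors.count()))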

apache-spark apache-spark-sql pyspark

5 votes · 1 answer · 461 views
