在损坏的记录字段上进行过滤时,Spark的.count()函数与数据框的内容不同

Ric*_*ith 5 apache-spark apache-spark-sql pyspark

我有一个用Python编写的Spark作业,在检查其数据中的错误时,它的行为异常。简化的版本如下:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, DoubleType
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.master("local[3]").appName("pyspark-unittest").getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")


SCHEMA = StructType([
    StructField("headerDouble", DoubleType(), False),
    StructField("ErrorField", StringType(), False)
])

dataframe = (
    spark.read
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "ErrorField")
    .schema(SCHEMA).csv("./x.csv")
)

total_row_count = dataframe.count()
print("total_row_count = " + str(total_row_count))

errors = dataframe.filter(col("ErrorField").isNotNull())
errors.show()
error_count = errors.count()
print("errors count = " + str(error_count))
Run Code Online (Sandbox Code Playgroud)

它正在读取的csv很简单:

headerDouble
wrong
Run Code Online (Sandbox Code Playgroud)

相关的输出是

total_row_count = 1
+------------+----------+
|headerDouble|ErrorField|
+------------+----------+
|        null|     wrong|
+------------+----------+

errors count = 0
Run Code Online (Sandbox Code Playgroud)

现在这怎么可能发生?如果数据帧中有一条记录,怎么算为0?这是Spark基础架构中的错误,还是我缺少什么?

编辑:看起来这可能是Spark 2.2上的一个已知错误,已在Spark 2.3中修复-https: //issues.apache.org/jira/browse/SPARK-21610

Ric*_*ith 6

谢谢@ user6910411-似乎是一个错误。我在Spark项目的错误跟踪器中提出了一个问题。

我推测由于ErrorField模式中存在,Spark变得混乱了,该模式也被指定为错误列并用于过滤数据帧。

同时,我想我已经找到一种解决方法,可以以合理的速度对数据框行进行计数:

def count_df_with_spark_bug_workaround(df):
    return sum(1 for _ in df.toLocalIterator())
Run Code Online (Sandbox Code Playgroud)

不太确定为什么.count()无法正常工作时会给出正确的答案。

我提出的吉拉票:https : //issues.apache.org/jira/browse/SPARK-24147

原来是这样的副本:https : //issues.apache.org/jira/browse/SPARK-21610

  • 您使用的是哪个Spark版本?我之所以问是因为我收到了“ AnalysisException:自Spark 2.3起,当引用的列仅包含内部损坏的记录列时,不允许来自原始JSON / CSV文件的查询” (2认同)