Ric*_*ith 5 apache-spark apache-spark-sql pyspark
我有一个用Python编写的Spark作业,在检查其数据中的错误时,它的行为异常。简化的版本如下:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, DoubleType
from pyspark.sql.functions import col, lit
spark = SparkSession.builder.master("local[3]").appName("pyspark-unittest").getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
SCHEMA = StructType([
StructField("headerDouble", DoubleType(), False),
StructField("ErrorField", StringType(), False)
])
dataframe = (
spark.read
.option("header", "true")
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "ErrorField")
.schema(SCHEMA).csv("./x.csv")
)
total_row_count = dataframe.count()
print("total_row_count = " + str(total_row_count))
errors = dataframe.filter(col("ErrorField").isNotNull())
errors.show()
error_count = errors.count()
print("errors count = " + str(error_count))
Run Code Online (Sandbox Code Playgroud)
它正在读取的csv很简单:
headerDouble
wrong
Run Code Online (Sandbox Code Playgroud)
相关的输出是
total_row_count = 1
+------------+----------+
|headerDouble|ErrorField|
+------------+----------+
| null| wrong|
+------------+----------+
errors count = 0
Run Code Online (Sandbox Code Playgroud)
现在这怎么可能发生?如果数据帧中有一条记录,怎么算为0?这是Spark基础架构中的错误,还是我缺少什么?
编辑:看起来这可能是Spark 2.2上的一个已知错误,已在Spark 2.3中修复-https: //issues.apache.org/jira/browse/SPARK-21610
谢谢@ user6910411-似乎是一个错误。我在Spark项目的错误跟踪器中提出了一个问题。
我推测由于ErrorField
模式中存在,Spark变得混乱了,该模式也被指定为错误列并用于过滤数据帧。
同时,我想我已经找到一种解决方法,可以以合理的速度对数据框行进行计数:
def count_df_with_spark_bug_workaround(df):
return sum(1 for _ in df.toLocalIterator())
Run Code Online (Sandbox Code Playgroud)
不太确定为什么.count()
无法正常工作时会给出正确的答案。
我提出的吉拉票:https : //issues.apache.org/jira/browse/SPARK-24147
原来是这样的副本:https : //issues.apache.org/jira/browse/SPARK-21610
归档时间: |
|
查看次数: |
461 次 |
最近记录: |