Asked by Wes · Tags: apache-spark, apache-spark-sql, pyspark, apache-spark-2.0
In the recently released The Data Engineer's Guide to Apache Spark, the authors state (page 74):
"...when you define a schema where all columns are declared to not have null values, Spark will not enforce that and will happily let null values into that column. The nullable signal is simply to help Spark SQL optimize for handling that column. If you have null values in columns that should not have null values, you can get an incorrect result or see strange exceptions that can be hard to debug."
After going through my notes and the earlier JIRAs, the statement above no longer seems accurate. Per SPARK-13740 and SPARK-15192, nullability can be enforced when a schema is defined at DataFrame creation.
Can I get some clarification? I am no longer sure what the behavior actually is.
Different DataFrame creation paths handle null types differently, and it is not straightforward, because there are at least three distinct areas in which nulls are handled completely differently.
First, SPARK-15192 is about RowEncoders. Where RowEncoders are involved, nulls are not permitted, and the error messages have been improved. Of the twenty-some overloads of SparkSession.createDataFrame(), there are quite a few implementations of createDataFrame() that basically convert an RDD to a DataFrame. In the example below, nulls are not accepted. Try converting an RDD to a DataFrame with any createDataFrame() method similar to the one below and you will get the same results...
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val nschema = StructType(Seq(
  StructField("colA", IntegerType, nullable = false),
  StructField("colB", IntegerType, nullable = true),
  StructField("colC", IntegerType, nullable = false),
  StructField("colD", IntegerType, nullable = true)))
val intNullsRDD = sc.parallelize(List(
  Row(null, null, null, null),
  Row(2, null, null, null),
  Row(null, 3, null, null),
  Row(null, null, null, 4)))
spark.createDataFrame(intNullsRDD, nschema).show()
In Spark 2.1.1, the error message is quite good:
17/11/23 21:30:37 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 6)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: The 0th field 'colA' of input row cannot be null.
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA), IntegerType) AS colA#73
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA), IntegerType)
+- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
+- input[0, org.apache.spark.sql.Row, true]
Stepping through the code, you can see where this happens. The validation lives in the doGenCode() method further below; the logic is set in motion in the createDataFrame() implementation immediately below, at the point where the RowEncoder object is created with val encoder = RowEncoder(schema).
@DeveloperApi
@InterfaceStability.Evolving
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
  createDataFrame(rowRDD, schema, needsConversion = true)
}

private[sql] def createDataFrame(
    rowRDD: RDD[Row],
    schema: StructType,
    needsConversion: Boolean) = {
  // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
  // schema differs from the existing schema on any field data type.
  val catalystRows = if (needsConversion) {
    val encoder = RowEncoder(schema)
    rowRDD.map(encoder.toRow)
  } else {
    rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
  }
  val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
  Dataset.ofRows(self, logicalPlan)
}
Following this logic a little further, here is the improved message in objects.scala, which is where the code deals with the nulls. The error message is actually passed into ctx.addReferenceObj(errMsg), but you get the idea.
case class GetExternalRowField(
    child: Expression,
    index: Int,
    fieldName: String) extends UnaryExpression with NonSQLExpression {

  override def nullable: Boolean = false

  override def dataType: DataType = ObjectType(classOf[Object])

  override def eval(input: InternalRow): Any =
    throw new UnsupportedOperationException("Only code-generated evaluation is supported")

  private val errMsg = s"The ${index}th field '$fieldName' of input row cannot be null."

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    // Use unnamed reference that doesn't create a local field here to reduce the number of fields
    // because errMsgField is used only when the field is null.
    val errMsgField = ctx.addReferenceObj(errMsg)
    val row = child.genCode(ctx)
    val code = s"""
      ${row.code}

      if (${row.isNull}) {
        throw new RuntimeException("The input external row cannot be null.");
      }

      if (${row.value}.isNullAt($index)) {
        throw new RuntimeException($errMsgField);
      }

      final Object ${ev.value} = ${row.value}.get($index);
    """
    ev.copy(code = code, isNull = "false")
  }
}
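You do not even need the full createDataFrame() path to trigger this validation. A minimal sketch of hitting it directly through a RowEncoder, assuming the nschema defined above (Spark 2.x, where ExpressionEncoder still exposes toRow, the same call the createDataFrame() source above makes):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Serializing a Row with a null in non-nullable colA runs the generated
// GetExternalRowField code above and throws:
// "The 0th field 'colA' of input row cannot be null."
val encoder = RowEncoder(nschema)
encoder.toRow(Row(null, null, null, null))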
Something completely different happens when pulling from an HDFS data source. In that case, when a non-nullable column receives a null, no error message is shown at all; the column simply accepts the null. Take a look at the quick test file "testFile.csv" I created and then put into hdfs with hdfs dfs -put testFile.csv /data/nullTest:
|colA|colB|colC|colD|
| | | | |
| | 2| 2| 2|
| | 3| | |
| 4| | | |
When I read the file back with the same non-nullable schema as in the first example, every blank value came back as null, even though the fields are declared non-nullable. There are ways to handle blanks differently, but this is the default behavior. Both csv and parquet gave the same results.
import scala.collection.JavaConverters._

// an all-nullable schema, used only to write a parquet file that
// actually contains nulls
val nschema = StructType(Seq(
  StructField("colA", IntegerType, nullable = true),
  StructField("colB", IntegerType, nullable = true),
  StructField("colC", IntegerType, nullable = true),
  StructField("colD", IntegerType, nullable = true)))
val jListNullsADF = spark.createDataFrame(List(
  Row(null, null, null, null),
  Row(2, null, null, null),
  Row(null, 3, null, null),
  Row(null, null, null, 4)).asJava, nschema)
jListNullsADF.write.format("parquet").save("/data/parquetnulltest")

// read it back with the non-nullable schema (colA and colC declared
// nullable = false, as in the first example)
val schema = StructType(Seq(
  StructField("colA", IntegerType, nullable = false),
  StructField("colB", IntegerType, nullable = true),
  StructField("colC", IntegerType, nullable = false),
  StructField("colD", IntegerType, nullable = true)))
spark.read.format("parquet").schema(schema).load("/data/parquetnulltest").show()
+----+----+----+----+
|colA|colB|colC|colD|
+----+----+----+----+
|null|null|null|null|
|   2|null|null|null|
|null|   3|null|null|
|null|null|null|   4|
+----+----+----+----+
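You can also see the relaxation on the loaded DataFrame itself. A quick check (a sketch; the printed schema is what the trace below predicts, since the nullability is rebuilt from the relation's attributes on read):

// Even though schema() was handed non-nullable colA/colC, the resolved
// DataFrame reports every field as nullable.
spark.read.format("parquet").schema(schema).load("/data/parquetnulltest").printSchema()
// root
//  |-- colA: integer (nullable = true)
//  |-- colB: integer (nullable = true)
//  |-- colC: integer (nullable = true)
//  |-- colD: integer (nullable = true)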
The reason the nulls are allowed starts with the creation of the DataFrameReader: the load() call in DataFrameReader.scala ends in baseRelationToDataFrame(). baseRelationToDataFrame() in SparkSession.scala builds a Dataset whose schema comes from a QueryPlan, and QueryPlan recreates the StructType with the fromAttributes() method. The result is essentially the same schema as the original, except that every field has been forced to nullable. So by the time it gets back to RowEncoder(), the encoder is working with a nullable version of the original schema.
Immediately below, in DataFrameReader.scala, you can see the baseRelationToDataFrame() call...
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}
Immediately below that, in SparkSession.scala, you can see the Dataset.ofRows(self, LogicalRelation(baseRelation)) call being made; pay close attention to the LogicalRelation constructor for the plan...
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
  Dataset.ofRows(self, LogicalRelation(baseRelation))
}
In Dataset.scala, the schema property of the analyzed QueryPlan is passed as the third argument to create the Dataset: new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)).
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
  val qe = sparkSession.sessionState.executePlan(logicalPlan)
  qe.assertAnalyzed()
  new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
In QueryPlan.scala, the StructType.fromAttributes() method is used:
lazy val schema: StructType = StructType.fromAttributes(output)
And finally, in StructType.scala, each field's nullability is copied straight from the attribute, and by this point the attributes coming out of the relation are all nullable:
private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
  StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
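The net effect on a user-supplied schema is the same as mapping every field to a nullable copy. A hypothetical helper (illustrative only, not a Spark API) that mirrors what the reader ends up handing to the encoder:

import org.apache.spark.sql.types.{StructField, StructType}

// Whatever nullability the caller declared, the schema that reaches
// RowEncoder() behaves as if every field had been relaxed like this.
def forceNullable(schema: StructType): StructType =
  StructType(schema.fields.map(f => f.copy(nullable = true)))

Applied to the non-nullable schema above, forceNullable(schema) returns the same four fields with colA and colC flipped to nullable = true.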
Regarding whether query plans differ based on nullability, I think it is entirely possible for the LogicalPlan to differ depending on whether a column is nullable; a lot of information is passed into that object, and plenty of downstream logic builds the plan from it. But, as we just saw, the declared non-nullability is not actually preserved when the DataFrame is read back in.
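One easy way to see nullability influencing a plan (a sketch; the exact optimized-plan text varies by Spark version) is a null check on a column the schema declares non-nullable, which the optimizer can fold away:

import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// colA is declared non-nullable, colB nullable
val planSchema = StructType(Seq(
  StructField("colA", IntegerType, nullable = false),
  StructField("colB", IntegerType, nullable = true)))
val planDF = spark.createDataFrame(List(Row(1, 2), Row(3, 4)).asJava, planSchema)

// "colA is not null" can be folded to true because the attribute is
// non-nullable, so its Filter can drop out of the optimized plan; the
// same predicate on nullable colB has to stay.
planDF.filter("colA is not null").explain(true)
planDF.filter("colB is not null").explain(true)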
The third scenario depends on the DataType. When you create a DataFrame with the method createDataFrame(rows: java.util.List[Row], schema: StructType), it will actually substitute zeros for nulls passed into non-nullable IntegerType fields. You can see it in the example below...
val schema = StructType(Seq(
  StructField("colA", IntegerType, nullable = false),
  StructField("colB", IntegerType, nullable = true),
  StructField("colC", IntegerType, nullable = false),
  StructField("colD", IntegerType, nullable = true)))
val jListNullsDF = spark.createDataFrame(List(
  Row(null, null, null, null),
  Row(2, null, null, null),
  Row(null, 3, null, null),
  Row(null, null, null, 4)).asJava, schema)
jListNullsDF.show()
+----+----+----+----+
|colA|colB|colC|colD|
+----+----+----+----+
| 0|null| 0|null|
| 2|null| 0|null|
| 0| 3| 0|null|
| 0|null| 0| 4|
+----+----+----+----+
It looks like there is logic in org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt() that substitutes zeros for the nulls. Nulls in non-nullable StringType fields, however, are not handled nearly as gracefully:
import org.apache.spark.sql.types.StringType

val strschema = StructType(Seq(
  StructField("colA", StringType, nullable = false),
  StructField("colB", StringType, nullable = true),
  StructField("colC", StringType, nullable = false),
  StructField("colD", StringType, nullable = true)))
val strNullsRDD = sc.parallelize(List(
  Row(null, null, null, null),
  Row("r2colA", null, null, null),
  Row(null, "r3colC", null, null),
  Row(null, null, null, "r4colD")))
spark.createDataFrame(List(
  Row(null, null, null, null),
  Row("r2cA", null, null, null),
  Row(null, "row3cB", null, null),
  Row(null, null, null, "row4ColD")).asJava, strschema).show()
But below is the not-so-helpful error message, which does not specify the ordinal position of the field...
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
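Until that message gets better, a cheap pre-flight check can report exactly which fields violate the declared nullability before the encoder ever sees them. A hypothetical helper (names are illustrative, not a Spark API):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Report each field that is null but declared non-nullable, with the
// ordinal position and name that the NullPointerException above omits.
def nullViolations(row: Row, schema: StructType): Seq[String] =
  schema.fields.zipWithIndex.collect {
    case (f, i) if !f.nullable && row.isNullAt(i) => s"${i}th field '${f.name}' is null"
  }

// e.g. strNullsRDD.flatMap(r => nullViolations(r, strschema)).collect()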