How does Spark 2.0 handle column nullability?

Wes*_*Wes 4 apache-spark apache-spark-sql pyspark apache-spark-2.0

In the recently released "The Data Engineer's Guide to Apache Spark", the authors state (page 74):

"...当你定义一个模式,其中所有列都被声明为没有空值时 - Spark不会强制执行该操作,并且很乐意将null值放入该列.可空信号只是帮助Spark SQL优化以处理该列.如果在不应具有空值的列中具有空值,则可能会得到不正确的结果,或者看到难以调试的奇怪异常."

After reading the release notes and earlier JIRAs, the statement above no longer seems to hold.

According to SPARK-13740 and SPARK-15192, nullability can be enforced when the schema is defined at DataFrame creation.

Can someone clarify? I'm no longer sure what the behavior is.

uh_*_*boi 6

Different DataFrame creation paths handle nulls differently. It is not entirely straightforward, because there are at least three distinct areas where nulls are handled in completely different ways.

  1. First, SPARK-15192 is about RowEncoders. When a RowEncoder is used, nulls are not allowed, and the error messages have been improved. Among the twenty-odd overloads of SparkSession.createDataFrame(), there are several implementations of createDataFrame() that essentially convert an RDD to a DataFrame. In the example below, nulls are not accepted. So if you try something similar — converting an RDD to a DataFrame with createDataFrame() as below — you will get the same result...

    import org.apache.spark.sql.types._

    val nschema = StructType(Seq(StructField("colA", IntegerType, nullable = false), StructField("colB", IntegerType, nullable = true), StructField("colC", IntegerType, nullable = false), StructField("colD", IntegerType, nullable = true)))
    val intNullsRDD = sc.parallelize(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row(2,null,null,null),org.apache.spark.sql.Row(null,3,null,null),org.apache.spark.sql.Row(null,null,null,4)))
    spark.createDataFrame(intNullsRDD, nschema).show()
    

In Spark 2.1.1, the error message is quite good:

17/11/23 21:30:37 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 6)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: The 0th field 'colA' of input row cannot be null.
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA), IntegerType) AS colA#73
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA), IntegerType)
   +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA)
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
         +- input[0, org.apache.spark.sql.Row, true]

Stepping through the code, you can see where this happens. The doGenCode() method further below has the validation, and the logic kicks in right after the RowEncoder object is created with val encoder = RowEncoder(schema).

    @DeveloperApi
    @InterfaceStability.Evolving
    def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
      createDataFrame(rowRDD, schema, needsConversion = true)
    }

    private[sql] def createDataFrame(
        rowRDD: RDD[Row],
        schema: StructType,
        needsConversion: Boolean) = {
      // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
      // schema differs from the existing schema on any field data type.
      val catalystRows = if (needsConversion) {
        val encoder = RowEncoder(schema)
        rowRDD.map(encoder.toRow)
      } else {
        rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
      }
      val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
      Dataset.ofRows(self, logicalPlan)
    }
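
To see that mechanism in isolation, here is a minimal sketch (my own, assuming Spark 2.1.x where ExpressionEncoder.toRow is available) that invokes the RowEncoder directly instead of going through createDataFrame(); the failure comes from the same generated null check shown below.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.types._

    val encSchema = StructType(Seq(
      StructField("colA", IntegerType, nullable = false),
      StructField("colB", IntegerType, nullable = true)))

    // The encoder builds the serializer expressions that contain GetExternalRowField
    val encoder = RowEncoder(encSchema)
    encoder.toRow(Row(1, null))   // fine: colB is declared nullable
    encoder.toRow(Row(null, 2))   // throws: "The 0th field 'colA' of input row cannot be null."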

Stepping further through that logic, here is the improved message in objects.scala — this is where the code handles the null. The error message is actually passed into ctx.addReferenceObj(errMsg), but you get the idea.

 case class GetExternalRowField(
    child: Expression,
    index: Int,
    fieldName: String) extends UnaryExpression with NonSQLExpression {

  override def nullable: Boolean = false
  override def dataType: DataType = ObjectType(classOf[Object])
  override def eval(input: InternalRow): Any =
    throw new UnsupportedOperationException("Only code-generated evaluation is supported")

  private val errMsg = s"The ${index}th field '$fieldName' of input row cannot be null."

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    // Use unnamed reference that doesn't create a local field here to reduce the number of fields
    // because errMsgField is used only when the field is null.
    val errMsgField = ctx.addReferenceObj(errMsg)
    val row = child.genCode(ctx)
    val code = s"""
      ${row.code}

      if (${row.isNull}) {
        throw new RuntimeException("The input external row cannot be null.");
      }

      if (${row.value}.isNullAt($index)) {
        throw new RuntimeException($errMsgField);
      }

      final Object ${ev.value} = ${row.value}.get($index);
     """
    ev.copy(code = code, isNull = "false")
  }
} 
  2. Something completely different happens when pulling from an HDFS data source. In that case, when a column is non-nullable and a null shows up, no error message is displayed — the column accepts the null anyway. Look at the quick test file "testFile.csv" I created and then put into HDFS with hdfs dfs -put testFile.csv /data/nullTest

       |colA|colB|colC|colD| 
       |    |    |    |    |
       |    |   2|   2|   2|
       |    |   3|    |    |
       |   4|    |    |    |
    

When I read the file in with the same nschema schema, all of the blank values became null even where the field was non-nullable. There are ways to handle blanks differently, but this is the default. Both csv and parquet give the same result.

import scala.collection.JavaConverters._

val nschema = StructType(Seq(StructField("colA", IntegerType, nullable = true), StructField("colB", IntegerType, nullable = true), StructField("colC", IntegerType, nullable = true), StructField("colD", IntegerType, nullable = true)))
val jListNullsADF = spark.createDataFrame(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row(2,null,null,null),org.apache.spark.sql.Row(null,3,null,null),org.apache.spark.sql.Row(null,null,null,4)).asJava,nschema)
jListNullsADF.write.format("parquet").save("/data/parquetnulltest")
spark.read.format("parquet").schema(nschema).load("/data/parquetnulltest").show()

+----+----+----+----+
|colA|colB|colC|colD|
+----+----+----+----+
|null|null|null|null|
|null|   2|   2|   2|
|null|null|   3|null|
|null|   4|null|   4|
+----+----+----+----+
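
As a quick check (a sketch of mine, behaviour observed on Spark 2.x), you can re-read the parquet written above with a schema that declares colA and colC non-nullable: the nulls still come back, and the schema reported on the resulting DataFrame is forced back to nullable — which is what the next few code hops explain.

    // Re-read the parquet above, this time declaring colA and colC non-nullable
    val strictSchema = StructType(Seq(
      StructField("colA", IntegerType, nullable = false),
      StructField("colB", IntegerType, nullable = true),
      StructField("colC", IntegerType, nullable = false),
      StructField("colD", IntegerType, nullable = true)))

    val readBack = spark.read.format("parquet").schema(strictSchema).load("/data/parquetnulltest")
    readBack.show()          // nulls are still present in colA and colC
    readBack.printSchema()   // every field reports nullable = true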

The reason nulls are allowed starts with the DataFrameReader, which calls baseRelationToDataFrame() in DataFrameReader.scala. baseRelationToDataFrame() in SparkSession.scala then uses the schema from a QueryPlan, and the QueryPlan recreates the StructType. The fromAttributes() method builds essentially the same schema as the original, but the fields are forced to be nullable. So by the time it gets back to the RowEncoder(), it is a nullable version of the original schema.

Immediately below, in DataFrameReader.scala, you can see the baseRelationToDataFrame() call...

  @scala.annotation.varargs
  def load(paths: String*): DataFrame = {
    sparkSession.baseRelationToDataFrame(
      DataSource.apply(
        sparkSession,
        paths = paths,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap).resolveRelation())
  }

Right after that, in SparkSession.scala, you can see the Dataset.ofRows(self: SparkSession, lr: LogicalRelation) method being called; pay close attention to the LogicalRelation plan constructor.

  def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
    Dataset.ofRows(self, LogicalRelation(baseRelation))
  }

In Dataset.scala, the schema property of the analyzed QueryPlan is passed as the third argument to build the Dataset: new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)).

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }
}

In QueryPlan.scala, the StructType.fromAttributes() method is used:

 lazy val schema: StructType = StructType.fromAttributes(output)

And finally, in StructType.scala, the field is rebuilt from the attribute's nullability, which by this point is always nullable:

  private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))

As for the query plan differing based on nullability: I think it is entirely possible for the LogicalPlan to differ depending on whether a column is nullable. A lot of information is passed into that object, and plenty of subsequent logic builds the plan from it. But, as we saw a second ago, the declared non-nullability was not actually preserved when the DataFrame was written out and read back.
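
One concrete example of the plan depending on nullability (my own sketch, relying on the NullPropagation/PruneFilters optimizer rules in Spark 2.x rather than on anything above): an isNotNull filter on a column declared non-nullable is simplified away in the optimized plan, while the same filter on a nullable column survives.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types._

    val strictDF = spark.createDataFrame(
      sc.parallelize(Seq(Row(1, 2))),
      StructType(Seq(
        StructField("colA", IntegerType, nullable = false),
        StructField("colB", IntegerType, nullable = true))))

    strictDF.filter(col("colA").isNotNull).explain(true)   // filter is optimized out
    strictDF.filter(col("colB").isNotNull).explain(true)   // filter remains in the optimized plan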

  3. The third case depends on the DataType. When you create a DataFrame with createDataFrame(rows: java.util.List[Row], schema: StructType), it will actually create zeros where a null is passed into a non-nullable IntegerType field. You can see the example below...

      val schema = StructType(Seq(StructField("colA", IntegerType, nullable = false), StructField("colB", IntegerType, nullable = true), StructField("colC", IntegerType, nullable = false), StructField("colD", IntegerType, nullable = true))) 
      val jListNullsDF = spark.createDataFrame(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row(2,null,null,null),org.apache.spark.sql.Row(null,3,null,null),org.apache.spark.sql.Row(null,null,null,4)).asJava,schema)
      jListNullsDF.show() 
    
      +----+----+----+----+
      |colA|colB|colC|colD|
      +----+----+----+----+
      |   0|null|   0|null|
      |   2|null|   0|null|
      |   0|   3|   0|null|
      |   0|null|   0|   4|
      +----+----+----+----+
    

It looks like there is logic in org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt() that substitutes zero for a null. For a non-nullable StringType field, however, nulls are not handled as gracefully.

val strschema = StructType(Seq(StructField("colA", StringType, nullable = false), StructField("colB", StringType, nullable = true), StructField("colC", StringType, nullable = false), StructField("colD", StringType, nullable = true)))
val strNullsRDD = sc.parallelize(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row("r2colA",null,null,null),org.apache.spark.sql.Row(null,"r3colC",null,null),org.apache.spark.sql.Row(null,null,null,"r4colD")))
spark.createDataFrame(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row("r2cA",null,null,null),org.apache.spark.sql.Row(null,"row3cB",null,null),org.apache.spark.sql.Row(null,null,null,"row4ColD")).asJava,strschema).show()

But below is the not-very-helpful error message, which does not specify the ordinal position of the field...

java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)