如何验证Spark Dataframe的内容

ssh*_*off 4 validation scala dataframe apache-spark apache-spark-sql

我有Scala Spark代码库,它运行良好,但不应该.

第二列有混合类型的数据,而在Schema我定义了它IntegerType.我的实际程序有超过100列,并DataFrames在转换后继续派生多个子项.

如何验证RDDDataFrame字段的内容是否具有正确的数据类型值,从而忽略无效行或将列的内容更改为某个默认值.任何更多的数据质量检查指针DataFrameRDD赞赏.

var theSeq = Seq(("X01", "41"),
    ("X01", 41),
    ("X01", 41),
    ("X02", "ab"),
    ("X02", "%%"))

val newRdd = sc.parallelize(theSeq)
val rowRdd = newRdd.map(r => Row(r._1, r._2))

val theSchema = StructType(Seq(StructField("ID", StringType, true),
    StructField("Age", IntegerType, true)))
val theNewDF = sqc.createDataFrame(rowRdd, theSchema)
theNewDF.show()  
Run Code Online (Sandbox Code Playgroud)

zer*_*323 11

首先,传递schema只是一种避免类型推断的方法.在DataFrame创建期间未验证或强制执行此操作.在一个侧面说明,我不会形容ClassCastException运作良好.有那么一刻我以为你真的发现了一个bug.

我认为重要的问题是如何获得像theSeq/ 一样的数据newRdd.它是你自己解析的东西,是从外部组件收到的吗?只需查看您已经知道的类型(Seq[(String, Any)]/ RDD[(String, Any)]分别),它就不是一个有效的输入DataFrame.处理这个级别的事情的方法可能是接受静态类型.斯卡拉提供了相当多的整洁的方式来处理意外情况(Try,Either,Option),其中最后一个是最简单的一个,并作为奖金星火SQL效果很好.相当简单的处理事情的方式可能看起来像这样

def validateInt(x: Any) = x match {
  case x: Int => Some(x)
  case _ => None
}

def validateString(x: Any) = x match { 
  case x: String => Some(x)
  case _ => None
}

val newRddOption: RDD[(Option[String], Option[Int])] = newRdd.map{
  case (id, age) => (validateString(id), validateInt(age))}
Run Code Online (Sandbox Code Playgroud)

由于Options可以轻松编写,您可以添加如下所示的其他检查:

def validateAge(age: Int) = {
  if(age >= 0 && age < 150) Some(age)
  else None
}

val newRddValidated: RDD[(Option[String], Option[Int])] = newRddOption.map{
  case (id, age) => (id, age.flatMap(validateAge))}
Run Code Online (Sandbox Code Playgroud)

接下来,而不是Row一个非常粗糙的容器,我将使用案例类:

case class Record(id: Option[String], age: Option[Int])

val records: RDD[Record] = newRddValidated.map{case (id, age) => Record(id, age)}
Run Code Online (Sandbox Code Playgroud)

此时你需要做的就是打电话toDF:

import org.apache.spark.sql.DataFrame

val df: DataFrame = records.toDF
df.printSchema

// root
//  |-- id: string (nullable = true)
//  |-- age: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

这是艰难但可以说是更优雅的方式.更快的是让SQL构建系统为您完成工作.首先让我们将一切转换为Strings:

val stringRdd: RDD[(String, String)] = sc.parallelize(theSeq).map(
  p => (p._1.toString, p._2.toString))
Run Code Online (Sandbox Code Playgroud)

接下来创建一个DataFrame:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

val df: DataFrame = stringRdd.toDF("id", "age")

val expectedTypes = Seq(StringType, IntegerType)
val exprs: Seq[Column] = df.columns.zip(expectedTypes).map{
  case (c, t) => col(c).cast(t).alias(c)}

val dfProcessed: DataFrame = df.select(exprs: _*)
Run Code Online (Sandbox Code Playgroud)

结果如下:

dfProcessed.printSchema

// root
//  |-- id: string (nullable = true)
//  |-- age: integer (nullable = true)


dfProcessed.show

// +---+----+
// | id| age|
// +---+----+
// |X01|  41|
// |X01|  41|
// |X01|  41|
// |X02|null|
// |X02|null|
// +---+----+
Run Code Online (Sandbox Code Playgroud)


Jus*_*ony 0

在 1.4 或更早版本中

import org.apache.spark.sql.execution.debug._
theNewDF.typeCheck
Run Code Online (Sandbox Code Playgroud)

但它已通过 SPARK-9754 被删除。我没有检查过,但我认为typeChecksqlContext.debug提前