Tags: schema, scala, apache-spark-sql
I am trying to write some test cases to validate the data between the source (.csv) file and the target (Hive table). One of those validations is structure validation of the table.
I have loaded the .csv data (using a defined schema) into one DataFrame and extracted the Hive table data into another DataFrame.
When I now try to compare the schemas of the two DataFrames, it returns false. Not sure why. Any idea on this?
Source DataFrame schema:
scala> res39.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)
Target DataFrame schema:
scala> targetRawData.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)
When I compare them, it returns false:
scala> res39.schema == targetRawData.schema
res47: Boolean = false
The data in the two DataFrames looks like this:
scala> res39.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS| Naveen |             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS| Naveen |             100|   115.78|
|2015-04-02 23:24:25|2015-04-02 23:24:25|        RBS|   Arun |             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|   Arun |             100|    30.98|
|2018-06-04 10:11:12|2018-06-04 10:11:12|        XZX|   Arun |             400|     12.0|
+-------------------+-------------------+-----------+--------+----------------+---------+
scala> targetRawData.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS|  Naveen|             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS|  Naveen|             100|   115.78|
|2015-04-02 23:25:25|2015-04-02 23:25:25|        RBS|    Arun|             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|    Arun|             100|    30.98|
+-------------------+-------------------+-----------+--------+----------------+---------+
The complete code is shown below:
//import org.apache.spark
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext
  //val conf = new SparkConf().setAppName("Simple Application")
  //val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)
  val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()
   // set source and target location
    val sourceDataLocation = "hdfs://localhost:9000/source.txt"
    val targetTableName = "TableA"
    // Extract source data
    println("Extracting SAS source data from csv file location " + sourceDataLocation);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val sourceRawCsvData = sc.textFile(sourceDataLocation)
    println("Extracting target data from hive table " + targetTableName)
    val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)
    // Add the test cases here
    // Test 2 - Validate the Structure
       val headerColumns = sourceRawCsvData.first().split(",").to[List]
       val schema = TableASchema(headerColumns)
       val data = sourceRawCsvData.mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
       .map(_.split(",").toList)
       .map(row)
       val dataFrame = spark.createDataFrame(data,schema)
       val sourceDataFrame = dataFrame.toDF(dataFrame.columns map(_.toLowerCase): _*)
       data.collect
       data.getClass
    // Test 3 - Validate the data
    // Test 4 - Calculate the average and variance of Int or Dec columns
    // Test 5 - Test 5
  def UpdateResult(tableName: String, returnCode: Int, description: String){
    val insertString = "INSERT INTO TestResult VALUES('" + tableName + "', " + returnCode + ",'" + description + "')"
    val a = hc.sql(insertString)
    }
  def TableASchema(columnName: List[String]): StructType = {
    StructType(
      Seq(
        StructField(name = "datetime", dataType = TimestampType, nullable = true),
        StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
        StructField(name = "source_bank", dataType = StringType, nullable = true),
        StructField(name = "emp_name", dataType = StringType, nullable = true),
        StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
        StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
        )
    )
  }
  def row(line: List[String]): Row = {
       Row(convertToTimestamp(line(0).trim), convertToTimestamp(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
    }
  def convertToTimestamp(s: String) : Timestamp = s match {
     case "" => null
     case _ => {
        val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
        Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => t
        case Failure(_) => null
      }
    }
  }
  }
Based on @Derek Kaknes' answer, here is the solution I came up with for comparing schemas, concerned only with column name, data type and nullability, and indifferent to metadata:
// Extract relevant information: name (key), type & nullability (values) of columns
def getCleanedSchema(df: DataFrame): Map[String, (DataType, Boolean)] = {
    df.schema.map { (structField: StructField) =>
      structField.name.toLowerCase -> (structField.dataType, structField.nullable)
    }.toMap
  }
// Compare relevant information
def getSchemaDifference(schema1: Map[String, (DataType, Boolean)],
                        schema2: Map[String, (DataType, Boolean)]
                       ): Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] = {
  (schema1.keys ++ schema2.keys).
    map(_.toLowerCase).
    toList.distinct.
    flatMap { (columnName: String) =>
      val schema1FieldOpt: Option[(DataType, Boolean)] = schema1.get(columnName)
      val schema2FieldOpt: Option[(DataType, Boolean)] = schema2.get(columnName)
      if (schema1FieldOpt == schema2FieldOpt) None
      else Some(columnName -> (schema1FieldOpt, schema2FieldOpt))
    }.toMap
}
The getCleanedSchema method extracts the information of interest (column data type and nullability) and returns a map from column name to that tuple.
The getSchemaDifference method returns a map containing only the columns that differ between the two schemas. If a column is absent from one of the two schemas, its corresponding attributes are None.
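A minimal usage sketch, assuming res39 and targetRawData are the two DataFrames from the question; an empty result means the schemas agree on name, type and nullability:
// Extract name -> (type, nullability) from each DataFrame and diff the two maps
val sourceSchemaMap = getCleanedSchema(res39)
val targetSchemaMap = getCleanedSchema(targetRawData)
val schemaDifferences = getSchemaDifference(sourceSchemaMap, targetSchemaMap)
schemaDifferences.isEmpty // true when the two schemas match, metadata ignored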
I just ran into exactly the same problem. When reading data from Hive, the schema's StructField components sometimes contain Hive metadata in the metadata field. You cannot see it when printing the schema, because that field is not part of the toString definition.
Here is the solution I decided to use; I simply get a copy of the schema with empty metadata before comparing:
schema.map(_.copy(metadata = Metadata.empty))
I have run into this problem before, and it was caused by a difference in the StructField.metadata attribute. It is nearly impossible to spot this out of the box, because a simple inspection of the StructFields only shows the name, data type and nullable values. My debugging suggestion is to compare the metadata of the fields. Maybe something like this:
res39.schema.zip(targetRawData.schema).foreach{ case (r: StructField, t: StructField) => 
  println(s"Field: ${r.name}\n--| res_meta: ${r.metadata}\n--|target_meta: ${t.metadata}")}
If you want to compare the schemas but ignore the metadata, then I don't have a great solution. The best I can come up with is to iterate over the StructFields and manually strip out the metadata, then create a temporary copy of the DataFrame without metadata. So you could do something like this (assuming df is the DataFrame you want to strip the metadata from):
val schemaWithoutMetadata = StructType(df.schema.map{ case f: StructField => 
  StructField(f.name, f.dataType, f.nullable)
})
val tmpDF = spark.sqlContext.createDataFrame(df.rdd, schemaWithoutMetadata)
Then you can compare the DataFrames directly, or compare the schemas the way you were trying to. I don't think this solution will be performant, so it should only be used on small data sets.
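As a concrete sketch (not part of the original answer), the same idea can be wrapped in a hypothetical stripMetadata helper and applied to both DataFrames from the question before comparing:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}
// Hypothetical helper: rebuild a DataFrame with a schema that carries no metadata
def stripMetadata(df: DataFrame): DataFrame = {
  val schemaWithoutMetadata = StructType(df.schema.map { f =>
    StructField(f.name, f.dataType, f.nullable)
  })
  spark.sqlContext.createDataFrame(df.rdd, schemaWithoutMetadata)
}
// Compare the metadata-free schemas of the source and target DataFrames
stripMetadata(res39).schema == stripMetadata(targetRawData).schema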
Option 1 - StructField.toString
Here is an alternative solution, based on the observation that the string representation of name + DataType + nullable is unique for each column. As can be seen in the StructField toString implementation, that rule is already supported, so we can use it directly to compare columns across different schemas:
import org.apache.spark.sql.types.{StructType, StructField}
val schemaDiff = (s1 :StructType, s2 :StructType) => {
      val s1Keys = s1.map{_.toString}.toSet
      val s2Keys = s2.map{_.toString}.toSet
      val commonKeys =  s1Keys.intersect(s2Keys)
      val diffKeys = s1Keys ++ s2Keys -- commonKeys
      (s1 ++ s2).filter(sf => diffKeys.contains(sf.toString)).toList
}
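A minimal usage sketch against the question's two schemas; an empty list means no differing columns were found:
// Collect the StructFields whose name/type/nullable key appears in only one schema
val differingColumns = schemaDiff(res39.schema, targetRawData.schema)
differingColumns.foreach(println)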
Note that field names are case-sensitive, so different column names mean different columns.
Steps:
- Convert every column of both schemas to its string key, which has the form StructField($name,$dataType,$nullable)
- Find the keys that are common to both schemas
- Return the columns whose keys are not in the common set
Option 2 - case class, eq, ==
Both StructField and StructType are case classes, so we expect their equality methods and the == operator to be based on the values of their members. You can confirm this by applying the change pointed out by @cheseaux, for example:
val s1 = StructType(res39.schema.map(_.copy(metadata = Metadata.empty)))
val s2 = StructType(targetRawData.schema.map(_.copy(metadata = Metadata.empty)))
s1 == s2 // true 
This is expected, because == can be applied between two lists of case classes, and it returns true only when both lists contain the same items. In the previous example the == operator is applied between two StructType objects, and therefore between two Seq[StructField] collections, as we can see from the constructor definition. As already discussed, the comparison fails in your case because the metadata values differ between the schemas.
Note that the == operator is not safe between schemas if we change the order of the columns, because the list implementation of == also takes the order of the items into account. To overcome this we can safely convert the collections to sets with toSet, as shown above.
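A small illustration of that point, using two hypothetical schemas that differ only in column order:
import org.apache.spark.sql.types._
val a = StructType(Seq(StructField("x", IntegerType), StructField("y", StringType)))
val b = StructType(Seq(StructField("y", StringType), StructField("x", IntegerType)))
a == b             // false: the Seq-based comparison respects column order
a.toSet == b.toSet // true: comparing as sets ignores column order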
Finally, we can use the above observations to rewrite the first version as follows:
val schemaDiff = (s1 :StructType, s2 :StructType) => {
      val s1Set = s1.map(_.copy(metadata = Metadata.empty)).toSet
      val s2Set = s2.map(_.copy(metadata = Metadata.empty)).toSet
      val commonItems =  s1Set.intersect(s2Set)
      (s1Set ++ s2Set -- commonItems).toList
}
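Usage mirrors the first version; a quick sketch against the question's schemas:
// Metadata is ignored because every StructField is copied with Metadata.empty
val diffFields = schemaDiff(res39.schema, targetRawData.schema) // empty when the schemas match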
The performance drawback of this second option is that we have to re-create the StructField items by setting metadata = Metadata.empty.