Hoo*_*erd 5 csv validation schema scala apache-spark
我在尝试使用Spark简单读取CSV文件时遇到了问题.经过这样的操作后,我想确保:
这是我使用的代码,但有以下问题:
val schema = Encoders.product[T].schema
val df = spark.read
.schema(schema)
.option("header", "true")
.csv(fileName)
Run Code Online (Sandbox Code Playgroud)
类型T
是类型Product
,即案例类.这可行,但它不检查列名是否正确,所以我可以给另一个文件,只要数据类型是正确的,不会发生错误,我不知道用户提供了错误的文件,但有些巧合正确的数据类型和正确的顺序.
我尝试使用推断模式然后.as[T]
在数据集上使用方法的选项,但是如果除了String
包含null 之外的任何列只有它由Spark解释为String
列,但在我的模式中它是Integer
.因此会发生强制转换异常,但已经检查了列名称.
总结一下:我找到了解决方案,我可以确保正确的数据类型,但没有标题和其他解决方案,我可以验证标题,但有数据类型的问题.有没有办法实现这两者,即标题和类型的完整验证?
我正在使用Spark 2.2.0.
看起来你必须自己通过阅读文件头两次来完成。
查看 Spark 的代码,如果用户提供自己的架构,则推断的标头将被完全忽略(从未实际读取),因此无法使 Spark 因这种不一致而失败。
要自己执行此比较:
val schema = Encoders.product[T].schema
// read the actual schema; This shouldn't be too expensive as Spark's
// laziness would avoid actually reading the entire file
val fileSchema = spark.read
.option("header", "true")
.csv("test.csv").schema
// read the file using your own schema. You can later use this DF
val df = spark.read.schema(schema)
.option("header", "true")
.csv("test.csv")
// compare actual and expected column names:
val badColumnNames = fileSchema.fields.map(_.name)
.zip(schema.fields.map(_.name))
.filter { case (actual, expected) => actual != expected }
// fail if any inconsistency found:
assert(badColumnNames.isEmpty,
s"file schema does not match expected; Bad column names: ${badColumnNames.mkString("; ")}")
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
4940 次 |
最近记录: |