如何最有效地将Scala DataFrame的Row转换为case类?

ari*_*ero 44 scala apache-spark apache-spark-sql

一旦我在Spark中获得了一些Row类,无论是Dataframe还是Catalyst,我想将它转换为我的代码中的case类.这可以通过匹配来完成

someRow match {case Row(a:Long,b:String,c:Double) => myCaseClass(a,b,c)}
Run Code Online (Sandbox Code Playgroud)

但是当行有大量的列,比如十几个双打,一些布尔,甚至偶尔的空位时,它变得丑陋.

我想能够将-all-cast转换为myCaseClass.是否可能,或者我已经获得了最经济的语法?

小智 37

DataFrame只是Dataset [Row]的类型别名.与强类型Scala/Java数据集一起提供的"类型转换"相比,这些操作也称为"无类型转换".

从数据集[Row]到Dataset [Person]的转换在spark中非常简单

val DFtoProcess = SQLContext.sql("SELECT * FROM peoples WHERE name='test'")

此时,Spark将您的数据转换为DataFrame = Dataset [Row],这是一个通用Row对象的集合,因为它不知道确切的类型.

// Create an Encoders for Java class (In my eg. Person is a JAVA class)
// For scala case class you can pass Person without .class reference
val personEncoder = Encoders.bean(Person.class) 

val DStoProcess = DFtoProcess.as[Person](personEncoder)
Run Code Online (Sandbox Code Playgroud)

现在,Spark转换Dataset[Row] -> Dataset[Person]特定于类型的Scala/Java JVM对象,如Person类所示.

有关详细信息,请参阅databricks提供的以下链接

https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

  • 你只有一个答案 - 但无论如何它都是一个好的*!在找到这个答案之前,我找不到关于如何创建自定义Spark编码器的*任何*信息.顺便说一下,`scala`方式是`Encoders.bean [Person]` (5认同)
  • @sasgorilla案例类的正确答案如下 - 你需要`import spark.implicits._`然后你可以使用`df.as [Person]` (3认同)
  • 应该注意的是 `as` 是不安全的,因为它不检查强制转换是否有效。我不明白他们是如何在他们的 API 中添加这样一个丑陋的函数的(它应该被称为 `unsafeAs` - 安全的 `as` 在哪里同时进行过滤或返回 `DataSet[Option[T]]`?) (2认同)
  • 当我想将 `Dataset[Row]` 转换为 `Dataset[Person]` 时,这显然是最干净的解决方案,但是如果我只想转换单个 `Row` 对象怎么办?我只依赖于从 `Row` 中的每个字段手动构建 `Person`。有没有更好的办法? (2认同)

Gle*_*olt 23

据我所知,你不能将一行写入一个案例类,但我有时会选择直接访问行字段,比如

map(row => myCaseClass(row.getLong(0), row.getString(1), row.getDouble(2))
Run Code Online (Sandbox Code Playgroud)

我发现这更容易,特别是如果case类构造函数只需要行中的某些字段.

  • 而你避免匹配java nulls的问题:-) (2认同)

sec*_*ree 15

scala> import spark.implicits._    
scala> val df = Seq((1, "james"), (2, "tony")).toDF("id", "name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> case class Student(id: Int, name: String)
defined class Student

scala> df.as[Student].collectAsList
res6: java.util.List[Student] = [Student(1,james), Student(2,tony)]
Run Code Online (Sandbox Code Playgroud)

这里sparkspark.implicits._是你的SparkSession.如果您在REPL中,会话已经定义为,spark否则您需要相应地调整名称以对应您的SparkSession.

  • 对于Spark 2.1.0,我必须导入spark.implicits._才能完成这项工作 - Scala的优秀解决方案 (3认同)

Gia*_*gna 7

当然,您可以将Row对象匹配到案例类中.假设您的SchemaType有很多字段,并且您希望将它们中的一些匹配到您的案例类中.如果您没有空字段,则可以执行以下操作:

case class MyClass(a: Long, b: String, c: Int, d: String, e: String)

dataframe.map {
  case Row(a: java.math.BigDecimal, 
    b: String, 
    c: Int, 
    _: String,
    _: java.sql.Date, 
    e: java.sql.Date,
    _: java.sql.Timestamp, 
    _: java.sql.Timestamp, 
    _: java.math.BigDecimal, 
    _: String) => MyClass(a = a.longValue(), b = b, c = c, d = d.toString, e = e.toString)
}
Run Code Online (Sandbox Code Playgroud)

在空值的情况下,此方法将失败,并且还要求您明确定义每个单个字段的类型.如果必须处理空值,则需要通过执行操作丢弃包含空值的所有行

dataframe.na.drop()
Run Code Online (Sandbox Code Playgroud)

即使空字段不是您的案例类的模式匹配中使用的空字段,也会丢弃记录.或者,如果要处理它,可以将Row对象转换为List,然后使用选项模式:

case class MyClass(a: Long, b: String, c: Option[Int], d: String, e: String)

dataframe.map(_.toSeq.toList match {
  case List(a: java.math.BigDecimal, 
    b: String, 
    c: Int, 
    _: String,
    _: java.sql.Date, 
    e: java.sql.Date,
    _: java.sql.Timestamp, 
    _: java.sql.Timestamp, 
    _: java.math.BigDecimal, 
    _: String) => MyClass(
      a = a.longValue(), b = b, c = Option(c), d = d.toString, e = e.toString)
}
Run Code Online (Sandbox Code Playgroud)

检查这个github项目Sparkz(),它将很快引入很多库来简化Spark和DataFrame API,并使它们更具功能性.