在Scala中将实木复合地板读取为案例类对象的列表

mat*_*ter 3 scala apache-spark parquet

假设您已经将一些案例类的集合写到了实木复合地板上,然后想在另一个spark作业中读取它,回到同一案例类(也就是说,您已经编写了一些List[MyCaseClass]并且想读回它)。

为了通用起见,假设其中MyCaseClass有嵌套的案例类。

目前,我只能使用以下代码蓝图使此工作正常:

  /** applies the secret sauce for coercing to a case class that is implemented by spark's flatMap */
  private def toCaseClass(spark : SparkSession, inputDF : DataFrame) : Dataset[MyCaseClass] = {
    import spark.implicits._
    inputDF.as[MyCaseClass].flatMap(record => {
      Iterator[MyCaseClass](record)
    })
  }
Run Code Online (Sandbox Code Playgroud)

似乎在Spark 2.x中,flatMap将导致进行转换/强制转换的实验性Spark 代码(当使用调试器查看该代码时,在Spark 代码库中将其注释为实验性)。显然,在Java / Scala中,序列化通常是一个棘手的问题。还有其他安全的方法吗?

除星火之外,我发现在其他地方建议的独立代码解决方案在stackoverflow上不稳定,并且支持不佳。

我正在寻找不需要手动编码的干净,声明性方法,即如何转换每个字段,这些方法依赖于受良好支持的实体库,而这些库不依赖于超慢反射而无法克服优雅。这可能是绝望的混合,但这是一种以其案例类为傲并且将Spark作为其主要成就之一的语言。

相反,也欢迎评论为什么不使用案例类!

rlu*_*uta 5

正如Luis Miguel所评论的那样,大多数Dataset API都标记为实验性的,但已经稳定并且已在生产中使用了数年。

Dataset.as [U]的问题

您是完全正确的,.as[MyCaseClass]在显式实例化case类时,简单使用存在一些细微差别:最重要的一点是,Dataset.as[U]不能保证您的数据集仅包含由类型U定义的列,它可能会保留其他数据,这些数据以后可能会中断计算。

这是一个例子:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

case class MyData(value: Int)

val df: DataFrame = spark.createDataset(Seq(1,1,2,3)).withColumn("hidden",rand)

val ds: Dataset[MyData] = df.as[MyData]

ds.distinct.count
res3: Long = 4
Run Code Online (Sandbox Code Playgroud)

即使未在MyData类型中定义数据集,该数据集也会ds保留该hidden列值,并且可能会产生意外结果:将数据集ds视为上述集合的MyData人肯定会希望不重复计数为3而不是4。

如何安全地转换为数据集[MyData]?

如果您明确希望只将案例类列保留在数据集中,则有一个非常简单的解决方案(但性能欠佳):将其提取为RDD并将其重新转换为Dataset [U]。

val ds = df.as[MyData].rdd.toDS()

ds.distinct.count
res5: Long = 3
Run Code Online (Sandbox Code Playgroud)

它基本上可以flatMap用相同的成本来完成您要做什么:Spark需要从其内部行格式反序列化数据,以创建case类实例并将其重新序列化为内部行。它会产生不必要的垃圾,增加内存压力,并可能破坏WholeStage代码生成的优化。

我认为,更好的方法是在将数据集强制转换为指定案例类时,从源数据帧中选择必要的列。这将防止大多数的不良副作用,as[U]但不会产生反序列化/序列化成本。

一种完美的方法是利用Scala功能扩展具有隐式类的现有类和实例的行为:

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql._

object SparkExtensions {
  implicit class ExtendedDataFrame(df: DataFrame) {
    def to[T <: Product: TypeTag]: Dataset[T] = {
      import df.sparkSession.implicits._
      import org.apache.spark.sql.functions.col
      df.select(Encoders.product[T].schema.map(f => col(f.name)): _*).as[T]
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

有了以上对象,我现在可以修改我的初始代码:

import SparkExtensions._

val ds: Dataset[MyData] = df.to[MyData]

ds.distinct.count
res11: Long = 3

ds.printSchema
root
 |-- value: integer (nullable = false)
Run Code Online (Sandbox Code Playgroud)