mat*_*ter 3 scala apache-spark parquet
假设您已经将一些案例类的集合写到了实木复合地板上,然后想在另一个spark作业中读取它,回到同一案例类(也就是说,您已经编写了一些List[MyCaseClass]并且想读回它)。
为了通用起见,假设其中MyCaseClass有嵌套的案例类。
目前,我只能使用以下代码蓝图使此工作正常:
/** applies the secret sauce for coercing to a case class that is implemented by spark's flatMap */
private def toCaseClass(spark : SparkSession, inputDF : DataFrame) : Dataset[MyCaseClass] = {
import spark.implicits._
inputDF.as[MyCaseClass].flatMap(record => {
Iterator[MyCaseClass](record)
})
}
Run Code Online (Sandbox Code Playgroud)
似乎在Spark 2.x中,flatMap将导致进行转换/强制转换的实验性Spark 代码(当使用调试器查看该代码时,在Spark 代码库中将其注释为实验性)。显然,在Java / Scala中,序列化通常是一个棘手的问题。还有其他安全的方法吗?
除星火之外,我发现在其他地方建议的独立代码解决方案在stackoverflow上不稳定,并且支持不佳。
我正在寻找不需要手动编码的干净,声明性方法,即如何转换每个字段,这些方法依赖于受良好支持的实体库,而这些库不依赖于超慢反射而无法克服优雅。这可能是绝望的混合,但这是一种以其案例类为傲并且将Spark作为其主要成就之一的语言。
相反,也欢迎评论为什么不使用案例类!
正如Luis Miguel所评论的那样,大多数Dataset API都标记为实验性的,但已经稳定并且已在生产中使用了数年。
Dataset.as [U]的问题
您是完全正确的,.as[MyCaseClass]在显式实例化case类时,简单使用存在一些细微差别:最重要的一点是,Dataset.as[U]不能保证您的数据集仅包含由类型U定义的列,它可能会保留其他数据,这些数据以后可能会中断计算。
这是一个例子:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
case class MyData(value: Int)
val df: DataFrame = spark.createDataset(Seq(1,1,2,3)).withColumn("hidden",rand)
val ds: Dataset[MyData] = df.as[MyData]
ds.distinct.count
res3: Long = 4
Run Code Online (Sandbox Code Playgroud)
即使未在MyData类型中定义数据集,该数据集也会ds保留该hidden列值,并且可能会产生意外结果:将数据集ds视为上述集合的MyData人肯定会希望不重复计数为3而不是4。
如何安全地转换为数据集[MyData]?
如果您明确希望只将案例类列保留在数据集中,则有一个非常简单的解决方案(但性能欠佳):将其提取为RDD并将其重新转换为Dataset [U]。
val ds = df.as[MyData].rdd.toDS()
ds.distinct.count
res5: Long = 3
Run Code Online (Sandbox Code Playgroud)
它基本上可以flatMap用相同的成本来完成您要做什么:Spark需要从其内部行格式反序列化数据,以创建case类实例并将其重新序列化为内部行。它会产生不必要的垃圾,增加内存压力,并可能破坏WholeStage代码生成的优化。
我认为,更好的方法是在将数据集强制转换为指定案例类时,从源数据帧中选择必要的列。这将防止大多数的不良副作用,as[U]但不会产生反序列化/序列化成本。
一种完美的方法是利用Scala功能扩展具有隐式类的现有类和实例的行为:
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql._
object SparkExtensions {
implicit class ExtendedDataFrame(df: DataFrame) {
def to[T <: Product: TypeTag]: Dataset[T] = {
import df.sparkSession.implicits._
import org.apache.spark.sql.functions.col
df.select(Encoders.product[T].schema.map(f => col(f.name)): _*).as[T]
}
}
}
Run Code Online (Sandbox Code Playgroud)
有了以上对象,我现在可以修改我的初始代码:
import SparkExtensions._
val ds: Dataset[MyData] = df.to[MyData]
ds.distinct.count
res11: Long = 3
ds.printSchema
root
|-- value: integer (nullable = false)
Run Code Online (Sandbox Code Playgroud)