Spark / Scala,数据集和案例类的多态性

Unc*_*air 2 polymorphism design-patterns scala apache-spark

我们将Spark 2.x与Scala一起用于具有13种不同ETL操作的系统。它们中的7个相对简单,每个都由单个域类驱动,并且主要区别在于此类以及如何处理负载的一些细微差别。

加载类的简化版本如下所示,在本示例中,要加载7个比萨饼配料,这里是Pepperoni:

object LoadPepperoni {
  def apply(inputFile: Dataset[Row],
            historicalData: Dataset[Pepperoni],
            mergeFun: (Pepperoni, PepperoniRaw) => Pepperoni): Dataset[Pepperoni] = {
    val sparkSession = SparkSession.builder().getOrCreate()
    import sparkSession.implicits._

    val rawData: Dataset[PepperoniRaw] = inputFile.rdd.map{ case row : Row =>
      PepperoniRaw(
          weight = row.getAs[String]("weight"),
          cost = row.getAs[String]("cost")
        )
    }.toDS()

    val validatedData: Dataset[PepperoniRaw] = ??? // validate the data

    val dedupedRawData: Dataset[PepperoniRaw] = ??? // deduplicate the data

    val dedupedData: Dataset[Pepperoni] = dedupedRawData.rdd.map{ case datum : PepperoniRaw =>
        Pepperoni( value = ???, key1 = ???, key2 = ??? )
    }.toDS()

    val joinedData = dedupedData.joinWith(historicalData,
      historicalData.col("key1") === dedupedData.col("key1") && 
        historicalData.col("key2") === dedupedData.col("key2"),
      "right_outer"
    )

    joinedData.map { case (hist, delta) =>
      if( /* some condition */) {
        hist.copy(value = /* some transformation */)
      }
    }.flatMap(list => list).toDS()
  }
}
Run Code Online (Sandbox Code Playgroud)

换句话说,该类对数据执行了一系列操作,这些操作大部分相同,并且总是按相同的顺序进行,但是每个顶部的操作可能会略有不同,就像从“原始”到“域”的映射以及合并功能一样。

要对7种浇头(例如,蘑菇,奶酪等)执行此操作,我宁愿不要简单地复制/粘贴类并更改所有名称,因为结构和逻辑对于所有负载都是公用的。相反,我宁愿使用通用类型定义通用“ Load”类,如下所示:

object Load {
  def apply[R,D](inputFile: Dataset[Row],
            historicalData: Dataset[D],
            mergeFun: (D, R) => D): Dataset[D] = {
    val sparkSession = SparkSession.builder().getOrCreate()
    import sparkSession.implicits._

    val rawData: Dataset[R] = inputFile.rdd.map{ case row : Row =>
...
Run Code Online (Sandbox Code Playgroud)

对于每个特定于类的操作(例如,从“原始”到“域”的映射或合并),都有一个实现特定特性的特征或抽象类。这将是典型的依赖注入/多态性模式。

但是我遇到了一些问题。从Spark 2.x开始,仅为本机类型和案例类提供编码器,并且无法将类通用地标识为案例类。因此,使用泛型类型时,推断的toDS()和其他隐式功能不可用。

同样如我的相关问题中所述copy,使用泛型时,case类方法也不可用。

我已经研究了Scala和Haskell常见的其他设计模式,例如类型类或即席多态性,但障碍是Spark Dataset基本上只适用于案例类,而不能抽象定义。

看来这将是Spark系统中的常见问题,但我找不到解决方案。任何帮助表示赞赏。

Joe*_*e K 5

启用的隐式转换.toDS为:

implicit def rddToDatasetHolder[T](rdd: RDD[T])(implicit arg0: Encoder[T]): DatasetHolder[T]
Run Code Online (Sandbox Code Playgroud)

(来自https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLImplicits

您完全正确,因为Encoder[T]您已经使Apply方法成为泛型,因此在范围内没有隐式值,因此这种转换不会发生。但是您可以简单地接受一个作为隐式参数!

object Load {
  def apply[R,D](inputFile: Dataset[Row],
            historicalData: Dataset[D],
            mergeFun: (D, R) => D)(implicit enc: Encoder[D]): Dataset[D] = {
...
Run Code Online (Sandbox Code Playgroud)

然后,在调用具有特定类型的负载时,它应该能够找到该类型的编码器。请注意,您也必须import sparkSession.implicits._在调用上下文中。

编辑:类似的方法是newProductEncoder[T <: Product](implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[T]): Encoder[T]通过限制类型(apply[R, D <: Product])并接受隐式JavaUniverse.TypeTag[D]作为参数来使隐式工作。