Unc*_*air 2 polymorphism design-patterns scala apache-spark
我们将Spark 2.x与Scala一起用于具有13种不同ETL操作的系统。它们中的7个相对简单,每个都由单个域类驱动,并且主要区别在于此类以及如何处理负载的一些细微差别。
加载类的简化版本如下所示,在本示例中,要加载7个比萨饼配料,这里是Pepperoni:
object LoadPepperoni {
def apply(inputFile: Dataset[Row],
historicalData: Dataset[Pepperoni],
mergeFun: (Pepperoni, PepperoniRaw) => Pepperoni): Dataset[Pepperoni] = {
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._
val rawData: Dataset[PepperoniRaw] = inputFile.rdd.map{ case row : Row =>
PepperoniRaw(
weight = row.getAs[String]("weight"),
cost = row.getAs[String]("cost")
)
}.toDS()
val validatedData: Dataset[PepperoniRaw] = ??? // validate the data
val dedupedRawData: Dataset[PepperoniRaw] = ??? // deduplicate the data
val dedupedData: Dataset[Pepperoni] = dedupedRawData.rdd.map{ case datum : PepperoniRaw =>
Pepperoni( value = ???, key1 = ???, key2 = ??? )
}.toDS()
val joinedData = dedupedData.joinWith(historicalData,
historicalData.col("key1") === dedupedData.col("key1") &&
historicalData.col("key2") === dedupedData.col("key2"),
"right_outer"
)
joinedData.map { case (hist, delta) =>
if( /* some condition */) {
hist.copy(value = /* some transformation */)
}
}.flatMap(list => list).toDS()
}
}
Run Code Online (Sandbox Code Playgroud)
换句话说,该类对数据执行了一系列操作,这些操作大部分相同,并且总是按相同的顺序进行,但是每个顶部的操作可能会略有不同,就像从“原始”到“域”的映射以及合并功能一样。
要对7种浇头(例如,蘑菇,奶酪等)执行此操作,我宁愿不要简单地复制/粘贴类并更改所有名称,因为结构和逻辑对于所有负载都是公用的。相反,我宁愿使用通用类型定义通用“ Load”类,如下所示:
object Load {
def apply[R,D](inputFile: Dataset[Row],
historicalData: Dataset[D],
mergeFun: (D, R) => D): Dataset[D] = {
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._
val rawData: Dataset[R] = inputFile.rdd.map{ case row : Row =>
...
Run Code Online (Sandbox Code Playgroud)
对于每个特定于类的操作(例如,从“原始”到“域”的映射或合并),都有一个实现特定特性的特征或抽象类。这将是典型的依赖注入/多态性模式。
但是我遇到了一些问题。从Spark 2.x开始,仅为本机类型和案例类提供编码器,并且无法将类通用地标识为案例类。因此,使用泛型类型时,推断的toDS()和其他隐式功能不可用。
同样如我的相关问题中所述copy,使用泛型时,case类方法也不可用。
我已经研究了Scala和Haskell常见的其他设计模式,例如类型类或即席多态性,但障碍是Spark Dataset基本上只适用于案例类,而不能抽象定义。
看来这将是Spark系统中的常见问题,但我找不到解决方案。任何帮助表示赞赏。
启用的隐式转换.toDS为:
implicit def rddToDatasetHolder[T](rdd: RDD[T])(implicit arg0: Encoder[T]): DatasetHolder[T]
Run Code Online (Sandbox Code Playgroud)
(来自https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLImplicits)
您完全正确,因为Encoder[T]您已经使Apply方法成为泛型,因此在范围内没有隐式值,因此这种转换不会发生。但是您可以简单地接受一个作为隐式参数!
object Load {
def apply[R,D](inputFile: Dataset[Row],
historicalData: Dataset[D],
mergeFun: (D, R) => D)(implicit enc: Encoder[D]): Dataset[D] = {
...
Run Code Online (Sandbox Code Playgroud)
然后,在调用具有特定类型的负载时,它应该能够找到该类型的编码器。请注意,您也必须import sparkSession.implicits._在调用上下文中。
编辑:类似的方法是newProductEncoder[T <: Product](implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[T]): Encoder[T]通过限制类型(apply[R, D <: Product])并接受隐式JavaUniverse.TypeTag[D]作为参数来使隐式工作。