No Java class corresponding to Base with Product with Serializable found

Mil*_*avi 5 java scala apache-spark rdd apache-spark-dataset

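I have written two case classes that extend an abstract class Base. I have one list for each class (listA and listB). When I merge these two lists, I cannot convert the resulting list to a Dataset in Apache Spark 1.6.1.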

abstract class Base

case class A(name: String) extends Base
case class B(age: Int) extends Base

val listA: List[A] = A("foo")::A("bar")::Nil
val listB: List[B] = B(10)::B(20)::Nil
val list: List[Base with Product with Serializable] = listA ++ listB

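// toDS() returns a Dataset (org.apache.spark.sql.Dataset), not an RDD; this is the call that throws
val result: Dataset[Base with Product with Serializable] = sc.parallelize(list).toDS()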

Apache Spark throws this exception:

A needed class was not found. This could be due to an error in your runpath. Missing class: no Java class corresponding to Base with Product with Serializable found
java.lang.NoClassDefFoundError: no Java class corresponding to Base with Product with Serializable found
    at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1299)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)

No exception is thrown when I create an RDD from list, but as soon as I convert that RDD to a Dataset with the toDS() method, the exception above is thrown.

Ale*_*nov 4

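First, you can give list a more sensible type, either by explicitly declaring it as List[Base] or by adding Base extends Product with Serializable (if Base is only meant to be extended by case classes/objects). But this isn't enough, because

Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java Beans.

Note that abstract classes like Base are not supported, and custom encoders aren't supported either. You can, however, try the kryo (or javaSerialization, as a last resort) encoder; see How to store custom objects in Dataset?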
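The first point (giving list a more sensible type) can be checked without Spark. The sketch below is only an illustration: the wrapper objects ListTypeSketch, Annotated and Extended and the helper requireBase are hypothetical names introduced here, not part of the question or of Spark. It shows both options: annotating the merged list as List[Base], and letting Base extend Product with Serializable so that the compiler infers List[Base] on its own.

```scala
object ListTypeSketch {

  // Option 1: keep Base as in the question and annotate the merged list.
  object Annotated {
    abstract class Base
    case class A(name: String) extends Base
    case class B(age: Int) extends Base

    val listA: List[A] = A("foo") :: A("bar") :: Nil
    val listB: List[B] = B(10) :: B(20) :: Nil

    // The annotation widens the element type to plain Base.
    val list: List[Base] = listA ++ listB
  }

  // Option 2: let Base itself extend Product with Serializable.
  object Extended {
    abstract class Base extends Product with Serializable
    case class A(name: String) extends Base
    case class B(age: Int) extends Base

    val listA: List[A] = A("foo") :: A("bar") :: Nil
    val listB: List[B] = B(10) :: B(20) :: Nil

    // No annotation here: the compiler infers the element type.
    val list = listA ++ listB

    // Hypothetical helper: compiles only when the element type is exactly Base.
    def requireBase[T](xs: List[T])(implicit ev: T =:= Base): Unit = ()
    requireBase(list)
  }

  def main(args: Array[String]): Unit = {
    println(Annotated.list)
    println(Extended.list)
  }
}
```

Either option only tidies up the static type. Neither one gives Spark an encoder for Base, which is why the kryo encoder is still needed.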

Here is a complete working example:

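import org.apache.spark.sql.{Encoder, Encoders}

abstract class Base extends Serializable with Product

case class A(name: String) extends Base

case class B(age: Int) extends Base

object BaseEncoder {
  // Encoder and Encoders live in org.apache.spark.sql, not org.apache.spark
  implicit def baseEncoder: Encoder[Base] = Encoders.kryo[Base]
}

// Bring the encoder into implicit scope. toDS also needs sqlContext.implicits._,
// which spark-shell imports automatically.
import BaseEncoder._

val listA: Seq[A] = Seq(A("a"), A("b"))
val listB: Seq[B] = Seq(B(1), B(2))
val list: Seq[Base] = listA ++ listB

val ds = sc.parallelize(list).toDS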
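
As a follow-up, here is a rough sketch of how such a kryo-encoded Dataset might be used outside the shell. It assumes Spark 1.6 on the classpath and a local master. The object name KryoDatasetSketch and the helper namesOfA are hypothetical, introduced only for this illustration. The kryo encoder stores each object as a single binary field, so the sketch sticks to typed operations (filter with a lambda, collect) rather than column expressions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SQLContext}

abstract class Base extends Product with Serializable
case class A(name: String) extends Base
case class B(age: Int) extends Base

object KryoDatasetSketch {
  // Kryo-backed encoder for Base; Encoders.javaSerialization[Base] is the
  // last-resort alternative mentioned in the answer.
  implicit val baseEncoder: Encoder[Base] = Encoders.kryo[Base]

  // Hypothetical helper: builds the mixed Dataset and returns the names of the A elements.
  def namesOfA(sc: SparkContext): Seq[String] = {
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // provides toDS() on RDDs

    val list: Seq[Base] = Seq(A("a"), A("b"), B(1), B(2))
    val ds: Dataset[Base] = sc.parallelize(list).toDS()

    // Typed operations run on the deserialized Base objects.
    ds.filter(_.isInstanceOf[A]).collect().toSeq.collect { case A(name) => name }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("kryo-sketch")
    val sc = new SparkContext(conf)
    try println(namesOfA(sc).sorted.mkString(", "))
    finally sc.stop()
  }
}
```

The trade-off is that Spark cannot see inside the serialized objects, so fields such as name or age are not available as columns.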