Vin*_*Bdn 5 reflection serialization scala apache-spark
首先,我使用的是scala 2.10.4,上面的例子是在Spark 1.6中运行的(尽管我怀疑Spark与此有什么关系,但它只是一个序列化问题).
所以这是我的问题:假设我有一个Base由两个类B1和两个类实现的特征B2.现在我有一个通用的特性,它由一组类扩展,其中一个超过了Base例如子类型(这里我保留了Spark的RDD概念,但它实际上只要它被序列化就可以了;有些东西只是一个结果无论实际上是什么):
trait Foo[T] { def function(rdd: RDD[T]): Something }
class Foo1[B <: Base] extends Foo[B] { def function(rdd: RDD[B]): Something = ... }
class Foo2 extends Foo[A] { def function(rdd: RDD[A]): Something = ... }
...
Run Code Online (Sandbox Code Playgroud)
现在我需要一个对象RDD[T](这里假设没有模糊,它只是一个简化版本),它返回Something对应于与类型对应的函数的结果T.但它也适用于Array[T]合并策略.到目前为止它看起来像:
object Obj {
def compute[T: TypeTag](input: RDD[T]): Something = {
typeOf[T] match {
case t if t <:< typeOf[A] =>
val foo = new Foo[T]
foo.function(rdd)
case t if t <:< typeOf[Array[A]] =>
val foo = new Foo[A]
foo.function(rdd.map(x => mergeArray(x.asInstance[Array[A]])))
case t if t <:< typeOf[Base] =>
val foo = new Foo[T]
foo.function(rdd)
// here it gets ugly...
case t if t <:< typeOf[Array[_]] => // doesn't fall through with Array[Base]... why?
val tt = getSubInfo[T](0)
val tpe = tt.tpe
val foo = new Foo[tpe.type]
foo.function(rdd.map(x => (x._1, mergeArray(x._2.asInstanceOf[Array[tpe.type]]))
}
}
// strategy to transform arrays of T into a T object when possible
private def mergeArray[T: TypeTag](a: Array[T]): T = ...
// extract the subtype, e.g. if Array[Int] then at position 0 extracts a type tag for Int, I can provide the code but not fondamental for the comprehension of the problem though
private def getSubInfo[T: TypeTag](i: Int): TypeTag[_] = ...
}
Run Code Online (Sandbox Code Playgroud)
不幸的是,它似乎在本地机器上工作正常,但当它被发送到Spark(序列化)时,我得到一个org.apache.spark.SparkException: Task not serializable:
Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
- object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package types)
- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
Run Code Online (Sandbox Code Playgroud)
我确实有一个解决方法(很明显,列举了可能性),但是出于我的好奇心,有没有办法解决这个问题?为什么Symbol不能序列化,而它们在Manifests中的等价物呢?
谢谢您的帮助.
TypeTags 现在通常可以在 scala 中序列化,但奇怪的是,不能直接类型化(这很奇怪,因为 typetag 包含不是 :-/ 的符号)。
这可能会做你想做的
// implicit constructor TypeTag parameter is serialized.
abstract class TypeAware[T:TypeTag] extends Serializable {
def typ:Type = _typeCached
@transient
lazy val _typeCached:Type = typeOf[T]
}
trait Foo[T] extends Serializable {
def function(rdd: RDD[T]): Something {... impl here?...}
def typ:Type
}
class Concrete[T:TypeTag] extends TypeAware[T] with Foo[T] with Serializable{
def function(rdd: RDD[T]): Something {... impl here?...}
}
Run Code Online (Sandbox Code Playgroud)