Nil*_*esh 19 java serialization scala apache-spark
看看这个问题:Scala + Spark - 任务不可序列化:java.io.NotSerializableExceptionon.当只在类而不是对象上调用闭包外的函数时.
问题:
假设我的映射器可以是函数(def),它在内部调用其他类并创建对象并在其中执行不同的操作.(或者它们甚至可以是扩展(Foo)=> Bar的类并在其apply方法中进行处理 - 但是现在让我们忽略这种情况)
Spark仅支持Java Serialization for closures.有没有办法解决这个问题?我们可以用东西而不是封闭来做我想做的事吗?我们可以使用Hadoop轻松完成这类工作.这一点让Spark几乎无法使用.人们不能指望所有第三方库都将所有类扩展为Serializable!
可能的解决方案:
这样的事情似乎有用吗:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
它当然似乎是一个包装器的答案,但我不知道究竟是怎么回事.
Nil*_*esh 12
我想出了自己如何做到这一点!
您只需要在通过闭包之前序列化对象,然后进行反序列化.即使您的类不是Serializable,这种方法也可以正常工作,因为它在幕后使用Kryo.你需要的只是一些咖喱.;)
这是我如何做到的一个例子:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
(foo: Foo) : Bar = {
kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()
object Blah(abc: ABC) extends (Foo => Bar) {
def apply(foo: Foo) : Bar = { //This is the real function }
}
Run Code Online (Sandbox Code Playgroud)
随意使Blah像你想要的那样复杂,类,伴随对象,嵌套类,对多个第三方库的引用.
KryoSerializationWrapper参考:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala