Spark - 任务不可序列化:如何使用调用外部类/对象的复杂映射闭包?

Nil*_*esh 19 java serialization scala apache-spark

看看这个问题:Scala + Spark - 任务不可序列化:java.io.NotSerializableExceptionon.当只在类而不是对象上调用闭包外的函数时.

问题:

假设我的映射器可以是函数(def),它在内部调用其他类并创建对象并在其中执行不同的操作.(或者它们甚至可以是扩展(Foo)=> Bar的类并在其apply方法中进行处理 - 但是现在让我们忽略这种情况)

Spark仅支持Java Serialization for closures.有没有办法解决这个问题?我们可以用东西而不是封闭来做我想做的事吗?我们可以使用Hadoop轻松完成这类工作.这一点让Spark几乎无法使用.人们不能指望所有第三方库都将所有类扩展为Serializable!

可能的解决方案:

这样的事情似乎有用吗:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

它当然似乎是一个包装器的答案,但我不知道究竟是怎么回事.

Nil*_*esh 12

我想出了自己如何做到这一点!

您只需要在通过闭包之前序列化对象,然后进行反序列化.即使您的类不是Serializable,这种方法也可以正常工作,因为它在幕后使用Kryo.你需要的只是一些咖喱.;)

这是我如何做到的一个例子:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}
Run Code Online (Sandbox Code Playgroud)

随意使Blah像你想要的那样复杂,类,伴随对象,嵌套类,对多个第三方库的引用.

KryoSerializationWrapper参考:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala