地图无法在scala中序列化?

Car*_*ter 27 serialization scala apache-spark

我是Scala的新手.为什么"map"函数不可序列化?如何使其可序列化?例如,如果我的代码如下所示:

val data = sc.parallelize(List(1,4,3,5,2,3,5))

def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
  val lst = List(("a", 1),("b", 2),("c",3), ("a",2))
  var res = List[Int]()
  while (iter.hasNext) {
    val cur = iter.next
    val a = lst.groupBy(x => x._1).mapValues(_.size)
    //val b= a.map(x => x._2)
    res = res ::: List(cur)
  }
  res.iterator
}

data.mapPartitions(myfunc).collect
Run Code Online (Sandbox Code Playgroud)

如果我取消注释该行

val b= a.map(x => x._2)
Run Code Online (Sandbox Code Playgroud)

代码返回一个异常:

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: scala.collection.immutable.MapLike$$anon$2
Serialization stack:
    - object not serializable (class: scala.collection.immutable.MapLike$$anon$2, value: Map(1 -> 3))
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: a, type: interface scala.collection.immutable.Map)
Run Code Online (Sandbox Code Playgroud)

非常感谢你.

Eug*_*nev 58

它是众所周知的scala bug:https://issues.scala-lang.org/browse/SI-7005 Map#mapValues不可序列化

我们在Spark应用程序中map(identity)遇到此问题,解决了这个问题

rdd.groupBy(_.segment).mapValues(v => ...).map(identity)
Run Code Online (Sandbox Code Playgroud)

  • 这对我有用.谢谢!我永远不会想到这一点.为什么这样做? (2认同)

Ksh*_*tha 5

下面提供了该函数的实际实现mapValues,正如您所看到的,它不可序列化,并且仅创建一个视图,而不是正确存在的数据,因此您会收到此错误。就具体情况而言,mapValues可以有很多优势。

protected class MappedValues[C](f: B => C) extends AbstractMap[A, C] with DefaultMap[A, C] {
    override def foreach[D](g: ((A, C)) => D): Unit = for ((k, v) <- self) g((k, f(v)))
    def iterator = for ((k, v) <- self.iterator) yield (k, f(v))
    override def size = self.size
    override def contains(key: A) = self.contains(key)
    def get(key: A) = self.get(key).map(f)
}
Run Code Online (Sandbox Code Playgroud)