ScalaTest and Spark give "java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper"

Tus*_*ake 9 serialization scala scalatest apache-spark rdd

I am testing a Spark Streaming application with the help of "com.holdenkarau.spark-testing-base" and ScalaTest.

import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.apache.spark.rdd.RDD
import org.scalatest.{ BeforeAndAfter, FunSuite }

class Test extends FunSuite with BeforeAndAfter with StreamingSuiteBase {

  var delim: String = ","

  before {
    System.clearProperty("spark.driver.port")
  }

  test("This Fails") {

    val source = scala.io.Source.fromURL(getClass.getResource("/some_logs.csv"))
    val input = source.getLines.toList

    val rowRDDOut = Calculator.`do`(sc.parallelize(input))   // Returns DataFrame; `do` is a reserved word, hence the backticks

    val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + delim + row.getAs[String](1)))

    source.close
  }
}

I get a serialization exception for the field 'delim':

org.apache.spark.SparkException: Task not serializable
[info]   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
[info]   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
[info]   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
[info]   at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
[info]   at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
[info]   at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
[info]   at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
[info]   at org.apache.spark.rdd.RDD.map(RDD.scala:323)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
[info] Serialization stack:
[info]  - object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: org.scalatest.Assertions$AssertionsHelper@78b339fa)
[info]  - field (class: org.scalatest.FunSuite, name: assertionsHelper, type: class org.scalatest.Assertions$AssertionsHelper)

If I replace 'delim' with a String literal, it works fine.

val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + "," + row.getAs[String](1)))

What is the difference between the first and the second version?

Thanks in advance!

mar*_*ios 14

The problem is not the type of delim (String); it is delim itself, or more precisely where it is defined.

Try not to define variables outside the test() method. If you define delim inside test, it should work.

test("This Fails") {
   val delim = ","
   ...
}

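Now, you might ask why? Well, delim is a field of class Test, so when the closure refers to it from the outer scope it is really reading this.delim, and Scala has to serialize the enclosing Test object along with the closure. That object holds a reference to org.scalatest.Assertions$AssertionsHelper, which is not serializable (see the stack trace).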
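
To see the capture in isolation, here is a minimal sketch that uses plain Java serialization instead of Spark. All names in it (ClosureCaptureDemo, Suite, Helper, capturingField, capturingLocal, isSerializable) are made up for illustration: Helper stands in for AssertionsHelper and Suite for the test class. It assumes Scala 2.12 or later, where function literals compile to serializable lambdas.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {

  // Hypothetical stand-in for org.scalatest.Assertions$AssertionsHelper:
  // a class that does not implement Serializable.
  class Helper

  // Hypothetical stand-in for the test suite: Serializable itself,
  // but holding a field whose type is not.
  class Suite extends Serializable {
    val helper = new Helper
    var delim: String = ","

    // The lambda reads the field, i.e. this.delim, so it captures `this`.
    def capturingField: String => String = s => s + delim

    // The field is copied into a local val first, so the lambda
    // captures only that String and not the enclosing Suite.
    def capturingLocal: String => String = {
      val localDelim = delim
      s => s + localDelim
    }
  }

  // Returns true if Java serialization accepts the object,
  // false if it hits something that is not serializable.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val suite = new Suite
    println(s"field capture serializable: ${isSerializable(suite.capturingField)}")
    println(s"local capture serializable: ${isSerializable(suite.capturingLocal)}")
  }
}
```

Under those assumptions, the first closure should fail to serialize because it drags the whole Suite (and its Helper) along, while the second should succeed. The same idea applies to the test in the question: if delim has to stay a field, copying it into a local val inside test(...) before calling map should keep the Test instance out of the closure.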