Spark上使用json4s的NotSerializableException

λ A*_*r λ 9 json scala hdfs json4s apache-spark

基本上,我必须使用Spark分析HDFS上的一些复杂的JSON.

我使用"for comprehensions"来(预)过滤json的jSON和"extract"方法,将它包装成一个case类

这个工作正常!

def foo(rdd: RDD[String]) = {

case class View(C: String,b: Option[Array[List[String]]],  t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats

rdd.map { jsonString =>
  val jsonObj = parse(jsonString)
  val listsOfView = for {
    JObject(value) <- jsonObj
    JField(("v"), JObject(views)) <- value
    normalized <- views.map(x => (x._2))
  } yield normalized
}
Run Code Online (Sandbox Code Playgroud)

到现在为止还挺好!

当我尝试将(预)过滤的JSON提取到我的CaseClass时,我得到这个:

线程"main"中的异常org.apache.spark.SparkException:由于阶段失败而中止作业:任务不可序列化:java.io.NotSerializableException:org.json4s.DefaultFormats $

这里带有提取的代码:

def foo(rdd: RDD[String]) = {

case class View(C: String,b: Option[Array[List[String]]],  t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats

rdd.map { jsonString =>
  val jsonObj = parse(jsonString)
  val listsOfView = for {
    JObject(value) <- jsonObj
    JField(("v"), JObject(views)) <- value
    normalized <- views.map(x => (x._2))
  } yield normalized.extract[View]
}
Run Code Online (Sandbox Code Playgroud)

我已经在scala上尝试了我的代码,以及它的工作!我真的很擅长使用hdfs和spark,所以我会很感激.

maa*_*asg 6

Spark序列化RDD转换上的闭包,并将其"发送"给工作者以进行分布式执行.这要求闭包内的所有代码(通常也包含在包含对象中)应该是可序列化的.

看看org.json4s.DefaultFormat $(该特征的伴随对象)的impl :

object DefaultFormats extends DefaultFormats {
    val losslessDate = new ThreadLocal(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
    val UTC = TimeZone.getTimeZone("UTC")

}
Run Code Online (Sandbox Code Playgroud)

很明显,这个对象不是可序列化的,不能这样做.(ThreadLocal本身就是不可序列化的)

您似乎没有Date在代码上使用类型,那么您可以implicit val formats = DefaultFormats通过可序列化的东西摆脱 或替换DefaultFormats吗?