λ A*_*r λ 9 json scala hdfs json4s apache-spark
基本上,我必须使用Spark分析HDFS上的一些复杂的JSON.
我使用"for comprehensions"来(预)过滤json的jSON和"extract"方法,将它包装成一个case类
这个工作正常!
def foo(rdd: RDD[String]) = {
case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats
rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized
}
Run Code Online (Sandbox Code Playgroud)
到现在为止还挺好!
当我尝试将(预)过滤的JSON提取到我的CaseClass时,我得到这个:
线程"main"中的异常org.apache.spark.SparkException:由于阶段失败而中止作业:任务不可序列化:java.io.NotSerializableException:org.json4s.DefaultFormats $
这里带有提取的代码:
def foo(rdd: RDD[String]) = {
case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats
rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized.extract[View]
}
Run Code Online (Sandbox Code Playgroud)
我已经在scala上尝试了我的代码,以及它的工作!我真的很擅长使用hdfs和spark,所以我会很感激.
Spark序列化RDD转换上的闭包,并将其"发送"给工作者以进行分布式执行.这要求闭包内的所有代码(通常也包含在包含对象中)应该是可序列化的.
看看org.json4s.DefaultFormat $(该特征的伴随对象)的impl :
object DefaultFormats extends DefaultFormats {
val losslessDate = new ThreadLocal(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
val UTC = TimeZone.getTimeZone("UTC")
}
Run Code Online (Sandbox Code Playgroud)
很明显,这个对象不是可序列化的,不能这样做.(ThreadLocal本身就是不可序列化的)
您似乎没有Date
在代码上使用类型,那么您可以implicit val formats = DefaultFormats
通过可序列化的东西摆脱
或替换DefaultFormats吗?