kin*_*ion 7 scala apache-spark
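How do I read the serialization stack?
I'm building a distributed NLP application on Spark. I regularly run into these NotSerializableExceptions and always fumble my way through fixing them. However, I've never found good documentation on what everything in the serialization stack means.
How do I read the serialization stack that accompanies a NotSerializableException in Scala? How do I pinpoint the class or object that causes the error? What do the "field", "object", "writeObject" and "writeReplace" entries in the stack mean?
Here is an example: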
Caused by: java.io.NotSerializableException: MyPackage.testing.PreprocessTest$$typecreator1$1
Serialization stack:
- object not serializable (class: MyPackage.testing.PreprocessTest$$typecreator1$1, value: MyPackage.testing.PreprocessTest$$typecreator1$1@27f6854b)
- writeObject data (class: scala.reflect.api.SerializedTypeTag)
- object (class scala.reflect.api.SerializedTypeTag, scala.reflect.api.SerializedTypeTag@4a571516)
- writeReplace data (class: scala.reflect.api.SerializedTypeTag)
- object (class scala.reflect.api.TypeTags$TypeTagImpl, TypeTag[String])
- field (class: MyPackage.package$$anonfun$deserializeMaps$1, name: evidence$1$1, type: interface scala.reflect.api.TypeTags$TypeTag)
- object (class MyPackage.package$$anonfun$deserializeMaps$1, <function1>)
- field (class: MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4, name: $outer, type: class MyPackage.package$$anonfun$deserializeMaps$1)
- object (class MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4, <function1>)
- field (class: MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4$$anonfun$apply$5, name: $outer, type: class MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4)
- object (class MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4$$anonfun$apply$5, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(UDF(tokenMap#149)))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(UDF(tokenMap#149)) AS tokenMap#3131)
- writeObject data (class: scala.collection.immutable.$colon$colon)
- object (class scala.collection.immutable.$colon$colon, List(id#148, UDF(UDF(tokenMap#149)) AS tokenMap#3131, UDF(UDF(bigramMap#150)) AS bigramMap#3132, sentences#151, se_sentence_count#152, se_word_count#153, se_subjective_count#154, se_objective_count#155, se_document_sentiment#156, UDF(UDF(se_category#157)) AS se_category#3133))
- field (class: org.apache.spark.sql.execution.Project, name: projectList, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.execution.Project, Project [id#148,UDF(UDF(tokenMap#149)) AS tokenMap#3131,UDF(UDF(bigramMap#150)) AS bigramMap#3132,sentences#151,se_sentence_count#152,se_word_count#153,se_subjective_count#154,se_objective_count#155,se_document_sentiment#156,UDF(UDF(se_category#157)) AS se_category#3133]
+- InMemoryColumnarTableScan [se_sentence_count#152,bigramMap#150,id#148,tokenMap#149,se_word_count#153,sentences#151,se_document_sentiment#156,se_subjective_count#154,se_category#157,se_objective_count#155], InMemoryRelation [id#148,tokenMap#149,bigramMap#150,sentences#151,se_sentence_count#152,se_word_count#153,se_subjective_count#154,se_objective_count#155,se_document_sentiment#156,se_category#157], true, 10000, StorageLevel(true, true, false, true, 1), Union, None
)
- field (class: org.apache.spark.sql.execution.ConvertToSafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
- object (class org.apache.spark.sql.execution.ConvertToSafe, ConvertToSafe
+- Project [id#148,UDF(UDF(tokenMap#149)) AS tokenMap#3131,UDF(UDF(bigramMap#150)) AS bigramMap#3132,sentences#151,se_sentence_count#152,se_word_count#153,se_subjective_count#154,se_objective_count#155,se_document_sentiment#156,UDF(UDF(se_category#157)) AS se_category#3133]
+- InMemoryColumnarTableScan [se_sentence_count#152,bigramMap#150,id#148,tokenMap#149,se_word_count#153,sentences#151,se_document_sentiment#156,se_subjective_count#154,se_category#157,se_objective_count#155], InMemoryRelation [id#148,tokenMap#149,bigramMap#150,sentences#151,se_sentence_count#152,se_word_count#153,se_subjective_count#154,se_objective_count#155,se_document_sentiment#156,se_category#157], true, 10000, StorageLevel(true, true, false, true, 1), Union, None
)
- field (class: org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.ConvertToSafe)
- object (class org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, <function1>)
You can get a sense of what this output means by looking at the visit method in Spark's SerializationDebugger.scala.
At the time of writing, it looks like this:
/**
 * Visit the object and its fields and stop when we find an object that is not serializable.
 * Return the path as a list. If everything can be serialized, return an empty list.
 */
def visit(o: Any, stack: List[String]): List[String] = {
  if (o == null) {
    List.empty
  } else if (visited.contains(o)) {
    List.empty
  } else {
    visited += o
    o match {
      // Primitive value, string, and primitive arrays are always serializable
      case _ if o.getClass.isPrimitive => List.empty
      case _: String => List.empty
      case _ if o.getClass.isArray && o.getClass.getComponentType.isPrimitive => List.empty
      // Traverse non primitive array.
      case a: Array[_] if o.getClass.isArray && !o.getClass.getComponentType.isPrimitive =>
        val elem = s"array (class ${a.getClass.getName}, size ${a.length})"
        visitArray(o.asInstanceOf[Array[_]], elem :: stack)
      case e: java.io.Externalizable =>
        val elem = s"externalizable object (class ${e.getClass.getName}, $e)"
        visitExternalizable(e, elem :: stack)
      case s: Object with java.io.Serializable =>
        val elem = s"object (class ${s.getClass.getName}, $s)"
        visitSerializable(s, elem :: stack)
      case _ =>
        // Found an object that is not serializable!
        s"object not serializable (class: ${o.getClass.getName}, value: $o)" :: stack
    }
  }
}
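To experiment with this traversal outside Spark, here is a minimal sketch of the same idea in plain Scala. It is not Spark's code: MiniSerializationDebugger and its helpers are hypothetical names, the visited set is identity-based, and the sketch deliberately ignores Externalizable, writeObject and writeReplace, following only ordinary non-static, non-transient fields via reflection. Like visit above, it prepends one entry per step and stops at the first non-serializable object it finds.

```scala
import java.lang.reflect.{Field, Modifier}
import java.util.{Collections, IdentityHashMap}
import scala.util.Try

// Hypothetical, simplified sketch of the traversal done by Spark's
// SerializationDebugger.visit. Unlike Spark, it does NOT handle
// Externalizable, writeObject or writeReplace; it only follows plain fields.
object MiniSerializationDebugger {

  /** Path to the first non-serializable object reachable from `root`,
    * innermost entry first (the order Spark prints), or Nil if none is found. */
  def find(root: Any): List[String] = new Walker().visit(root, Nil)

  private final class Walker {
    // Identity-based "visited" set, so shared and cyclic references are visited once.
    private val visited =
      Collections.newSetFromMap(new IdentityHashMap[AnyRef, java.lang.Boolean]())

    def visit(o: Any, stack: List[String]): List[String] =
      if (o == null) Nil
      else if (!visited.add(o.asInstanceOf[AnyRef])) Nil // already visited: skip
      else o match {
        // Strings and boxed primitives are always serializable.
        case _: String | _: java.lang.Number | _: java.lang.Character | _: java.lang.Boolean => Nil
        case a: Array[_] if a.getClass.getComponentType.isPrimitive => Nil
        case a: Array[_] =>
          val elem = s"array (class ${a.getClass.getName}, size ${a.length})"
          firstFailure(Iterator.range(0, a.length).map(i => visit(a(i), elem :: stack)))
        case s: java.io.Serializable =>
          val elem = s"object (class ${s.getClass.getName}, $s)"
          visitFields(s, elem :: stack)
        case other =>
          // Found an object that is not serializable: return the whole path to it.
          s"object not serializable (class: ${other.getClass.getName}, value: $other)" :: stack
      }

    private def visitFields(obj: AnyRef, stack: List[String]): List[String] = {
      // Java serialization writes the non-static, non-transient fields of every
      // Serializable class in the hierarchy, so walk up while classes are Serializable.
      val classes = Iterator
        .iterate[Class[_]](obj.getClass)(_.getSuperclass)
        .takeWhile(c => c != null && classOf[java.io.Serializable].isAssignableFrom(c))
      val fields = classes
        .flatMap(_.getDeclaredFields.iterator)
        .filterNot(f => Modifier.isStatic(f.getModifiers) || Modifier.isTransient(f.getModifiers))
      firstFailure(fields.map(f => visitField(obj, f, stack)))
    }

    private def visitField(obj: AnyRef, f: Field, stack: List[String]): List[String] =
      // Fields we are not allowed to open (e.g. inside JDK modules) are skipped in this sketch.
      if (Try(f.setAccessible(true)).isFailure) Nil
      else {
        val elem = s"field (class: ${f.getDeclaringClass.getName}, name: ${f.getName}, type: ${f.getType})"
        visit(f.get(obj), elem :: stack)
      }

    // Children are evaluated lazily; stop at the first non-empty path, like Spark does.
    private def firstFailure(paths: Iterator[List[String]]): List[String] =
      paths.find(_.nonEmpty).getOrElse(Nil)
  }
}
```

For example, MiniSerializationDebugger.find(Some(new Object)) returns three entries, innermost first: the java.lang.Object that is not serializable, the field of scala.Some that holds it, and the Some itself. An object graph without a problem yields Nil.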
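So the visit method walks the object graph that needs to be serialized. It distinguishes several cases.
Some cases can never cause a serialization problem:
- null
- visited.contains(o) => the object has already been visited, so it is skipped (this also keeps cyclic references from looping forever)
- o.getClass.isPrimitive => you are serializing a primitive value
- String => strings are always serializable
- o.getClass.isArray && o.getClass.getComponentType.isPrimitive => you are serializing an array of primitives
Then there are 3 cases where visit meets a more complex object (a non-primitive array, an Externalizable object or a Serializable object) and must check whether it contains anything that is not serializable. It calls visitArray, visitExternalizable or visitSerializable with the current element pushed onto the serialization stack (see elem :: stack), and these eventually call visit again on the contained objects. This recurses until either:
- every reachable object has been visited without a problem, in which case an empty list is returned, or
- the final case _ => matches an object that is neither Externalizable nor Serializable, in which case the stack is returned with an "object not serializable" entry on top.
Because each new entry is prepended, the printed stack reads from the innermost object outward: the first line is the culprit, and the last line is the root object Spark was trying to serialize. The entry types mean:
- object (class C, value): an object of class C on the path, printed with its toString
- field (class: C, name: n, type: T): the object on the line above is held in field n (declared in class C with type T) of the object on the line below
- writeReplace data (class: R): the object on the line below defines writeReplace, and the object of class R that it returns (on the line above) is what actually gets serialized; here TypeTagImpl is replaced by a SerializedTypeTag
- writeObject data (class: C): the object on the line above is written by the custom writeObject method of class C (the object on the line below) rather than reached through an ordinary field; here SerializedTypeTag writes out the type creator
In your case, it seems you made a UDF that needs to be serialized. Reading from the bottom up, the chain passes through a bunch of Serializable objects (ConvertToSafe, Project, List, Alias, ScalaUDF, some anonymous functions, TypeTags) and finally reaches the object that is not serializable: MyPackage.testing.PreprocessTest$$typecreator1$1, a type creator class generated inside PreprocessTest for the TypeTag[String] that the deserializeMaps closure holds in its evidence$1$1 field. So if you want this to pass, you need to make that last item serializable.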
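Since the dump is plain text, the "pinpoint the culprit" step can also be scripted. The sketch below is a hypothetical helper (SerializationStackReader, summarize and Summary are made-up names, not a Spark API) that assumes the entry formats shown in the example above, which may differ between Spark versions. It extracts the class from the "object not serializable" line and lists the field entries in root-to-culprit order.

```scala
import scala.util.matching.Regex

// Hypothetical helper for reading a Spark "Serialization stack" dump as text.
// It assumes the entry formats shown in the example; other versions may differ.
object SerializationStackReader {

  private val NotSerializable: Regex = """- object not serializable \(class: ([^,]+), value: .*\)""".r
  private val FieldEntry: Regex = """- field \(class: ([^,]+), name: ([^,]+), type: .*\)""".r

  /** culprit: class of the object that could not be serialized.
    * pathFromRoot: "DeclaringClass.fieldName" hops from the root object down to the culprit. */
  final case class Summary(culprit: Option[String], pathFromRoot: List[String])

  def summarize(stack: String): Summary = {
    val lines = stack.linesIterator.map(_.trim).toList
    val culprit = lines.collectFirst { case NotSerializable(cls) => cls }
    // Entries are printed innermost-first, so reverse them to read from the root down.
    val path = lines.collect { case FieldEntry(cls, name) => s"$cls.$name" }.reverse
    Summary(culprit, path)
  }
}
```

On the full example from the question, the culprit comes out as MyPackage.testing.PreprocessTest$$typecreator1$1, and the path starts with the $outer field of org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, followed by ConvertToSafe.child and Project.projectList. The path only lists field hops, so steps made through writeObject or writeReplace (for example the List writing the Alias) are not shown; the full stack is still needed for those.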
Hope this helps!