How do I read a Scala serialization stack (from Spark)?

kin*_*ion 7 scala apache-spark

How do I read a serialization stack?

I'm building a distributed NLP application on Spark. I regularly run into these NotSerializableException errors and always muddle my way through fixing them, but I've never found good documentation on what everything in the serialization stack means.

How do I read the serialization stack that accompanies a NotSerializableException in Scala? How do I pin down the class or object that caused the error? What do the "field", "object", "writeObject", and "writeReplace" entries in the stack mean?

Here is an example:

Caused by: java.io.NotSerializableException: MyPackage.testing.PreprocessTest$$typecreator1$1
Serialization stack:
        - object not serializable (class: MyPackage.testing.PreprocessTest$$typecreator1$1, value: MyPackage.testing.PreprocessTest$$typecreator1$1@27f6854b)
        - writeObject data (class: scala.reflect.api.SerializedTypeTag)
        - object (class scala.reflect.api.SerializedTypeTag, scala.reflect.api.SerializedTypeTag@4a571516)
        - writeReplace data (class: scala.reflect.api.SerializedTypeTag)
        - object (class scala.reflect.api.TypeTags$TypeTagImpl, TypeTag[String])
        - field (class: MyPackage.package$$anonfun$deserializeMaps$1, name: evidence$1$1, type: interface scala.reflect.api.TypeTags$TypeTag)
        - object (class MyPackage.package$$anonfun$deserializeMaps$1, <function1>)
        - field (class: MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4, name: $outer, type: class MyPackage.package$$anonfun$deserializeMaps$1)
        - object (class MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4, <function1>)
        - field (class: MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4$$anonfun$apply$5, name: $outer, type: class MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4)
        - object (class MyPackage.package$$anonfun$deserializeMaps$1$$anonfun$apply$4$$anonfun$apply$5, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(UDF(tokenMap#149)))
        - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(UDF(tokenMap#149)) AS tokenMap#3131)
        - writeObject data (class: scala.collection.immutable.$colon$colon)
        - object (class scala.collection.immutable.$colon$colon, List(id#148, UDF(UDF(tokenMap#149)) AS tokenMap#3131, UDF(UDF(bigramMap#150)) AS bigramMap#3132, sentences#151, se_sentence_count#152, se_word_count#153, se_subjective_count#154, se_objective_count#155, se_document_sentiment#156, UDF(UDF(se_category#157)) AS se_category#3133))
        - field (class: org.apache.spark.sql.execution.Project, name: projectList, type: interface scala.collection.Seq)
        - object (class org.apache.spark.sql.execution.Project, Project [id#148,UDF(UDF(tokenMap#149)) AS tokenMap#3131,UDF(UDF(bigramMap#150)) AS bigramMap#3132,sentences#151,se_sentence_count#152,se_word_count#153,se_subjective_count#154,se_objective_count#155,se_document_sentiment#156,UDF(UDF(se_category#157)) AS se_category#3133]
+- InMemoryColumnarTableScan [se_sentence_count#152,bigramMap#150,id#148,tokenMap#149,se_word_count#153,sentences#151,se_document_sentiment#156,se_subjective_count#154,se_category#157,se_objective_count#155], InMemoryRelation [id#148,tokenMap#149,bigramMap#150,sentences#151,se_sentence_count#152,se_word_count#153,se_subjective_count#154,se_objective_count#155,se_document_sentiment#156,se_category#157], true, 10000, StorageLevel(true, true, false, true, 1), Union, None
)
        - field (class: org.apache.spark.sql.execution.ConvertToSafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
        - object (class org.apache.spark.sql.execution.ConvertToSafe, ConvertToSafe
+- Project [id#148,UDF(UDF(tokenMap#149)) AS tokenMap#3131,UDF(UDF(bigramMap#150)) AS bigramMap#3132,sentences#151,se_sentence_count#152,se_word_count#153,se_subjective_count#154,se_objective_count#155,se_document_sentiment#156,UDF(UDF(se_category#157)) AS se_category#3133]
   +- InMemoryColumnarTableScan [se_sentence_count#152,bigramMap#150,id#148,tokenMap#149,se_word_count#153,sentences#151,se_document_sentiment#156,se_subjective_count#154,se_category#157,se_objective_count#155], InMemoryRelation [id#148,tokenMap#149,bigramMap#150,sentences#151,se_sentence_count#152,se_word_count#153,se_subjective_count#154,se_objective_count#155,se_document_sentiment#156,se_category#157], true, 10000, StorageLevel(true, true, false, true, 1), Union, None
)
        - field (class: org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.ConvertToSafe)
        - object (class org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, <function1>)

Koe*_*dlt 0

You can work out what that output means by looking at the visit method in Spark's SerializationDebugger.scala.

At the time of writing this answer, it looks like this:

    /**
     * Visit the object and its fields and stop when we find an object that is not serializable.
     * Return the path as a list. If everything can be serialized, return an empty list.
     */
    def visit(o: Any, stack: List[String]): List[String] = {
      if (o == null) {
        List.empty
      } else if (visited.contains(o)) {
        List.empty
      } else {
        visited += o
        o match {
          // Primitive value, string, and primitive arrays are always serializable
          case _ if o.getClass.isPrimitive => List.empty
          case _: String => List.empty
          case _ if o.getClass.isArray && o.getClass.getComponentType.isPrimitive => List.empty

          // Traverse non primitive array.
          case a: Array[_] if o.getClass.isArray && !o.getClass.getComponentType.isPrimitive =>
            val elem = s"array (class ${a.getClass.getName}, size ${a.length})"
            visitArray(o.asInstanceOf[Array[_]], elem :: stack)

          case e: java.io.Externalizable =>
            val elem = s"externalizable object (class ${e.getClass.getName}, $e)"
            visitExternalizable(e, elem :: stack)

          case s: Object with java.io.Serializable =>
            val elem = s"object (class ${s.getClass.getName}, $s)"
            visitSerializable(s, elem :: stack)

          case _ =>
            // Found an object that is not serializable!
            s"object not serializable (class: ${o.getClass.getName}, value: $o)" :: stack
        }
      }
    }

So this visit method traverses the object you need to serialize. There are several cases:

Some cases pose no serialization problem:

  • your object is null
  • visited.contains(o) => you have already traversed this whole object without any problem
  • o.getClass.isPrimitive => you are serializing a primitive type
  • String is serializable
  • o.getClass.isArray && o.getClass.getComponentType.isPrimitive => you are serializing an array of primitives

Then there are three cases where a more complex object is encountered and we need to check whether it contains anything non-serializable. These cases call the visitArray, visitExternalizable, and visitSerializable functions, push the current element onto the serialization stack (see elem :: stack), and eventually call visit again. This recurses (see the sketch after this list) until either:

  • you have finished traversing the object (visited.contains(o))
  • you hit an object that is not serializable
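
Two details make that output easier to read. First, since each entry is prepended (elem :: stack), the printed stack reads innermost-first: the top line is the object that actually failed, and each field / object pair below it walks back out to the closure Spark was trying to ship. Second, the writeObject data and writeReplace data lines are emitted when a visited object defines those custom java.io serialization hooks, as scala.reflect.api.SerializedTypeTag does in your stack.

You can reproduce a miniature version of such a stack outside Spark. Here is a minimal sketch (all class names are made up) that builds the same kind of object chain and fails the plain java.io serialization check that Spark performs when it ships a closure:

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    class Handle                                           // does NOT extend Serializable
    class Wrapper(val handle: Handle) extends Serializable

    object StackDemo {
      def main(args: Array[String]): Unit = {
        val obj = new Wrapper(new Handle)
        try {
          // The same java.io machinery Spark's closure serializer relies on.
          new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
        } catch {
          case e: NotSerializableException =>
            // Spark's SerializationDebugger would decorate this exception with:
            //   - object not serializable (class: Handle, ...)    <- the culprit
            //   - field (class: Wrapper, name: handle, ...)       <- the reference to it
            //   - object (class: Wrapper, ...)                    <- the object being shipped
            println(e)
        }
      }
    }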

In your case, it looks like you have built a UDF that needs to be serialized. It contains a chain of Serializable objects (reading the printed stack from the bottom up: ConvertToSafe, Project, List, Alias, ScalaUDF, some anonymous functions, TypeTags) and finally lands on an object that is not serializable: MyPackage.testing.PreprocessTest$$typecreator1$1. So if you want this to go through, you need to make that last item serializable.
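
As a hedged sketch (the real deserializeMaps code isn't shown in the question), the PreprocessTest$$typecreator1$1 at the top of your stack is a compiler-generated TypeCreator backing a TypeTag that was materialized inside PreprocessTest. One common way out is to define the UDF, and therefore the TypeTags it materializes, in a top-level serializable object rather than in the test class:

    import org.apache.spark.sql.functions.udf

    // Hypothetical stand-in for the real deserializeMaps logic. Defining the
    // UDF here means the generated TypeTag support classes are nested in this
    // serializable object instead of in the non-serializable test class.
    object PreprocessUdfs extends Serializable {
      val deserializeMap = udf((raw: String) =>
        raw.split(';').map { kv =>
          val Array(k, v) = kv.split('=')
          k -> v
        }.toMap
      )
    }

With something like df.withColumn("tokenMap", PreprocessUdfs.deserializeMap(df("tokenMap"))), nothing from PreprocessTest ends up in the task closure.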

Hope this helps!