如何调试Flink中的可序列化异常?

Ley*_*Lee 6 apache-flink flink-streaming flink-cep

我遇到了几个可序列化的异常,并且我在Flink的互联网和文档上进行了一些搜索;有一些著名的解决方案,如瞬态、扩展序列化等。每次异常的起源都非常清楚,但就我而言,我无法找到它到底在哪里没有序列化。

问:遇到这种异常应该如何调试?

A.斯卡拉:

class executor ( val sink: SinkFunction[List[String]] {
    def exe(): Unit = {
        xxx.....addSink(sinks)
    }
}
Run Code Online (Sandbox Code Playgroud)

B.scala:

class Main extends App {
  def createSink: SinkFunction[List[String]] = new StringSink()

  object StringSink {
    // static
    val stringList: List[String] = List()
  }

  // create a testing sink
  class StringSink extends SinkFunction[List[String]] {
    override def invoke(strs: List[String]): Unit = {
        // add strs into the variable "stringList" of the compagin object StringSink
    }
  }

  new executor(createSink()).exe()

  // then do somethings with the strings
}
Run Code Online (Sandbox Code Playgroud)

例外的是:

SinkFunction的实现不可序列化。该对象可能包含或引用不可序列化的字段。

我发现两个可疑点:

  1. 的实例StringSink被传递到另一个文件中。
  2. 在 类中,它使用 其 compagin 对象的StringSink静态变量。stringList

小智 4

我的第一个猜测是你在 StringSink 中没有无参数构造函数

\n

POJO 类型的规则从这里剪辑

\n

如果满足以下条件,Flink 会将数据类型识别为 POJO 类型(并允许 \xe2\x80\x9cby-name\xe2\x80\x9d 字段引用):

\n
    \n
  1. 该类是公共且独立的(没有非静态内部类)
  2. \n
  3. 该类有一个公共无参构造函数
  4. \n
  5. 类(以及所有超类)中的所有非静态、非瞬态字段要么是公共的(且非最终的),要么具有遵循 Java bean 的 getter 和 setter 命名约定的公共 getter 和 setter 方法。
  6. \n
\n

只需添加一个无参数构造函数,然后重试

\n
    class StringSink extends SinkFunction[List[String]] {\n        public StringSink() {\n        }\n        \n        @override def invoke(strs: List[String]): Unit = {\n            // add strs into the variable "stringList" of the compagin object StringSink\n        }\n}\n
Run Code Online (Sandbox Code Playgroud)\n