Ley*_*Lee 6 apache-flink flink-streaming flink-cep
我遇到了几个可序列化的异常,并且我在Flink的互联网和文档上进行了一些搜索;有一些著名的解决方案,如瞬态、扩展序列化等。每次异常的起源都非常清楚,但就我而言,我无法找到它到底在哪里没有序列化。
问:遇到这种异常应该如何调试?
A.斯卡拉:
class executor ( val sink: SinkFunction[List[String]] {
def exe(): Unit = {
xxx.....addSink(sinks)
}
}
Run Code Online (Sandbox Code Playgroud)
B.scala:
class Main extends App {
def createSink: SinkFunction[List[String]] = new StringSink()
object StringSink {
// static
val stringList: List[String] = List()
}
// create a testing sink
class StringSink extends SinkFunction[List[String]] {
override def invoke(strs: List[String]): Unit = {
// add strs into the variable "stringList" of the compagin object StringSink
}
}
new executor(createSink()).exe()
// then do somethings with the strings
}
Run Code Online (Sandbox Code Playgroud)
例外的是:
SinkFunction的实现不可序列化。该对象可能包含或引用不可序列化的字段。
我发现两个可疑点:
StringSink被传递到另一个文件中。StringSink静态变量。stringList小智 4
我的第一个猜测是你在 StringSink 中没有无参数构造函数
\n如果满足以下条件,Flink 会将数据类型识别为 POJO 类型(并允许 \xe2\x80\x9cby-name\xe2\x80\x9d 字段引用):
\n class StringSink extends SinkFunction[List[String]] {\n public StringSink() {\n }\n \n @override def invoke(strs: List[String]): Unit = {\n // add strs into the variable "stringList" of the compagin object StringSink\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
8974 次 |
| 最近记录: |