Ata*_*ais 7 iterator scala jackson kryo apache-flink
有一个 Scala Flink 应用程序,我在其中使用 Jackson 库解析 JSON。解析由自定义方法处理,它使用延迟启动概念来保持快速。
现在,无论出于何种原因,在 Flink 管道中进一步传递带有惰性值的模型会导致一些奇怪的错误,util.Iterator
这是读取 JSON 的主干。我怀疑问题可能实际上来自Kryo
但我不知道如何确认。值得注意的是,.toList
在同一个(flink)中急切地初始化模型(使用)map
修复了这个问题。但事实并非如此,我想进一步传递我的懒惰模型。
最后,我提供了一个带有演示代码的存储库,但我也想在 StackOverflow 中提供所有详细信息。
示例模型和解析定义:
case class Root(items: Collection[Data])
case class Data(data: Collection[Double])
def toRoot(node: JsonNode): Root = {
val data: util.Iterator[JsonNode] = if (node.hasNonNull("items")) node.get("items").elements() else node.elements()
val items: Collection[Data] = data.asScala.map(x => toData(x))
Root(items)
}
Run Code Online (Sandbox Code Playgroud)
JSON 数据类似于:
{
"items": [
{
"data": [
11.71476355252127,
48.342882259940176,
507.3,
11.714791605037252,
...
Run Code Online (Sandbox Code Playgroud)
并在一部map
作品中完成所有工作:
env.fromCollection(Seq(input))
.map(i => flatten(read(i)))
.print()
Run Code Online (Sandbox Code Playgroud)
但进一步传递失败:
env.fromCollection(Seq(input))
.map(i => read(i))
.map(i => flatten(i))
.print()
Run Code Online (Sandbox Code Playgroud)
随着错误:
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
at java.util.ArrayList$Itr.next(ArrayList.java:861)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
Run Code Online (Sandbox Code Playgroud)
Caused by: java.lang.NullPointerException
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
... 29 more
Run Code Online (Sandbox Code Playgroud)
我创建了一个示范项目,所有这些例子准备测试使用Scala 2.11和2.12,因为它实际上给出了不同的结果提供HERE