flink 解析地图中的 JSON:InvalidProgramException:任务不可序列化

flu*_*y03 2 serialization scala jackson apache-flink flink-streaming

我正在 Flink 项目上工作,想将源 JSON 字符串数据解析为 Json 对象。我正在使用jackson-module-scala进行 JSON 解析。但是,我在 Flink API 中使用 JSON 解析器时遇到了一些问题(map例如)。

以下是代码的一些示例,我无法理解其行为背后的原因。

情况一:

在这种情况下,我正在做jackson-module-scala 的官方 exmaple 代码告诉我要做的事情

  1. 创建一个新的ObjectMapper
  2. 注册DefaultScalaModule

    DefaultScalaModule是一个 Scala 对象,包含对所有当前支持的 Scala 数据类型的支持。

  3. 调用readValue以将 JSON 解析为Map

我得到的错误是:org.apache.flink.api.common.InvalidProgramException:Task not serializable

object JsonProcessing {
  def main(args: Array[String]) {

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.readTextFile("xxx")

    val mapper = new ObjectMapper
    mapper.registerModule(DefaultScalaModule)
    val counts = text.map(mapper.readValue(_, classOf[Map[String, String]]))

    // execute and print result
    counts.print()

    env.execute("JsonProcessing")
  }

}
Run Code Online (Sandbox Code Playgroud)

情况2:

然后我做了一些谷歌,并提出了以下解决方案,其中registerModule移动到map函数中。

val mapper = new ObjectMapper
val counts = text.map(l => {
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(l, classOf[Map[String, String]])
})
Run Code Online (Sandbox Code Playgroud)

但是,我无法理解的是:为什么调用外部定义对象的方法会起作用mapper是因为它ObjectMapper本身是可序列化的,如此处所述ObjectMapper.java#L114吗?

现在,JSON 解析工作正常,但每次我都必须调用它,mapper.registerModule(DefaultScalaModule)我认为这可能会导致一些性能问题(是吗?)。我还尝试了另一种解决方案,如下。

情况3:

我创建了一个新的case class Jsen,并将其用作相应的解析类,注册 Scala 模块。而且它也运行良好。

但是,如果您的输入 JSON 经常变化,那么这就不那么灵活了。管理类是不可维护的Jsen

case class Jsen(
  @JsonProperty("a") a: String,
  @JsonProperty("c") c: String,
  @JsonProperty("e") e: String
)

object JsonProcessing {
  def main(args: Array[String]) {
    ...
    val mapper = new ObjectMapper
    val counts = text.map(mapper.readValue(_, classOf[Jsen]))
    ...

}
Run Code Online (Sandbox Code Playgroud)

此外,我还尝试使用JsonNode而不调用registerModule如下:

    ...
    val mapper = new ObjectMapper
    val counts = text.map(mapper.readValue(_, classOf[JsonNode]))
    ...
Run Code Online (Sandbox Code Playgroud)

它也工作得很好。

我的主要问题是:实际上是什么导致了 Task在幕后无法序列化的问题?registerModule(DefaultScalaModule)

如何确定您的代码在编码过程中是否可能导致此不可序列化问题?

Ser*_*gGr 5

事实是 Apache Flink 被设计为分布式的。这意味着它需要能够远程运行您的代码。所以这意味着你的所有处理函数都应该是可序列化的。在当前的实现中,即使您不会在任何分布式模式下运行流处理,也可以在构建流处理时尽早确保这一点。这是一种权衡,其明显的好处是为您提供反馈到破坏此契约的那一行(通过异常堆栈跟踪)。

所以当你写的时候

val counts = text.map(mapper.readValue(_, classOf[Map[String, String]]))
Run Code Online (Sandbox Code Playgroud)

你实际写的是类似的东西

val counts = text.map(new Function1[String, Map[String, String]] {
    val capturedMapper = mapper

    override def apply(param: String) = capturedMapper.readValue(param, classOf[Map[String, String]])
})
Run Code Online (Sandbox Code Playgroud)

这里重要的是,您从外部上下文中捕获并将其存储为必须可序列化的对象mapper的一部分。Function1这意味着mapper必须是可序列化的。Jackson 库的设计者认识到这种需求,并且由于映射器中没有本质上不可序列化的内容,因此他们将ObjectMapper其默认Module值设为可序列化。不幸的是,Scala Jackson 模块的设计者错过了这一点,并通过将所有子类DefaultScalaModule设为不可序列化,使其深度不可序列化。ScalaTypeModifier这就是为什么你的第二个代码可以工作,而第一个代码却不能:“原始”ObjectMapper是可序列化的,而ObjectMapper预注册DefaultScalaModule则不是。

有一些可能的解决方法。可能最简单的就是包裹ObjectMapper

object MapperWrapper extends java.io.Serializable {
  // this lazy is the important trick here
  // @transient adds some safety in current Scala (see also Update section)
  @transient lazy val mapper = {
    val mapper = new ObjectMapper
    mapper.registerModule(DefaultScalaModule)
    mapper
  }

  def readValue[T](content: String, valueType: Class[T]): T = mapper.readValue(content, valueType)
} 
Run Code Online (Sandbox Code Playgroud)

然后将其用作

val counts = text.map(MapperWrapper.readValue(_, classOf[Map[String, String]]))
Run Code Online (Sandbox Code Playgroud)

这个lazy技巧之所以有效,是因为虽然 的实例DefaultScalaModule不可序列化,但创建 的实例的函数DefaultScalaModule是可序列化的。


更新:@transient 怎么样?

lazy val如果我添加vs ,这里有什么区别@transient lazy val

这实际上是一个棘手的问题。编译出来的内容lazy val实际上是这样的:

object MapperWrapper extends java.io.Serializable {

  // @transient is set or not set for both fields depending on its presence at "lazy val" 
  [@transient] private var mapperValue: ObjectMapper = null
  [@transient] @volatile private var mapperInitialized = false

  def mapper: ObjectMapper = {
    if (!mapperInitialized) {
      this.synchronized {
        val mapper = new ObjectMapper
        mapper.registerModule(DefaultScalaModule)
        mapperValue = mapper
        mapperInitialized = true
      }
    }
    mapperValue
  }


  def readValue[T](content: String, valueType: Class[T]): T = mapper.readValue(content, valueType)
}
Run Code Online (Sandbox Code Playgroud)

@transient影响两个支持字段的位置lazy val。现在你可以明白为什么lazy val这个技巧有效了:

  1. 它在本地有效,因为它会延迟mapperValue字段的初始化,直到第一次访问该方法为止,因此在执行序列化检查时mapper该字段是安全的null

  2. 远程它可以工作,因为它MapperWrapper是完全可序列化的,并且如何初始化的逻辑lazy val被放入同一类的方法中(请参阅 参考资料def mapper)。

但请注意,据我所知,这种编译方式lazy val是当前 Scala 编译器的实现细节,而不是 Scala 规范的一部分。如果稍后某个类似于 .Net 的类Lazy被添加到 Java 标准库中,Scala 编译器可能会开始生成不同的代码。这很重要,因为它提供了一种权衡@transient。现在添加的好处@transient是它可以确保这样的代码也能正常工作:

val someJson:String = "..."
val something:Something = MapperWrapper.readValue(someJson:String, ...)
val counts = text.map(MapperWrapper.readValue(_, classOf[Map[String, String]]))
Run Code Online (Sandbox Code Playgroud)

没有@transient上面的代码将会失败,因为我们强制初始化支持lazy字段,现在它包含一个不可序列化的值。这 @transient不是问题,因为该字段根本不会被序列化。

一个潜在的缺点是@transient是,如果 Scala 更改了代码的生成方式lazy val并将字段标记为@transient,则在远程工作场景中它实际上可能不会被反序列化。

还有一个技巧,object因为对于objects Scala 编译器生成自定义反序列化逻辑(覆盖readResolve)以返回相同的单例对象。这意味着包含 的对象lazy val并未真正反序列化,而是object使用了 本身的值。这意味着@transient lazy valinside比远程场景中的objectinside 更加面向未来。class