Spark:广播杰克逊ObjectMapper

Ali*_*iza 2 scala jackson apache-spark

我有一个spark应用程序,它从文件中读取行并尝试使用jackson对它们进行反序列化.为了使这段代码能够工作,我需要在Map操作中定义ObjectMapper(否则我得到一个NullPointerException).

我有以下代码正在工作:

val alertsData = sc.textFile(rawlines).map(alertStr => {
      val mapper = new ObjectMapper()
      mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      mapper.registerModule(DefaultScalaModule)
      broadcastVar.value.readValue(alertStr, classOf[Alert])
    })
Run Code Online (Sandbox Code Playgroud)

但是,如果我在地图外部定义映射器并对其进行广播,则会因NullPointerException而失败.

此代码失败:

val mapper = new ObjectMapper()
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    val broadcastVar = sc.broadcast(mapper)

    val alertsData = sc.textFile(rawlines).map(alertStr => {
      broadcastVar.value.readValue(alertStr, classOf[Alert])
    })
Run Code Online (Sandbox Code Playgroud)

我在这里错过了什么?

谢谢,Aliza

Ali*_*iza 6

事实证明你可以广播映射器.有问题的部分mapper.registerModule(DefaultScalaModule)需要在每个从属(执行器)机器上执行,而不仅仅在驱动程序上执行.

所以这段代码有效:

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val broadcastVar = sc.broadcast(mapper)

val alertsData = sc.textFile(rawlines).map(alertStr => {
      broadcastVar.value.registerModule(DefaultScalaModule)
      broadcastVar.value.readValue(alertStr, classOf[Alert])
})
Run Code Online (Sandbox Code Playgroud)

我通过每个分区只运行一次registerModule(而不是RDD中的每个元素)进一步优化了代码.

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

val broadcastVar = sc.broadcast(mapper)
val alertsRawData = sc.textFile(rawlines)

val alertsData = alertsRawData.mapPartitions({ iter: Iterator[String] => broadcastVar.value.registerModule(DefaultScalaModule)
      for (i <- iter) yield broadcastVar.value.readValue(i, classOf[Alert]) })
Run Code Online (Sandbox Code Playgroud)

阿里扎