Ali*_*iza 2 scala jackson apache-spark
我有一个spark应用程序,它从文件中读取行并尝试使用jackson对它们进行反序列化.为了使这段代码能够工作,我需要在Map操作中定义ObjectMapper(否则我得到一个NullPointerException).
我有以下代码正在工作:
val alertsData = sc.textFile(rawlines).map(alertStr => {
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
broadcastVar.value.readValue(alertStr, classOf[Alert])
})
Run Code Online (Sandbox Code Playgroud)
但是,如果我在地图外部定义映射器并对其进行广播,则会因NullPointerException而失败.
此代码失败:
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
val broadcastVar = sc.broadcast(mapper)
val alertsData = sc.textFile(rawlines).map(alertStr => {
broadcastVar.value.readValue(alertStr, classOf[Alert])
})
Run Code Online (Sandbox Code Playgroud)
我在这里错过了什么?
谢谢,Aliza
事实证明你可以广播映射器.有问题的部分mapper.registerModule(DefaultScalaModule)需要在每个从属(执行器)机器上执行,而不仅仅在驱动程序上执行.
所以这段代码有效:
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val broadcastVar = sc.broadcast(mapper)
val alertsData = sc.textFile(rawlines).map(alertStr => {
broadcastVar.value.registerModule(DefaultScalaModule)
broadcastVar.value.readValue(alertStr, classOf[Alert])
})
Run Code Online (Sandbox Code Playgroud)
我通过每个分区只运行一次registerModule(而不是RDD中的每个元素)进一步优化了代码.
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val broadcastVar = sc.broadcast(mapper)
val alertsRawData = sc.textFile(rawlines)
val alertsData = alertsRawData.mapPartitions({ iter: Iterator[String] => broadcastVar.value.registerModule(DefaultScalaModule)
for (i <- iter) yield broadcastVar.value.readValue(i, classOf[Alert]) })
Run Code Online (Sandbox Code Playgroud)
阿里扎
| 归档时间: |
|
| 查看次数: |
1871 次 |
| 最近记录: |