Ian*_*Ian 14 java serialization scala apache-spark
我正在尝试在Spark中使用java.time.format中的DateTimeFormatter,但它似乎不可序列化.这是相关的代码块:
val pattern = "<some pattern>".r
val dtFormatter = DateTimeFormatter.ofPattern("<some non-ISO pattern>")
val logs = sc.wholeTextFiles(path)
val entries = logs.flatMap(fileContent => {
val file = fileContent._1
val content = fileContent._2
content.split("\\r?\\n").map(line => line match {
case pattern(dt, ev, seq) => Some(LogEntry(LocalDateTime.parse(dt, dtFormatter), ev, seq.toInt))
case _ => logger.error(s"Cannot parse $file: $line"); None
})
})
Run Code Online (Sandbox Code Playgroud)
我该如何避免java.io.NotSerializableException: java.time.format.DateTimeFormatter
异常?是否有更好的库来解析时间戳?我已经读过Joda也不是可序列化的,并且已经被整合到Java 8的时间库中.
Tza*_*har 29
您可以通过两种方式避免序列化:
假设它的值可以是常量,将格式化程序置于object
(使其"静态").这意味着可以在每个worker中访问静态值,而不是驱动程序序列化它并发送给worker:
object MyUtils {
val dtFormatter = DateTimeFormatter.ofPattern("<some non-ISO pattern>")
}
import MyUtils._
logs.flatMap(fileContent => {
// can safely use formatter here
})
Run Code Online (Sandbox Code Playgroud)在匿名函数中为每个记录实例化它.这会带来一些性能损失(因为实例化会一遍又一遍地发生,每个记录),所以如果第一个不能应用,则只使用此选项:
logs.flatMap(fileContent => {
val dtFormatter = DateTimeFormatter.ofPattern("<some non-ISO pattern>")
// use formatter here
})
Run Code Online (Sandbox Code Playgroud)另一种方法是使 DateTimeFormatter变为瞬态。这告诉 JVM/Spark 该变量不要被序列化,而是从头开始构造。对于每个执行器构造成本低廉的东西(例如 DateTimeFormatter),这是一个很好的方法。
归档时间: |
|
查看次数: |
5864 次 |
最近记录: |