我正在从HDFS检查点恢复流(例如,ConstantInputDSTream),但我一直在努力SparkException: <X> has not been initialized.
从checkpointing恢复时,我需要做些什么吗?
我可以看到,它要DStream.zeroTime设置,但在恢复流时zeroTime是null.由于它是私人成员IDK,因此无法恢复.我可以看到StreamingContext恢复的流引用的确有一个值zeroTime.
initialize是一种私有方法,可以被调用StreamingContext.graph.start但不是被调用StreamingContext.graph.restart,大概是因为它预期zeroTime会被持久化.
有人有一个从检查点恢复并具有非空值的流的示例zeroTime吗?
def createStreamingContext(): StreamingContext = {
val ssc = new StreamingContext(sparkConf, Duration(1000))
ssc.checkpoint(checkpointDir)
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir), createStreamingContext)
val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)
Run Code Online (Sandbox Code Playgroud) scala.util.Success没有apply方法但继承了scala.util.Try(AFAIK)的方法.但所有的一切scala.util.Try.apply都是Success.apply被一个try catch街区所包围.这怎么不是无限递归的,它是如何触发catch阻塞的呢?