小编Sha*_*lla的帖子

为什么Spark从检查点恢复时抛出"SparkException:DStream尚未初始化"?

我正在从HDFS检查点恢复流(例如,ConstantInputDSTream),但我一直在努力SparkException: <X> has not been initialized.

从checkpointing恢复时,我需要做些什么吗?

我可以看到,它要DStream.zeroTime设置,但在恢复流时zeroTimenull.由于它是私人成员IDK,因此无法恢复.我可以看到StreamingContext恢复的流引用的确有一个值zeroTime.

initialize是一种私有方法,可以被调用StreamingContext.graph.start但不是被调用StreamingContext.graph.restart,大概是因为它预期zeroTime会被持久化.

有人有一个从检查点恢复并具有非空值的流的示例zeroTime吗?

def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sparkConf, Duration(1000))
    ssc.checkpoint(checkpointDir)
    ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir), createStreamingContext)

val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming checkpointing

7
推荐指数
1
解决办法
2657
查看次数

为什么scala.util.Success.apply无限递归?

scala.util.Success没有apply方法但继承了scala.util.Try(AFAIK)的方法.但所有的一切scala.util.Try.apply都是Success.apply被一个try catch街区所包围.这怎么不是无限递归的,它是如何触发catch阻塞的呢?

recursion scala try-catch

2
推荐指数
1
解决办法
93
查看次数