为什么Spark从检查点恢复时抛出"SparkException:DStream尚未初始化"?

Sha*_*lla 7 apache-spark spark-streaming checkpointing

我正在从HDFS检查点恢复流(例如,ConstantInputDSTream),但我一直在努力SparkException: <X> has not been initialized.

从checkpointing恢复时,我需要做些什么吗?

我可以看到,它要DStream.zeroTime设置,但在恢复流时zeroTimenull.由于它是私人成员IDK,因此无法恢复.我可以看到StreamingContext恢复的流引用的确有一个值zeroTime.

initialize是一种私有方法,可以被调用StreamingContext.graph.start但不是被调用StreamingContext.graph.restart,大概是因为它预期zeroTime会被持久化.

有人有一个从检查点恢复并具有非空值的流的示例zeroTime吗?

def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sparkConf, Duration(1000))
    ssc.checkpoint(checkpointDir)
    ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir), createStreamingContext)

val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)
Run Code Online (Sandbox Code Playgroud)

Sha*_*lla 12

问题是我在从检查点重新创建StreamingContext之后创建了dstreams,即之后StreamingContext.getOrCreate.应该创建dstreams和所有转换createStreamingContext.

当从检查点恢复StreamingContext并且之后创建了dstream时,该问题被填充为[SPARK-13​​316]"SparkException:DStream尚未初始化".

  • 您能举一些如何摆脱问题的例子吗?我无法从检查点StreamingContext获取ReceiverStreamObject (2认同)