Sha*_*lla 7 apache-spark spark-streaming checkpointing
我正在从HDFS检查点恢复流(例如,ConstantInputDSTream),但我一直在努力SparkException: <X> has not been initialized.
从checkpointing恢复时,我需要做些什么吗?
我可以看到,它要DStream.zeroTime设置,但在恢复流时zeroTime是null.由于它是私人成员IDK,因此无法恢复.我可以看到StreamingContext恢复的流引用的确有一个值zeroTime.
initialize是一种私有方法,可以被调用StreamingContext.graph.start但不是被调用StreamingContext.graph.restart,大概是因为它预期zeroTime会被持久化.
有人有一个从检查点恢复并具有非空值的流的示例zeroTime吗?
def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sparkConf, Duration(1000))
    ssc.checkpoint(checkpointDir)
    ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir), createStreamingContext)
val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)
Sha*_*lla 12
问题是我在从检查点重新创建StreamingContext之后创建了dstreams,即之后StreamingContext.getOrCreate.应该创建dstreams和所有转换createStreamingContext.
当从检查点恢复StreamingContext并且之后创建了dstream时,该问题被填充为[SPARK-13316]"SparkException:DStream尚未初始化".
| 归档时间: | 
 | 
| 查看次数: | 2657 次 | 
| 最近记录: |