I have a Spark Streaming job that uses mapWithState with an initial RDD. When the application is restarted and recovers from the checkpoint, it fails with the error:

This RDD lacks a SparkContext. It could happen in the following cases: …

This behaviour is described in https://issues.apache.org/jira/browse/SPARK-13758, but it doesn't really explain how to work around it. My RDD is not defined by the streaming job, but I still need it in the state.
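For context, the pattern I mean is roughly the one sketched below; the key and state types are illustrative, not my actual code. The initial RDD is built once on the driver, outside the streaming graph, and handed to the StateSpec:

  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{State, StateSpec}

  // Illustrative sketch: a StateSpec seeded with an RDD created outside the
  // streaming job. After a restart this reference is restored from the checkpoint
  // without a live SparkContext, which is what triggers the error above.
  def buildStateSpec(initialCounts: RDD[(String, Long)]): StateSpec[String, Long, Long, (String, Long)] = {
    val track = (key: String, value: Option[Long], state: State[Long]) => {
      val total = value.getOrElse(0L) + state.getOption.getOrElse(0L)
      state.update(total)
      (key, total)
    }
    StateSpec.function(track).initialState(initialCounts)
  }

The stream side is then just a keyed DStream calling mapWithState(buildStateSpec(initialCounts)).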
Here is a sample of what my graph looks like:
class EventStreamingApplication {
  private val config: Config = ConfigFactory.load()

  private val sc: SparkContext = {
    val conf = new SparkConf()
      .setAppName(config.getString("streaming.appName"))
      .set("spark.cassandra.connection.host", config.getString("streaming.cassandra.host"))
    val sparkContext = new SparkContext(conf)
    // Enable AWS Signature Version 4 so the S3 checkpoint bucket can be accessed
    System.setProperty("com.amazonaws.services.s3.enableV4", "true")
    sparkContext.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true")
    sparkContext
  }

  def run(): Unit = {
    // streaming.eventCheckpointDir is an S3 Bucket
    // Restore the StreamingContext from checkpoint data if present,
    // otherwise build a fresh one via createStreamingContext
    val ssc: StreamingContext = StreamingContext.getOrCreate(
      config.getString("streaming.eventCheckpointDir"), createStreamingContext)
    ssc.start()
    ssc.awaitTermination()
  }

  def receiver(ssc: StreamingContext): DStream[Event] = {
    RabbitMQUtils.createStream(ssc, Map(
      "hosts" -> config.getString("streaming.rabbitmq.host"),
      "virtualHost" -> config.getString("streaming.rabbitmq.virtualHost"),
"userName" …Run Code Online (Sandbox Code Playgroud)