小编Joc*_*uhr的帖子

Spark Streaming Job无法恢复

我正在使用一个使用mapWithState和初始RDD的spark流式传输作业.重新启动应用程序并从检查点恢复时,它失败并显示错误:

这个RDD缺少SparkContext.它可能发生在以下情况:

  1. RDD转换和操作不是由驱动程序调用,而是在其他转换内部调用; 例如,rdd1.map(x => rdd2.values.count()*x)无效,因为无法在rdd1.map转换中执行值转换和计数操作.有关更多信息,请参阅SPARK-5063.
  2. 当Spark Streaming作业从检查点恢复时,如果在DStream操作中使用了未由流作业定义的RDD的引用,则会触发此异常.有关更多信息,请参阅SPARK-13​​758

https://issues.apache.org/jira/browse/SPARK-13​​758中描述了此行为,但并未真正描述如何解决此问题.我的RDD不是由流媒体作业定义的,但我仍然需要在该状态.

这是我的图表的示例:

class EventStreamingApplication {
  private val config: Config = ConfigFactory.load()
  private val sc: SparkContext = {
    val conf = new SparkConf()
      .setAppName(config.getString("streaming.appName"))
      .set("spark.cassandra.connection.host", config.getString("streaming.cassandra.host"))
    val sparkContext = new SparkContext(conf)
    System.setProperty("com.amazonaws.services.s3.enableV4", "true")
    sparkContext.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true")
    sparkContext
  }

  def run(): Unit = {
    // streaming.eventCheckpointDir is an S3 Bucket
    val ssc: StreamingContext = StreamingContext.getOrCreate(config.getString("streaming.eventCheckpointDir"), createStreamingContext)
    ssc.start()
    ssc.awaitTermination()
  }

  def receiver(ssc: StreamingContext): DStream[Event] = {
    RabbitMQUtils.createStream(ssc, Map(
      "hosts" -> config.getString("streaming.rabbitmq.host"),
      "virtualHost" -> config.getString("streaming.rabbitmq.virtualHost"),
      "userName" …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming

11
推荐指数
1
解决办法
713
查看次数

标签 统计

apache-spark ×1

spark-streaming ×1