And*_* T. 11 scala apache-spark spark-streaming
好吧,我问过一个有点类似的问题涉及到星火内部如何处理异常,但我当时的例子是真不明白还是完整的.那里的答案指向了某个方向,但我无法解释一些事情.
我已经设置了一个虚拟火花流媒体应用程序,在转换阶段,我有一个俄罗斯轮盘表达式,可能会或不会抛出异常.如果抛出异常,我会停止Spark流式上下文.就是这样,没有其他逻辑,没有RDD转变.
object ImmortalStreamingJob extends App {
val conf = new SparkConf().setAppName("fun-spark").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))
val elems = (1 to 1000).grouped(10)
.map(seq => ssc.sparkContext.parallelize(seq))
.toSeq
val stream = ssc.queueStream(mutable.Queue[RDD[Int]](elems: _*))
val transformed = stream.transform { rdd =>
try {
if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
else println("lucky bastard")
rdd
} catch {
case e: Throwable =>
println("stopping streaming context", e)
ssc.stop(stopSparkContext = true, stopGracefully = false)
throw e
}
}
transformed.foreachRDD { rdd =>
println(rdd.collect().mkString(","))
}
ssc.start()
ssc.awaitTermination()
}
Run Code Online (Sandbox Code Playgroud)
在IntelliJ中运行它将在某个时刻抛出异常.有趣的部分:
RDD处理了至少一个异常后抛出异常,应用程序在打印错误消息后挂起并且永不停止,这不是我想要的为什么应用程序挂起而不是在第二种情况下死亡?
我在Scala 2.11.8上运行Spark 2.1.0.获取try-catch解决了问题(Spark自行停止).此外,移出try-catch内部foreachRDD解决了这个问题.
但是我正在寻找一个可以帮助我理解这个特定例子中发生了什么的答案.
您只会看到异常在操作(如foreachRDD本例)中体现出来,而不是在转换(如transform本例)中体现,因为操作会延迟执行转换。这意味着您的转变在行动之前不会发生。之所以需要这样做,是因为需要改变您对分布式处理工作方式的思维模型。
考虑一个传统的单线程程序。代码逐行进行,如果抛出异常且未处理,则后续代码行不会执行。在分布式系统中,相同的 Spark 转换在多台机器上并行运行(并且以不同的速度),抛出异常时会发生什么?它并不那么简单,因为一台机器上的异常独立于其他机器上的代码处理,这正是您想要的。希望分布在整个集群中的所有独立任务在出现异常时关闭只是单机思维,无法转化为分布式范例。司机该如何处理呢?
根据现在 Databricks 的 Matei Zaharia 和伯克利 Spark 的创建者之一的说法,“异常应该被发送回驱动程序并在那里记录(SparkException如果任务失败超过 4 次,则抛出异常)。 ”(顺便说一句,可以使用 .) 更改此默认重试次数spark.task.maxFailures。因此,如果 Log4J 在执行器上正确配置,异常将被记录在那里;然后它会被序列化并发送回驱动程序,默认情况下驱动程序会再尝试 3次。
在你的特殊情况下,我猜你可能会遇到一些问题。首先,您在单台机器上运行,这会对分布式模型中异常处理的工作方式产生误导。其次,您过早地停止了上下文。停止上下文是一种极具破坏性的操作,其中包括停止所有侦听器和DAGScheduler. 坦率地说,我不知道当你基本上关掉灯时,你怎么能指望 Spark 如此整齐地完成所有事情。
最后,我要提到的是,更优雅的异常处理模型可能会在Try. 您最终可能会得到更麻烦的代码,因为您的转换将返回RDD[Try[T]]or DStream[Try[T]],这意味着您必须处理每个元素的Success和情况。Failure但是您将能够利用 monad 提供的所有好处(包括映射RDD[Try[A]] => RDD[Try[B]],甚至使用for推导式)向下游传播成功和错误信息(凭借flatMap)。
| 归档时间: |
|
| 查看次数: |
883 次 |
| 最近记录: |