Yoh*_*age 8 apache-spark spark-streaming
我正在开发一个基于Java的Spark Streaming应用程序,它响应来自Kafka主题的消息.对于每条消息,应用程序执行一些处理,并将结果写回到不同的Kafka主题.
有时由于意外的数据相关问题,在RDD上运行的代码可能会失败并引发异常.当发生这种情况时,我希望有一个通用的处理程序,可以采取必要的操作并将消息发送到错误主题.现在,这些异常是由Spark本身写在Spark的日志中的.
执行此操作的最佳方法是什么,而不是为每个处理RDD的代码块编写try-catch块?
您可以编写一个通用函数来执行此操作。您只需要将其包装在 RDD 操作周围,因为它们是唯一可以引发 Spark 异常的操作(像.map和 之类的转换器.filter是由操作延迟执行的)。
(假设这是在 Scala 中)你甚至可以尝试一些隐式的东西。创建一个包含 RDD 并处理错误的类。这是它可能的样子的草图:
implicit class FailSafeRDD[T](rdd: RDD[T]) {
def failsafeAction[U](fn: RDD[T] => U): Try[U] = Try {
fn(rdd)
}
}
Run Code Online (Sandbox Code Playgroud)
您可以添加错误主题消息failsafeAction或每次失败时您想要执行的任何操作。然后用法可能是这样的:
val rdd = ??? // Some rdd you already have
val resultOrException = rdd.failsafeAction { r => r.count() }
Run Code Online (Sandbox Code Playgroud)
除此之外,我认为“最佳”方法对应用程序需求有些主观。