如何处理spark map()函数中的Exception?

use*_*932 6 scala apache-spark

我想在map()函数中忽略Exception,例如:

rdd.map(_.toInt)
Run Code Online (Sandbox Code Playgroud)

其中rdd是一个RDD[String].

但如果它遇到非数字字符串,它将失败.

什么是忽略任何异常并忽略该行的简单方法?(我不想使用过滤器来处理异常,因为可能有很多其他异常......)

小智 20

您可以结合使用Try和map/filter.

如果它们的行为符合预期,Try会将您的计算包装成Success,如果抛出异常,则尝试将失败.然后,您可以过滤所需的内容 - 在这种情况下是成功的计算,但您也可以过滤错误情况以进行日志记录.

以下代码是一个可能的起点.您可以在scastie.org中运行和浏览它,看它是否符合您的需求.

import scala.util.Try

object Main extends App {

  val in = List("1", "2", "3", "abc")

  val out1 = in.map(a => Try(a.toInt))
  val results = out1.filter(_.isSuccess).map(_.get)

  println(results)

}
Run Code Online (Sandbox Code Playgroud)


ban*_*ara 5

我建议您使用过滤器/地图

rdd.filter(r=>NumberUtils.isNumber(r)).map(r=> r.toInt)
Run Code Online (Sandbox Code Playgroud)

或平面图

exampleRDD.flatMap(r=> {if (NumberUtils.isNumber(r)) Some(r.toInt) else  None})
Run Code Online (Sandbox Code Playgroud)

否则你可以在地图函数中捕获异常

myRDD.map(r => { try{
        r.toInt
    }catch {
        case runtime: RuntimeException => {
        -1
        }
    }
})
Run Code Online (Sandbox Code Playgroud)

然后应用过滤器(-1)