Ter*_*rry 5 error-handling scala user-defined-functions dataframe apache-spark
Scala 2.11 版和 Spark 2.0.1。
我有一个数据框,我在 udf 中执行一些操作。我希望能够运行操作并仅在失败的行上返回错误。我还想将成功/失败作为附加字段返回。通过/失败可以在单独的列中。
这是我尝试过的:
val df = Seq(("as", 1, "df"), ("1", 2, "3")).toDF("a", "b", "c")
val df1 = Seq(("1", 1, "3"), ("1", 2, "3")).toDF("a", "b", "c")
def myUdf = udf((i: String, j: Int, k: Int) => {
def test (ii:String, jj:Int, kk:Int): Try[Int] = {
val q = i.toInt * j * k.toInt
val m = q * i.toInt
return (Try(q))
}
val q = Try(test(i, j, k)) match {
case Success(lines) => lines.toString
case _ => "Failed"
}
q
})
# First Example
val df2 = df.withColumn("D", myUdf($"a", $"b", $"c")) <-- This fails
# Second Example
val df3 = df1.withColumn("D", myUdf($"a", $"b", $"c"))
df3.show
+---+---+---+----------+
| a| b| c| D|
+---+---+---+----------+
| 1| 1| 3|Success(3)|
| 1| 2| 3|Success(6)|
+---+---+---+----------+
Run Code Online (Sandbox Code Playgroud)
1)如何获得整数情况下的 [0-9] 值(而不是 Success(3) 和 Success(6) - 即删除 Success 和括号 - 3 和 6 可以是字符)?另外,如何为每一行添加成功/失败?
2) 是否可以使用 Try 匹配来检查 Udf 何时失败,而无需在每一步进行错误处理 - 当它在一个计算失败时我们如何进入下一个计算?注意:“测试”方法中有大量计算。
3) 全局检查 udf 的潜在替代方法是什么?
您可以使用 来执行此操作Try,但是请注意,Try应该围绕方法的整个主体,test而不仅仅是应用于结果(您也不应该在return此处使用关键字)。之后使用match得到结果。
def myUdf = udf((i: String, j: Int, k: String) => {
def test(ii: String, jj: Int, kk: String): Try[Int] = Try {
val q = i.toInt * j * k.toInt
val m = q * i.toInt
q
}
test(i, j, k) match {
case Success(lines) => lines.toString
case _ => "Failed"
}
})
Run Code Online (Sandbox Code Playgroud)
请注意,k和kk都是 String 类型,因为这就是您在两个测试数据框中都有的内容。如果您使用Int并且无法隐式转换列值(例如“df”),则该行将不会运行 udf 并且您将获得null.
使用两个数据框的结果:
+---+---+---+------+
| a| b| c| D|
+---+---+---+------+
| as| 1| df|Failed|
| 1| 2| 3| 6|
+---+---+---+------+
+---+---+---+---+
| a| b| c| D|
+---+---+---+---+
| 1| 1| 3| 3|
| 1| 2| 3| 6|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)
可以看出,这将只给出值,或者"Failed"作为结果,成功被删除,即结果作为字符串返回。
在test方法失败时,将引发异常,该异常被Try. 这意味着该方法将在失败时退出并且不会继续到最后。
为了找到这些失败的所有行,使用filter方法:df2.filter($"D" === "Failed")。