在 udf 中使用 Try 匹配进行错误处理 - 并记录失败的行

Ter*_*rry 5 error-handling scala user-defined-functions dataframe apache-spark

Scala 2.11 版和 Spark 2.0.1。

我有一个数据框,我在 udf 中执行一些操作。我希望能够运行操作并仅在失败的行上返回错误。我还想将成功/失败作为附加字段返回。通过/失败可以在单独的列中。

这是我尝试过的:

val df = Seq(("as", 1, "df"), ("1", 2, "3")).toDF("a", "b", "c")
val df1 = Seq(("1", 1, "3"), ("1", 2, "3")).toDF("a", "b", "c")

def myUdf = udf((i: String, j: Int, k: Int) => { 
   def test (ii:String, jj:Int, kk:Int): Try[Int] = {
     val q = i.toInt * j * k.toInt
     val m = q * i.toInt
     return (Try(q))
  }
  val q = Try(test(i, j, k)) match { 
    case Success(lines) => lines.toString
    case _ => "Failed"
  }
  q
})

# First Example
val df2 = df.withColumn("D", myUdf($"a", $"b", $"c")) <-- This fails

# Second Example 
val df3 = df1.withColumn("D", myUdf($"a", $"b", $"c"))
df3.show
  +---+---+---+----------+
  |  a|  b|  c|         D|
  +---+---+---+----------+
  |  1|  1|  3|Success(3)|
  |  1|  2|  3|Success(6)|
  +---+---+---+----------+
Run Code Online (Sandbox Code Playgroud)

1)如何获得整数情况下的 [0-9] 值(而不是 Success(3) 和 Success(6) - 即删除 Success 和括号 - 3 和 6 可以是字符)?另外,如何为每一行添加成功/失败?

2) 是否可以使用 Try 匹配来检查 Udf 何时失败,而无需在每一步进行错误处理 - 当它在一个计算失败时我们如何进入下一个计算?注意:“测试”方法中有大量计算。

3) 全局检查 udf 的潜在替代方法是什么?

Sha*_*ica 5

您可以使用 来执行此操作Try,但是请注意,Try应该围绕方法的整个主体,test而不仅仅是应用于结果(您也不应该在return此处使用关键字)。之后使用match得到结果。

def myUdf = udf((i: String, j: Int, k: String) => { 
  def test(ii: String, jj: Int, kk: String): Try[Int] = Try {
    val q = i.toInt * j * k.toInt
    val m = q * i.toInt
    q
  }

  test(i, j, k) match { 
    case Success(lines) => lines.toString
    case _ => "Failed"
  }
})
Run Code Online (Sandbox Code Playgroud)

请注意,kkk都是 String 类型,因为这就是您在两个测试数据框中都有的内容。如果您使用Int并且无法隐式转换列值(例如“df”),则该行将不会运行 udf 并且您将获得null.

使用两个数据框的结果:

+---+---+---+------+
|  a|  b|  c|     D|
+---+---+---+------+
| as|  1| df|Failed|
|  1|  2|  3|     6|
+---+---+---+------+

+---+---+---+---+
|  a|  b|  c|  D|
+---+---+---+---+
|  1|  1|  3|  3|
|  1|  2|  3|  6|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)
  1. 可以看出,这将只给出值,或者"Failed"作为结果,成功被删除,即结果作为字符串返回。

  2. test方法失败时,将引发异常,该异常被Try. 这意味着该方法将在失败时退出并且不会继续到最后。

  3. 为了找到这些失败的所有行,使用filter方法:df2.filter($"D" === "Failed")