Spark,在EMR中抛出SparkException时的错误行为

Sor*_*mar 7 amazon-emr amazon-dynamodb hadoop-yarn apache-spark

我在EMR中运行一个火花作业,YARN作为资源管理器和2个节点.如果我的条件不满足,我需要故意使步骤失败,因此下一步不会按照配置执行.为了实现这一点,我在dynamoDB中插入日志消息后抛出了一个自定义异常.

它运行正常,但Dynamo中的记录被插入两次.

以下是我的代码.

if(<condition>) {
  <method call to insert in dynamo> 
  throw new SparkException(<msg>);
  return;
}
Run Code Online (Sandbox Code Playgroud)

如果我删除行以抛出异常,它工作正常但步骤已完成.

如何在不获取日志消息两次的情况下使步骤失败.

谢谢您的帮助.

此致,索拉布

Rya*_*ier 2

您的发电机消息被插入两次的原因可能是因为您的错误条件被两个不同的执行程序命中并处理。Spark 将要完成的工作分配给它的工作人员,而这些工作人员不共享任何知识。

我不确定是什么促使您要求 Spark 步骤失败,但我建议您在应用程序代码中跟踪该失败案例,而不是尝试直接让 Spark 死亡。换句话说,编写代码来检测错误并将其传递回 Spark 驱动程序,然后酌情对其进行操作。

实现此目的的一种方法是使用累加器来计算处理数据时发生的任何错误。它看起来大致像这样(我假设是 scala 和 DataFrames,但您可以根据需要适应 RDD 和/或 python):

val accum = sc.longAccumulator("Error Counter")
def doProcessing(a: String, b: String): String = {
   if(condition) {
     accum.add(1)
     null
   }
   else {
     doComputation(a, b)
   }
}
val doProcessingUdf = udf(doProcessing _)

df = df.withColumn("result", doProcessing($"a", $"b"))

df.write.format(..).save(..)  // Accumulator value not computed until an action occurs!

if(accum.value > 0) {
    // An error detected during computation! Do whatever needs to be done.
    <insert dynamo message here>
}
Run Code Online (Sandbox Code Playgroud)

这种方法的一个好处是,如果您在 Spark UI 中寻找反馈,您将能够在运行时看到累加器值。作为参考,这里是有关累加器的文档: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators