累加器何时真正可靠?

Dan*_*don 47 apache-spark

我想使用累加器来收集有关我在Spark作业上操作的数据的一些统计信息.理想情况下,我会在作业计算所需的转换时执行此操作,但由于Spark会在不同情况下重新计算任务,因此累加器不会反映真实的指标.以下是文档描述的方式:

对于仅在操作内执行的累加器更新,Spark保证每个任务对累加器的更新仅应用一次,即重新启动的任务不会更新该值.在转换中,用户应该知道,如果重新执行任务或作业阶段,则可以多次应用每个任务的更新.

这很令人困惑,因为大多数操作都不允许运行自定义代码(可以使用累加器),它们主要采用先前转换的结果(懒惰).文档还显示了这一点:

val acc = sc.accumulator(0)
data.map(x => acc += x; f(x))
// Here, acc is still 0 because no actions have cause the `map` to be computed.
Run Code Online (Sandbox Code Playgroud)

但是,如果我们data.count()在最后添加,这将保证是正确的(没有重复)或不是吗?显然acc,不使用"仅内部动作",因为地图是一种转变.所以不应该保证.

另一方面,关于相关Jira门票的讨论谈论"结果任务"而不是"行动".例如这里这里.这似乎表明结果确实可以保证是正确的,因为我们在acc之前和行动之前使用,因此应该作为单个阶段计算.

我猜这个"结果任务"的概念与所涉及的操作类型有关,是包含一个动作的最后一个,就像在这个例子中一样,它显示了几个操作如何分成几个阶段(洋红色,从这里拍摄的图像):

将多个操作划分为多个紫色阶段的工作

所以假设,count()在该链的末尾的一个动作将是同一个最后阶段的一部分,我将保证在最后一个地图上使用的累加器不会包含任何重复项?

澄清这个问题会很棒!谢谢.

Aja*_*pta 22

回答"蓄能器何时真正可靠?"的问题.

答案:当它们出现在Action操作中时.

根据Action Task中的文档,即使存在任何重新启动的任务,它也只会更新一次Accumulator.

对于仅在操作内执行的累加器更新,Spark保证每个任务对累加器的更新仅应用一次,即重新启动的任务不会更新该值.在转换中,用户应该知道,如果重新执行任务或作业阶段,则可以多次应用每个任务的更新.

Action允许运行自定义代码.

对于Ex.

val accNotEmpty = sc.accumulator(0)
ip.foreach(x=>{
  if(x!=""){
    accNotEmpty += 1
  }
})
Run Code Online (Sandbox Code Playgroud)

但是,为什么Map + Action即.结果任务操作对于累加器操作不可靠

  1. 由于代码中的一些异常,任务失败.Spark将尝试4次(默认尝试次数).如果任务失败,每次都会发出异常.如果成功,则Spark将继续并且只更新累加器值以获得成功状态和失败状态累加器值将被忽略.
    判决:处理得当
  2. 阶段失败:如果执行程序节点崩溃,用户没有故障而硬件故障 - 如果节点在洗牌阶段出现故障.随机输出存储在本地,如果节点发生故障,那么随机输出就会消失.所以Spark会消失回到生成shuffle输出的阶段,查看需要重新运行的任务,并在其中一个仍处于活动状态的节点上执行它们.重新生成缺失的shuffle输出后,生成map输出的阶段执行了一些它的任务多次.Spark计算所有这些的累加器更新.
    判决:未在Result Task.Accumulator中处理错误输出.
  3. 如果任务运行缓慢,则Spark可以在另一个节点上启动该任务的推测副本.
    判决:未处理.累积器会输出错误的输出.
  4. 缓存的RDD很大,不能驻留在Memory.So中.每当使用RDD时,它将重新运行Map操作以获取RDD,并且再次累加器将由它更新.
    判决:未处理.累积器会输出错误的输出.

因此可能会发生同一个函数可能在同一个数据上运行多次.因为Map操作,Spark不会为累加器更新提供任何保证.

因此最好在Spark中使用Accumulator in Action操作.

要了解有关累积器及其问题的更多信息,请参阅此博客文章 - 作者:Imran Rashid.


Dan*_*bos 19

成功完成任务后,累积器更新将发送回驱动程序.因此,当您确定每个任务只执行一次并且每个任务按预期执行时,您的累加器结果将保证是正确的.

我更喜欢依赖reduceaggregate不是累加器,因为很难枚举任务的所有执行方式.

  • 一个动作启动任务.
  • 如果某个操作依赖于较早的阶段,并且该阶段的结果未被(完全)缓存,则将启动早期阶段的任务.
  • 当检测到少量慢速任务时,推测执行会启动重复任务.

也就是说,有许多简单的情况可以完全信任累加器.

val acc = sc.accumulator(0)
val rdd = sc.parallelize(1 to 10, 2)
val accumulating = rdd.map { x => acc += 1; x }
accumulating.count
assert(acc == 10)
Run Code Online (Sandbox Code Playgroud)

这会保证是正确的(没有重复)吗?

是的,如果禁用推测执行.的mapcount将是一个单一的阶段,所以像你说的,没有办法一个任务可以成功执行一次以上.

但是累加器会更新为副作用.因此,在考虑如何执行代码时,您必须非常小心.考虑这个而不是accumulating.count:

// Same setup as before.
accumulating.mapPartitions(p => Iterator(p.next)).collect
assert(acc == 2)
Run Code Online (Sandbox Code Playgroud)

这也将为每个分区创建一个任务,并且每个任务将保证只执行一次.但是代码map将不会在所有元素上执行,只是每个分区中的第一个元素.

累加器就像一个全局变量.如果您共享对可以递增累加器的RDD的引用,那么其他代码(其他线程)也可能导致它递增.

// Same setup as before.
val x = new X(accumulating) // We don't know what X does.
                            // It may trigger the calculation
                            // any number of times.
accumulating.count
assert(acc >= 10)
Run Code Online (Sandbox Code Playgroud)

  • 你是对的.我只是测试了它,并且不计算失败任务的累加器更新.然后我想如果你禁用了推测执行,只要你确定没有什么可以触发计算多次,累加器可能是值得信赖的.我会仔细看看代码.我可能不得不修改我对累加器的深深不信任:). (2认同)