我想使用累加器来收集有关我在Spark作业上操作的数据的一些统计信息.理想情况下,我会在作业计算所需的转换时执行此操作,但由于Spark会在不同情况下重新计算任务,因此累加器不会反映真实的指标.以下是文档描述的方式:
对于仅在操作内执行的累加器更新,Spark保证每个任务对累加器的更新仅应用一次,即重新启动的任务不会更新该值.在转换中,用户应该知道,如果重新执行任务或作业阶段,则可以多次应用每个任务的更新.
这很令人困惑,因为大多数操作都不允许运行自定义代码(可以使用累加器),它们主要采用先前转换的结果(懒惰).文档还显示了这一点:
val acc = sc.accumulator(0)
data.map(x => acc += x; f(x))
// Here, acc is still 0 because no actions have cause the `map` to be computed.
Run Code Online (Sandbox Code Playgroud)
但是,如果我们data.count()在最后添加,这将保证是正确的(没有重复)或不是吗?显然acc,不使用"仅内部动作",因为地图是一种转变.所以不应该保证.
另一方面,关于相关Jira门票的讨论谈论"结果任务"而不是"行动".例如这里和这里.这似乎表明结果确实可以保证是正确的,因为我们在acc之前和行动之前使用,因此应该作为单个阶段计算.
我猜这个"结果任务"的概念与所涉及的操作类型有关,是包含一个动作的最后一个,就像在这个例子中一样,它显示了几个操作如何分成几个阶段(洋红色,从这里拍摄的图像):

所以假设,count()在该链的末尾的一个动作将是同一个最后阶段的一部分,我将保证在最后一个地图上使用的累加器不会包含任何重复项?
澄清这个问题会很棒!谢谢.
Aja*_*pta 22
回答"蓄能器何时真正可靠?"的问题.
答案:当它们出现在Action操作中时.
根据Action Task中的文档,即使存在任何重新启动的任务,它也只会更新一次Accumulator.
对于仅在操作内执行的累加器更新,Spark保证每个任务对累加器的更新仅应用一次,即重新启动的任务不会更新该值.在转换中,用户应该知道,如果重新执行任务或作业阶段,则可以多次应用每个任务的更新.
Action允许运行自定义代码.
对于Ex.
val accNotEmpty = sc.accumulator(0)
ip.foreach(x=>{
if(x!=""){
accNotEmpty += 1
}
})
Run Code Online (Sandbox Code Playgroud)
但是,为什么Map + Action即.结果任务操作对于累加器操作不可靠?
因此可能会发生同一个函数可能在同一个数据上运行多次.因为Map操作,Spark不会为累加器更新提供任何保证.
因此最好在Spark中使用Accumulator in Action操作.
要了解有关累积器及其问题的更多信息,请参阅此博客文章 - 作者:Imran Rashid.
Dan*_*bos 19
成功完成任务后,累积器更新将发送回驱动程序.因此,当您确定每个任务只执行一次并且每个任务按预期执行时,您的累加器结果将保证是正确的.
我更喜欢依赖reduce而aggregate不是累加器,因为很难枚举任务的所有执行方式.
也就是说,有许多简单的情况可以完全信任累加器.
val acc = sc.accumulator(0)
val rdd = sc.parallelize(1 to 10, 2)
val accumulating = rdd.map { x => acc += 1; x }
accumulating.count
assert(acc == 10)
Run Code Online (Sandbox Code Playgroud)
这会保证是正确的(没有重复)吗?
是的,如果禁用推测执行.的map和count将是一个单一的阶段,所以像你说的,没有办法一个任务可以成功执行一次以上.
但是累加器会更新为副作用.因此,在考虑如何执行代码时,您必须非常小心.考虑这个而不是accumulating.count:
// Same setup as before.
accumulating.mapPartitions(p => Iterator(p.next)).collect
assert(acc == 2)
Run Code Online (Sandbox Code Playgroud)
这也将为每个分区创建一个任务,并且每个任务将保证只执行一次.但是代码map将不会在所有元素上执行,只是每个分区中的第一个元素.
累加器就像一个全局变量.如果您共享对可以递增累加器的RDD的引用,那么其他代码(其他线程)也可能导致它递增.
// Same setup as before.
val x = new X(accumulating) // We don't know what X does.
// It may trigger the calculation
// any number of times.
accumulating.count
assert(acc >= 10)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
12963 次 |
| 最近记录: |