我LongAccumulator在地图操作中使用a 作为共享计数器。但是似乎我没有正确使用它,因为工作节点上计数器的状态没有更新。这是我的计数器类的样子:
public class Counter implements Serializable {
private LongAccumulator counter;
public Long increment() {
log.info("Incrementing counter with id: " + counter.id() + " on thread: " + Thread.currentThread().getName());
counter.add(1);
Long value = counter.value();
log.info("Counter's value with id: " + counter.id() + " is: " + value + " on thread: " + Thread.currentThread().getName());
return value;
}
public Counter(JavaSparkContext javaSparkContext) {
counter = javaSparkContext.sc().longAccumulator();
}
}
Run Code Online (Sandbox Code Playgroud)
据我所了解的文档,当应用程序在多个工作程序节点中运行时,这应该可以正常工作:
累加器是仅通过关联和交换操作“添加”的变量,因此可以有效地并行支持。它们可用于实现计数器(如MapReduce中的计数器)或总和。Spark本身支持数字类型的累加器,程序员可以添加对新类型的支持。
但是,这是当计数器在2个不同的工作线程上递增且看起来状态未在节点之间共享时的结果:
INFO计数器:在线程上ID为866的递增计数器:Executor任务启动worker-6 INFO计数器:在线程上ID为866的计数器值为:1在线程上:Executor任务启动worker-6
INFO计数器:ID为866的递增计数器:线程:执行程序任务启动worker-0 INFO计数器:ID为866的计数器的值是:1在线程上:执行程序任务启动worker-0
我是否理解累加器概念错误,或者必须使用任何设置来启动任务?
它不应该工作:
\n\n\n\n\n然后可以使用 add 方法将在集群上运行的任务添加到集群中。然而,他们无法读取其价值。只有驱动程序可以使用其值方法读取accumulator\xe2\x80\x99s 值。
\n
每个任务都有自己的累加器,一旦任务完成并报告结果,该累加器会在本地更新,并与驱动程序上的“共享”副本合并。
\n\n旧的AccumulatorAPI(现在包装AccumulatorV2)实际上在任务中使用时抛出了异常value,但由于某种原因它在AccumulatorV2.
您所经历的实际上类似于此处描述的旧行为How to print Accumulator Variable from inside task(似乎“工作”而不调用 value 方法)?
\n| 归档时间: |
|
| 查看次数: |
468 次 |
| 最近记录: |