为什么工作节点看不到另一个工作节点上累加器的更新?

Hri*_*lov 5 java apache-spark

LongAccumulator在地图操作中使用a 作为共享计数器。但是似乎我没有正确使用它,因为工作节点上计数器的状态没有更新。这是我的计数器类的样子:

public class Counter implements Serializable {

   private LongAccumulator counter;

   public Long increment() {
      log.info("Incrementing counter with id: " + counter.id() + " on thread: " + Thread.currentThread().getName());
      counter.add(1);
      Long value = counter.value();
      log.info("Counter's value with id: " + counter.id() + " is: " + value + " on thread: " + Thread.currentThread().getName());
      return value;
   }

   public Counter(JavaSparkContext javaSparkContext) {
      counter = javaSparkContext.sc().longAccumulator();
   }
}
Run Code Online (Sandbox Code Playgroud)

据我所了解的文档,当应用程序在多个工作程序节点中运行时,这应该可以正常工作:

累加器是仅通过关联和交换操作“添加”的变量,因此可以有效地并行支持。它们可用于实现计数器(如MapReduce中的计数器)或总和。Spark本身支持数字类型的累加器,程序员可以添加对新类型的支持。

但是,这是当计数器在2个不同的工作线程上递增且看起来状态未在节点之间共享时的结果:

INFO计数器:在线程上ID为866的递增计数器:Executor任务启动worker-6 INFO计数器:在线程上ID为866的计数器值为:1在线程上:Executor任务启动worker-6
INFO计数器:ID为866的递增计数器:线程:执行程序任务启动worker-0 INFO计数器:ID为866的计数器的值是:1在线程上:执行程序任务启动worker-0

我是否理解累加器概念错误,或者必须使用任何设置来启动任务?

zer*_*323 3

不应该工作

\n\n
\n

然后可以使用 add 方法将在集群上运行的任务添加到集群中。然而,他们无法读取其价值。只有驱动程序可以使用其值方法读取accumulator\xe2\x80\x99s 值。

\n
\n\n

每个任务都有自己的累加器,一旦任务完成并报告结果,该累加器会在本地更新,并与驱动程序上的“共享”副本合并。

\n\n

旧的AccumulatorAPI(现在包装AccumulatorV2)实际上在任务中使用时抛出了异常value,但由于某种原因它在AccumulatorV2.

\n\n

您所经历的实际上类似于此处描述的旧行为How to print Accumulator Variable from inside task(似乎“工作”而不调用 value 方法)?

\n