Spark Accumulator值未被任务读取

Rah*_*ley 2 java apache-spark rdd

我正在初始化一个累加器

final Accumulator<Integer> accum = sc.accumulator(0);

然后在map函数中,我试图递增累加器,然后在设置变量时使用累加器值.

JavaRDD<UserSetGet> UserProfileRDD1 = temp.map(new Function<String, UserSetGet>() {

            @Override
            public UserSetGet call(String arg0) throws Exception {

                    UserSetGet usg = new UserSetGet();

                    accum.add(1);
                    usg.setPid(accum.value().toString();


            }
  });
Run Code Online (Sandbox Code Playgroud)

但我得到以下错误.

16/03/14 09:12:58 ERROR executor.Executor:阶段2.0(TID 2)中任务0.0的异常java.lang.UnsupportedOperationException:无法读取任务中的累加器值

编辑 - 根据Avihoo Mamka的回答,无法在任务中获得累加器值.

那么无论如何我可以并行实现同样的目标.这样,每当变量(例如像静态变量)在我的map函数中递增时,Pid值就会被设置.

Avi*_*mka 7

来自Spark文档

累加器是仅通过关联操作"添加"的变量,因此可以并行有效地支持.它们可用于实现计数器(如MapReduce)或总和

...

只有驱动程序可以使用其value方法读取累加器的值 .

因此,当尝试从Spark中的任务中读取累加器的值时,意味着您尝试从工作程序读取其值,这与仅从驱动程序读取累加器值的概念相反.

  • 您不能使用累加器的值来设置“POJO”值,因为就累加器的当前状态而言,您的工作人员并未与所有其他工作人员同步,只有驱动程序是由于并行性和执行程序的管理而同步的,因此您不能在工作人员内部使用累加器的值。 (2认同)