在pyspark的全球柜台

xxx*_*222 2 apache-spark pyspark

为什么我在下面用pyspark写的计数器并不总能为我提供正确的结果,它与全局计数器有关吗?

def increment_counter():
    global counter
    counter += 1

def get_number_of_element(rdd):
    global counter
    counter = 0
    rdd.foreach(lambda x:increment_counter())
    return counter
Run Code Online (Sandbox Code Playgroud)

Dan*_*nai 5

您的全局变量仅在驱动程序节点上定义,这意味着它将在localhost上运行之前正常工作.只要将作业分配给多个进程,他们就无法访问counter变量,只会在自己的进程中创建一个新进程.因此,最终结果将仅包含在驱动程序进程中完成的增量.

您正在寻找的是一种非常常见的用法,并且由Spark的累加器功能涵盖.累加器在过程结束时分发和收集,因此总计将包含所有节点的增量,而不是仅包含驱动程序节点.

累加器 - Spark编程指南