如何使用 multiprocessing.Manager().Value 存储总和?

kjs*_*kjs 5 python python-multiprocessing

我想使用 multiprocessing.Pool 累积总和。这是我尝试的方法:

import multiprocessing

def add_to_value(addend, value):
    value.value += addend

with multiprocessing.Manager() as manager:
    value = manager.Value(float, 0.0)
    with multiprocessing.Pool(2) as pool:
        pool.starmap(add_to_value,
                     [(float(i), value) for i in range(100)])
    print(value.value)
Run Code Online (Sandbox Code Playgroud)

这会给出不正确甚至不一致的结果。例如,一次给出 2982.0,另一次给出 2927.0。正确的输出是 4950.0,当我在调用 Pool 时仅使用一个进程而不是 2 个进程时,我确实得到了这一结果。我使用的是 Python 3.7.5。

fil*_*den 6

多处理文档(位于multiprocessing.Value)对此非常明确:

涉及读取和写入的操作+=不是原子的。因此,例如,如果您想以原子方式增加共享值,那么仅仅执行 是不够的counter.value += 1

简而言之,您需要抓住一把锁才能做到这一点。

您可以通过以下方式做到这一点:

def add_to_value(addend, value, lock):
    with lock:
        value.value += addend

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        lock = manager.Lock()
        value = manager.Value(float, 0.0)
        with multiprocessing.Pool(2) as pool:
            pool.starmap(add_to_value,
                         [(float(i), value, lock) for i in range(100)])
        print(value.value)
Run Code Online (Sandbox Code Playgroud)

这将正确输出 4950.0。

但请注意,由于需要锁定,这种方法将非常昂贵。最有可能的是,与使用单个进程执行该操作相比,完成该操作需要更多时间。

注意:我还添加了一个防护,当使用除fork之外的启动方法时if __name__ == '__main__':实际上需要它。Windows 和 Mac OS 上的默认值是spawn,因此确实需要使此代码可移植到这两个平台。启动方法spawnforkserver在Linux/Unix上也可用,因此在某些情况下也需要这样做。

当您能够将工作卸载给工作人员并让他们自己完成时,多重处理将会更加高效,例如计算部分总和,然后在主流程中将它们加在一起。如果可能,请考虑重新考虑您的方法以适应该模型。

  • 谢谢,我以后会努力更好地阅读文档。 (2认同)
  • @kjs 我只是喜欢引用文档,主要作为答案的权威来源......祝你的并行编码好运! (2认同)
  • 如果使用“mp.Lock”(在 Windows 上),您的说法是正确的,因为这是我自己尝试过的方法。请注意,我还添加了“spawn”启动方法所需的“if __name__ == '__main__':”保护。(顺便说一句,我认为你应该将后者添加到你的答案中,以使其更便携——尽管我也同意多处理示例函数可能效率太低。) (2认同)