与 multiprocessing.Pool 共享一个计数器

Bra*_*mon 5 python parallel-processing multiprocessing python-3.x python-multiprocessing

我想使用multiprocessing.Value+multiprocessing.Lock在不同的进程之间共享一个计数器。例如:

import itertools as it
import multiprocessing

def func(x, val, lock):
    for i in range(x):
        i ** 2
    with lock:
        val.value += 1
        print('counter incremented to:', val.value)

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    with multiprocessing.Pool() as pool:
        pool.starmap(func, ((i, v, lock) for i in range(25)))
    print(counter.value())
Run Code Online (Sandbox Code Playgroud)

这将引发以下异常:

RuntimeError:同步对象只能通过继承在进程之间共享

我最困惑的是,一个相关的(虽然不是完全类似的)模式适用于multiprocessing.Process()

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    procs = [multiprocessing.Process(target=func, args=(i, v, lock))
             for i in range(25)]
    for p in procs: p.start()
    for p in procs: p.join()
Run Code Online (Sandbox Code Playgroud)

现在,我认识到这是两件明显不同的事情:

  • 第一个示例使用了许多等于 的工作进程cpu_count(),并range(25)在它们之间拆分了一个可迭代对象
  • 第二个示例创建了 25 个工作进程和任务,每个进程和任务都有一个输入

也就是说:如何以这种方式与pool.starmap()(或pool.map())共享实例?

我在这里这里这里都看到过类似的问题,但是这些方法似乎不适合.map()/ .starmap(),无论是否Value使用ctypes.c_int.


我意识到这种方法在技术上有效:

def func(x):
    for i in range(x):
        i ** 2
    with lock:
        v.value += 1
        print('counter incremented to:', v.value)

v = None
lock = None

def set_global_counter_and_lock():
    """Egh ... """
    global v, lock
    if not any((v, lock)):
        v = multiprocessing.Value('i', 0)
        lock = multiprocessing.Lock()

if __name__ == '__main__':
    # Each worker process will call `initializer()` when it starts.
    with multiprocessing.Pool(initializer=set_global_counter_and_lock) as pool:
        pool.map(func, range(25))
Run Code Online (Sandbox Code Playgroud)

这真的是解决此问题的最佳实践方式吗?

Dar*_*aut 6

使用RuntimeError时得到的Pool是因为池方法的参数在通过(池内部)队列发送到工作进程之前被腌制。您尝试使用哪种池方法在这里无关紧要。当您使用时不会发生这种情况,Process因为不涉及队列。您只需使用pickle.dumps(multiprocessing.Value('i', 0)).

您的最后一个代码片段并不像您认为的那样工作。你是不是共享一个Value,你是为每一个孩子的过程重新创建独立的计数器。

如果您在 Unix 上并使用默认启动方法“fork”,您只需将共享对象作为参数传递到池方法中即可。您的子进程将通过分叉继承全局变量。使用 process-start-methods “spawn”(默认 Windows 和macOS with Python 3.8+)或“forkserver”,你必须initializerPool 实例化过程中使用,让子进程继承共享对象。

请注意,multiprocessing.Lock这里不需要额外的,因为multiprocessing.Value默认情况下有一个可以使用的内部。

import os
from multiprocessing import Pool, Value #, set_start_method


def func(x):
    for i in range(x):
        assert i == i
        with cnt.get_lock():
            cnt.value += 1
            print(f'{os.getpid()} | counter incremented to: {cnt.value}\n')


def init_globals(counter):
    global cnt
    cnt = counter


if __name__ == '__main__':

    # set_start_method('spawn')

    cnt = Value('i', 0)
    iterable = [10000 for _ in range(10)]

    with Pool(initializer=init_globals, initargs=(cnt,)) as pool:
        pool.map(func, iterable)

    assert cnt.value == 100000
Run Code Online (Sandbox Code Playgroud)

可能还值得注意的是,您不需要在所有情况下都共享计数器。如果您只需要跟踪某事总共发生的频率,一个选项是在计算期间保留单独的工作人员本地计数器,并在最后汇总。对于在并行计算本身期间不需要同步的频繁计数器更新,这可能会显着提高性能。