提交代码以执行到 concurrent.futures.ProcessPool 中的所有进程

cod*_*ape 8 python parallel-processing multiprocessing python-multiprocessing process-pool

语境:

  • 一个使用 aconcurrent.futures.process.ProcessPool来执行代码的Python 应用服务器
  • 我们有时想在不重启整个服务器进程的情况下热重载导入的代码

(是的,我知道importlib.reload警告

为了让它工作,我想我必须在进程池管理的importlib.reload每个multiprocessing进程中执行。

有没有办法向进程池中的所有进程提交一些东西?

Dar*_*aut 5

我不知道您提到的热重载尝试将如何发挥作用,但您真正提出的一般问题是可以回答的。

有没有办法向进程池中的所有进程提交某些内容?

这里的挑战在于确保所有流程都得到这个something一次且仅一次,并且在每个进程都得到它之前不会发生进一步的执行。

您可以借助multiprocessing.Barrier(parties[, action[, timeout]]). 屏障将阻止各方呼叫,barrier.wait()直到各方都完成呼叫,然后立即释放所有呼叫。

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def foo(x):
    for _ in range(int(42e4)):
        pass
    return x


def reload(something):
    print(f"{mp.current_process().name} --- reloading {something} and waiting.")
    barrier.wait()
    print(f"{mp.current_process().name} --- released.")


def init_barrier(barrier):
    globals()['barrier'] = barrier


if __name__ == '__main__':

    MAX_WORKERS = 4
    barrier = mp.Barrier(MAX_WORKERS)

    with ProcessPoolExecutor(
            MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
    ) as executor:
        print(list(executor.map(foo, range(10))))
        # then something for all processes
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        for f in futures:
            f.result()

        print(list(executor.map(foo, range(10))))
Run Code Online (Sandbox Code Playgroud)

示例输出:

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def foo(x):
    for _ in range(int(42e4)):
        pass
    return x


def reload(something):
    print(f"{mp.current_process().name} --- reloading {something} and waiting.")
    barrier.wait()
    print(f"{mp.current_process().name} --- released.")


def init_barrier(barrier):
    globals()['barrier'] = barrier


if __name__ == '__main__':

    MAX_WORKERS = 4
    barrier = mp.Barrier(MAX_WORKERS)

    with ProcessPoolExecutor(
            MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
    ) as executor:
        print(list(executor.map(foo, range(10))))
        # then something for all processes
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        for f in futures:
            f.result()

        print(list(executor.map(foo, range(10))))
Run Code Online (Sandbox Code Playgroud)

如果您可以保留barrier全局并multiprocessing.get_context()._name返回"fork",则不需要使用,initializer因为全局将被继承并通过分叉访问。