cod*_*ape 8 python parallel-processing multiprocessing python-multiprocessing process-pool
语境:
concurrent.futures.process.ProcessPool来执行代码的Python 应用服务器(是的,我知道importlib.reload有警告)
为了让它工作,我想我必须在进程池管理的importlib.reload每个multiprocessing进程中执行。
有没有办法向进程池中的所有进程提交一些东西?
我不知道您提到的热重载尝试将如何发挥作用,但您真正提出的一般问题是可以回答的。
有没有办法向进程池中的所有进程提交某些内容?
这里的挑战在于确保所有流程都得到这个something一次且仅一次,并且在每个进程都得到它之前不会发生进一步的执行。
您可以借助multiprocessing.Barrier(parties[, action[, timeout]]). 屏障将阻止各方呼叫,barrier.wait()直到各方都完成呼叫,然后立即释放所有呼叫。
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
def foo(x):
for _ in range(int(42e4)):
pass
return x
def reload(something):
print(f"{mp.current_process().name} --- reloading {something} and waiting.")
barrier.wait()
print(f"{mp.current_process().name} --- released.")
def init_barrier(barrier):
globals()['barrier'] = barrier
if __name__ == '__main__':
MAX_WORKERS = 4
barrier = mp.Barrier(MAX_WORKERS)
with ProcessPoolExecutor(
MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
) as executor:
print(list(executor.map(foo, range(10))))
# then something for all processes
futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
for f in futures:
f.result()
print(list(executor.map(foo, range(10))))
Run Code Online (Sandbox Code Playgroud)
示例输出:
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
def foo(x):
for _ in range(int(42e4)):
pass
return x
def reload(something):
print(f"{mp.current_process().name} --- reloading {something} and waiting.")
barrier.wait()
print(f"{mp.current_process().name} --- released.")
def init_barrier(barrier):
globals()['barrier'] = barrier
if __name__ == '__main__':
MAX_WORKERS = 4
barrier = mp.Barrier(MAX_WORKERS)
with ProcessPoolExecutor(
MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
) as executor:
print(list(executor.map(foo, range(10))))
# then something for all processes
futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
for f in futures:
f.result()
print(list(executor.map(foo, range(10))))
Run Code Online (Sandbox Code Playgroud)
如果您可以保留barrier全局并multiprocessing.get_context()._name返回"fork",则不需要使用,initializer因为全局将被继承并通过分叉访问。