Python multiprocessing with the start method "spawn" doesn't work

Adr*_*ian 5 python multiprocessing

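I wrote a Python class that draws pyplot plots in parallel. It works fine on Linux, where the default start method is fork, but when I tried it on Windows I ran into problems (they can be reproduced on Linux with the spawn start method; see the code below). I always get this error: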

Traceback (most recent call last):
  File "test.py", line 50, in <module>
    test()
  File "test.py", line 7, in test
    asyncPlotter.saveLinePlotVec3("test")
  File "test.py", line 41, in saveLinePlotVec3
    args=(test, ))
  File "test.py", line 34, in process
    p.start()
  File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\popen_spawn_win32.py", line 89, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle weakref objects

C:\Python\MonteCarloTools>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 99, in spawn_main
    new_handle = reduction.steal_handle(parent_pid, pipe_handle)
  File "C:\Users\adrian\AppData\Local\Programs\Python\Python37\lib\multiprocessing\reduction.py", line 82, in steal_handle
    _winapi.PROCESS_DUP_HANDLE, False, source_pid)
OSError: [WinError 87] The parameter is incorrect

I hope there is a way to make this code work on Windows. Here is a link to the different start methods available on Linux and Windows: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

Zac*_*son 7

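With the spawn start method, the Process object itself is pickled so that the child process can use it. In your code, the target=target argument is a bound method of AsyncPlotter. It looks like the entire asyncPlotter instance has to be pickled as well for that to work, and that includes self.manager, which apparently does not want to be pickled.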
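
To illustrate the point about bound methods, here is a minimal sketch that does not use the original AsyncPlotter code. Plotter, plain_work and can_pickle are hypothetical names, and a threading.Lock stands in for the manager only because it is a small object that pickle is known to reject. It shows that pickling a bound method pulls in the whole instance:

```python
import pickle
import threading


class Plotter:
    """Hypothetical stand-in for AsyncPlotter (not the original class)."""

    def __init__(self):
        # Assumption: a lock plays the role of self.manager here, because
        # it is a simple object that cannot be pickled.
        self.resource = threading.Lock()

    def work(self, name):
        return name


def plain_work(name):
    """Module-level function: pickled by reference, no instance involved."""
    return name


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


# Pickling the bound method also pickles the Plotter instance it is bound
# to, so the unpicklable attribute makes the whole operation fail.
bound_ok = can_pickle(Plotter().work)

# A plain function carries no instance state with it.
plain_ok = can_pickle(plain_work)

if __name__ == "__main__":
    print("bound method picklable:", bound_ok)
    print("plain function picklable:", plain_ok)
```

Since spawn has to pickle the target, any unpicklable attribute on the instance makes a bound-method target unusable, while a module-level function avoids the issue.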

In short, keep the Manager outside of AsyncPlotter. This works on my macOS system:

def test():
    manager = mp.Manager()
    asyncPlotter = AsyncPlotter(manager.Value('i', 0))
    ...

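Also, as noted in your comment, asyncPlotter did not work when it was reused. I don't know the details, but it looks like it has something to do with how the Value object is shared across processes. The test function would need to look something like this: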

def test():
    manager = mp.Manager()
    nc = manager.Value('i', 0)

    asyncPlotter1 = AsyncPlotter(nc)
    asyncPlotter1.saveLinePlotVec3("test 1")
    asyncPlotter2 = AsyncPlotter(nc)
    asyncPlotter2.saveLinePlotVec3("test 2")

    asyncPlotter1.join()
    asyncPlotter2.join()

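All in all, you may want to restructure your code and use a process pool. It already handles what AsyncPlotter is doing, namely running the work in parallel on cpu_count processes: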

from multiprocessing import Pool, set_start_method
from random import random
import time

def linePlotVec3(test):
    time.sleep(random())
    print("test", test)

if __name__ == "__main__":
    set_start_method("spawn")
    with Pool() as pool:
        pool.map(linePlotVec3, range(20))

Or you could use a ProcessPoolExecutor to do nearly the same thing. This example starts the tasks one at a time instead of mapping over a list:

from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
import time
from random import random

def work(i):
    r = random()
    print("work", i, r)
    time.sleep(r)

def main():
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(mp_context=ctx) as pool:
        for i in range(20):
            pool.submit(work, i)

if __name__ == "__main__":
    main()
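
One thing the loop above does not do is keep the Future objects returned by submit, so an exception raised inside a task would go unnoticed. The following is a sketch of one way to collect results and errors, not part of the original answer. The failing input in work and the run_all helper are assumptions made up for illustration:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp


def work(i):
    """Hypothetical task: squares its input and fails for i == 3."""
    if i == 3:
        raise ValueError(f"task {i} failed")
    return i * i


def run_all(n):
    """Submit n tasks under the spawn context; return (results, errors)."""
    ctx = mp.get_context("spawn")
    results, errors = {}, {}
    with ProcessPoolExecutor(mp_context=ctx) as pool:
        # Map each Future back to the input it was submitted with.
        futures = {pool.submit(work, i): i for i in range(n)}
        for fut in as_completed(futures):
            i = futures[fut]
            try:
                # result() re-raises any exception from the worker process.
                results[i] = fut.result()
            except Exception as exc:
                errors[i] = exc
    return results, errors


if __name__ == "__main__":
    results, errors = run_all(5)
    print("results:", results)
    print("errors:", errors)
```

The if __name__ == "__main__" guard matters with spawn, because each child process imports the main module again and must not start a pool of its own while doing so.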