在 IPython 中运行 ProcessPoolExecutor

Dan*_*ker 6 python ipython multiprocessing python-3.x

我在 MacBook 上的 IPython 解释器(IPython 7.9.0、Python 3.8.0)中运行了一个简单的多处理示例,但遇到了一个奇怪的错误。这是我输入的内容:

[In [1]: from concurrent.futures import ProcessPoolExecutor

[In [2]: executor=ProcessPoolExecutor(max_workers=1)

[In [3]: def func():
             print('Hello')

[In [4]: future=executor.submit(func)
Run Code Online (Sandbox Code Playgroud)

但是,我收到以下错误:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)                                   
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 116, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'func' on <module '__main__' (built-in)>
Run Code Online (Sandbox Code Playgroud)

此外,尝试再次提交作业给了我一个不同的错误:

[In [5]: future=executor.submit(func)                                            
---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
<ipython-input-5-42bad1a6fe80> in <module>
----> 1 future=executor.submit(func)

/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py in submit(*args, **kwargs)
    627         with self._shutdown_lock:
    628             if self._broken:
--> 629                 raise BrokenProcessPool(self._broken)
    630             if self._shutdown_thread:
    631                 raise RuntimeError('cannot schedule new futures after shutdown')

BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
Run Code Online (Sandbox Code Playgroud)

作为完整性检查,我将相同(几乎)的代码输入到 Python 文件中,然后从命令行 ( python3 test.py)运行它。它工作得很好。

为什么 IPython 的测试有问题?

编辑:

这是运行良好的 Python 文件。

from concurrent.futures import ProcessPoolExecutor as Executor

def func():
        print('Hello')

if __name__ == '__main__':
        with Executor(1) as executor:
                future=executor.submit(func)
                print(future.result())
Run Code Online (Sandbox Code Playgroud)

Han*_*nnu 9

好的,终于知道是怎么回事了。问题在于 Mac OS - 它默认使用“spawn”方法来创建子进程。这在这里解释了https://docs.python.org/3/library/multiprocessing.html以及将其更改为 fork 的方法(尽管它声明 fork 在 Mac 操作系统上是不安全的)。

使用 spawn 方法启动一个新的 Python 解释器,并将您的代码提供给它。然后尝试在 main 下定位您的函数,但在这种情况下没有 main,因为没有程序,只有解释的命令。

如果您将 start 方法更改为 fork,您的代码将运行(但请注意这不安全的警告)

In [1]: import multiprocessing as mp                                                                                     

In [2]: mp.set_start_method("fork")                                                                                      

In [3]: def func(): 
   ...:     print("foo"); 
   ...:                                               

In [4]: from concurrent.futures import ProcessPoolExecutor                                                               

In [5]: executor=ProcessPoolExecutor(max_workers=1)                                                               

In [6]: future=executor.submit(func)                                                                                     

foo
In [7]:  
Run Code Online (Sandbox Code Playgroud)

由于警告,我不确定答案是否有帮助,但它解释了为什么当您有程序(您的其他尝试)时它的行为会有所不同,以及为什么它在 Ubuntu 上运行良好 - 默认情况下使用“fork”。


san*_*eek 6

TLDR;

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

# create child processes using 'fork' context
executor = ProcessPoolExecutor(max_workers=1, mp_context=mp.get_context('fork'))
Run Code Online (Sandbox Code Playgroud)

这实际上是由 MacOS 上的 python 3.8 切换到“生成”方法来创建子进程引起的;与 3.8 之前的默认值“fork”相反。以下是一些本质区别:

叉子:

  • 克隆父进程的数据和代码,因此继承父程序的状态。
  • 子进程对继承变量所做的任何修改都不会反映在父进程中这些变量的状态。从这一点开始,状态基本上是分叉的(写时复制)。
  • 在父进程中导入的所有库都可用于子进程的上下文。这也使此方法快速,因为子进程不必重新导入库(代码)和变量(数据)。
  • 这带来了一些缺点,尤其是在分叉多线程程序方面。
  • 一些具有 C 后端的库,如 Tensorflow、OpenCV 等,不是 fork-safe,会导致子进程以不确定的方式挂起。

产卵:

  • 在不继承代码或数据的情况下为子进程创建一个新的解释器。
  • 只有必要的数据/参数被发送到子进程。这意味着子进程不会自动使用变量、线程锁、文件描述符等——这避免了难以捕捉的错误。
  • 这种方法也有一些缺点——因为数据/参数需要发送到子进程,它们也必须是可腌制的。某些具有内部锁/互斥锁(如队列)的对象无法进行pickle,而pickle 较重的对象(如数据帧和大型numpy 数组)的成本很高。
  • 取消子进程上的对象将导致重新导入相关库(如果有)。这又增加了时间。
  • 由于父代码未克隆到子进程中,因此if __name__ == '__main__'在创建子进程时需要使用保护。不这样做会使子进程无法从父进程(现在作为main运行)导入代码。这也是为什么您的程序在与守卫一起使用时可以工作的原因。

如果您注意到 fork 会带来一些由您的程序或导入的非 fork 安全库引起的不可预测的影响,您可以:

  • (a) 全局设置多处理的上下文以使用 'fork' 方法:
import multiprocessing as mp

mp.set_start_method("fork")
Run Code Online (Sandbox Code Playgroud)

请注意,这将全局设置上下文,一旦设置,您或任何其他导入的库将无法更改此上下文。

  • (b) 使用 multiprocessing 的get_context方法在本地设置上下文:
import multiprocessing as mp
mp_fork = mp.get_context('fork')

# mp_fork has all the attributes of mp so you can do:
mp_fork.Process(...)  
mp_fork.Pool(...)

# using local context will not change global behaviour:
# create child process using global context
# default is fork in < 3.8; spawn otherwise
mp.Process(...)

# most multiprocessing based functionality like ProcessPoolExecutor 
# also take context as an argument:
executor=ProcessPoolExecutor(max_workers=1, mp_context=mp_fork)
Run Code Online (Sandbox Code Playgroud)