在终端和Django或Flask的代码模块中使用python多处理池

Zag*_*ags 19 python django pool multiprocessing flask

在python中使用multiprocessing.Pool时,如下代码,有一些奇怪的行为.

from multiprocessing import Pool
p = Pool(3)
def f(x): return x
threads = [p.apply_async(f, [i]) for i in range(20)]
for t in threads:
    try: print(t.get(timeout=1))
    except Exception: pass
Run Code Online (Sandbox Code Playgroud)

我得到以下错误三次(池中的每个线程一个),并打印"3"到"19":

AttributeError: 'module' object has no attribute 'f'
Run Code Online (Sandbox Code Playgroud)

前三个apply_async调用永远不会返回.

同时,如果我尝试:

from multiprocessing import Pool
p = Pool(3)
def f(x): print(x)
p.map(f, range(20))
Run Code Online (Sandbox Code Playgroud)

我得到了AttributeError 3次,shell打印"6"到"19",然后挂起并且不能被[Ctrl] + [C]杀死

多处理文档有以下说法:

此程序包中的功能要求模块可由子项导入.

这是什么意思?

为了澄清,我在终端中运行代码来测试功能,但最终我希望能够将其放入Web服务器的模块中.你如何在python终端和代码模块中正确使用multiprocessing.Pool?

Zag*_*ags 36

这意味着必须在要对它们运行的​​函数的定义之后初始化池.if __name__ == "__main__":如果您正在编写独立脚本,则使用块内的池可以正常工作,但这在较大的代码库或服务器代码(例如Django或Flask项目)中是不可能的.因此,如果您尝试在其中一个中使用池,请务必遵循以下准则中解释的这些准则:

  1. 初始化模块底部或内部函数的池.
  2. 不要在模块的全局范围内调用Pool的方法.

或者,如果您只需要更好的I/O并行性(如数据库访问或网络调用),您可以省去所有这些麻烦并使用线程池而不是进程池.这涉及完全无证的:

from multiprocessing.pool import ThreadPool
Run Code Online (Sandbox Code Playgroud)

它的接口与Pool的接口完全相同,但由于它使用线程而不是进程,因此没有使用进程池的注意事项,唯一的缺点是你没有得到代码执行的真正并行性,只是阻塞I/O的并行性.


必须在对要在其上运行的函数的定义之后初始化池

来自python docs的不可思议的文本意味着在定义池时,周围的模块由池中的线程导入.对于python终端,这意味着到目前为止您运行的所有代码.

因此,必须在初始化池之前定义要在池中使用的任何函数.模块中的代码和终端中的代码都是如此.以下对问题中代码的修改将正常工作:

from multiprocessing import Pool
def f(x): return x  # FIRST
p = Pool(3) # SECOND
threads = [p.apply_async(f, [i]) for i in range(20)]
for t in threads:
    try: print(t.get(timeout=1))
    except Exception: pass
Run Code Online (Sandbox Code Playgroud)

要么

from multiprocessing import Pool
def f(x): print(x)  # FIRST
p = Pool(3) # SECOND
p.map(f, range(20))
Run Code Online (Sandbox Code Playgroud)

很好,我的意思是在Unix上很好.Windows有它自己的问题,我不会进入这里.


注意在模块中使用池

但是等等,还有更多(要在其他地方导入模块中使用池)!

如果在函数内定义池,则没有问题. 但是,如果您将Pool对象用作模块中的全局变量,则必须在页面底部而不是顶部定义它.虽然这与大多数好的代码风格相悖,但它对功能来说是必要的.使用在页面顶部声明的池的方法是仅将其与从其他模块导入的函数一起使用,如下所示:

from multiprocessing import Pool
from other_module import f
p = Pool(3)
p.map(f, range(20))
Run Code Online (Sandbox Code Playgroud)

从另一个模块导入预先配置的池是非常可怕的,因为导入必须在您想要在其上运行的任何内容之后,如下所示:

### module.py ###
from multiprocessing import Pool
POOL = Pool(5)

### module2.py ###
def f(x):
    # Some function
from module import POOL
POOL.map(f, range(10))
Run Code Online (Sandbox Code Playgroud)

其次,如果您在要导入的模块的全局范围内在池上运行任何内容,系统将挂起.即这不起作用:

### module.py ###
from multiprocessing import Pool
def f(x): return x
p = Pool(1)
print(p.map(f, range(5)))

### module2.py ###
import module
Run Code Online (Sandbox Code Playgroud)

但是,只要没有导入module2,这确实有效:

### module.py ###
from multiprocessing import Pool

def f(x): return x
p = Pool(1)
def run_pool(): print(p.map(f, range(5)))

### module2.py ###
import module
module.run_pool()
Run Code Online (Sandbox Code Playgroud)

现在,其背后的原因只是更奇怪,并且可能与问题中的代码仅在每次出现属性错误并且之后出现正确执行代码的原因有关.看来,池线程(至少具有一定的可靠性)在执行后重新加载模块中的代码.


mat*_*ata 5

创建线程池时,必须已经定义要在线程池上执行的函数。

这应该有效:

from multiprocessing import Pool

def f(x): print(x)

if __name__ == '__main__':
    p = Pool(3)
    p.map(f, range(20))
Run Code Online (Sandbox Code Playgroud)

原因是(至少在基于 Unix 的系统上fork)当您创建池时,工作线程是通过分叉当前进程来创建的。因此,如果此时目标函数尚未定义,则工作人员将无法调用它。

在 Windows 上情况有点不同,因为Windows 没有fork. 这里启动新的工作进程并导入主模块。这就是为什么在 Windows 上使用if __name__ == '__main__'. 否则,每个新工作人员将重新执行代码,从而无限地产生新进程,从而导致程序(或系统)崩溃。