Python multiprocessing: Manager, Pool and a shared list are not working concurrently

noh*_*hup · score 6 · tags: python, parallel-processing, concurrency, multiprocessing, python-multiprocessing

I am learning Python multiprocessing and trying to use it to populate a list with all the files in the operating system. However, the code I wrote only executes sequentially.

#!/usr/bin/python
import os
import multiprocessing
tld = [os.path.join("/", f) for f in os.walk("/").next()[1]] # Get the top-level directory names inside "/"
manager = multiprocessing.Manager()
files = manager.list()


def get_files(x):
    for root, dirs, names in os.walk(x):
        for name in names:
            files.append(os.path.join(root, name))

mp = [multiprocessing.Process(target=get_files, args=(tld[x],))
      for x in range(len(tld))]

for i in mp:
    i.start()
    i.join()
print len(files)

When I check the process tree, I can see only a single child process spawned (man pstree says `{}` denotes child threads spawned by the parent).

---bash(10949)---python(12729)-+-python(12730)---{python}(12752)
                               `-python(12750)

What I am looking for is to spawn a process for each tld directory, each populating the shared list files; that would be about 10-15 processes, depending on the number of directories. What am I doing wrong?

EDIT:

I used multiprocessing.Pool to create worker processes, and this time the processes are spawned, but it errors out when I try to use multiprocessing.Pool.map(). I was referring to the following code from the Python docs:

from multiprocessing import Pool
def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3])) 

Following this example, I rewrote the code as:

import os
import multiprocessing
tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
pool = multiprocessing.Pool(processes=len(tld))
print pool
files = manager.list()
def get_files(x):
    for root, dirs, names in os.walk(x):
        for name in names:
            files.append(os.path.join(root, name))
pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)

This does fork multiple processes:

---bash(10949)---python(12890)-+-python(12967)
                               |-python(12968)
                               |-python(12970)
                               |-python(12971)
                               |-python(12972)
                               ---snip---

But then the code errors out with:

(the tracebacks of several pool workers are interleaved in the output; each one reads)

Process PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
AttributeError: 'module' object has no attribute 'get_files'

What am I doing wrong here, and why does the get_files() function error out?

Fun*_*ayu · score 6

This is simply because you instantiate the pool before defining the function get_files:

import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()

files = manager.list()
def get_files(x):
    for root, dirs, names in os.walk(x):
        for name in names:
            files.append(os.path.join(root, name))

pool = multiprocessing.Pool(processes=len(tld)) # Instantiate the pool here

pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)

The general idea with processes is that at the moment you start one, you fork the memory of the main process. So any definition made in the main process after the fork will not be available in the child process.

If you want shared memory, you can use the threading library instead, but you will run into some other issues (see: the Global Interpreter Lock).
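A minimal sketch of that approach, using a plain list guarded by a Lock and hypothetical work items (threads share the parent's memory, so no Manager is needed; the GIL limits CPU-bound parallelism, but a filesystem walk is mostly I/O-bound):

```python
import threading

results = []                     # an ordinary list: threads share memory
lock = threading.Lock()

def collect(items):
    local = [item.upper() for item in items]  # per-thread work
    with lock:                   # guard the shared list while extending it
        results.extend(local)

work = [["a", "b"], ["c"], ["d", "e"]]   # hypothetical work items
threads = [threading.Thread(target=collect, args=(w,)) for w in work]
for t in threads:                # start all threads first...
    t.start()
for t in threads:                # ...then join them all
    t.join()
print(len(results))              # 5
```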

  • I am not sure I understand all the implications of your sentence (my English needs improvement :)). As far as I understand, you think a Pool only sets the *number* of processes rather than *starting* them. That is not true (thank God :D). The general idea is to start the pool so that you can use it afterwards (e.g. with `pool.map`). The initialization is a long process (trust me, I have worked a lot on this), and it should only run once. (3 upvotes)
  • That makes complete sense. Whatever you lack in English you make up for in Python :) Awesome. Thanks for all the help. Thanks @FunkySayu :) (2 upvotes)