I am learning Python multiprocessing, and I am trying to use it to populate a list with all the files in the filesystem. However, the code I wrote only executes sequentially.
#!/usr/bin/python
import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]  # Gets the top-level directory names inside "/"
manager = multiprocessing.Manager()
files = manager.list()

def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

mp = [multiprocessing.Process(target=get_files, args=(tld[x],))
      for x in range(len(tld))]
for i in mp:
    i.start()
    i.join()
print len(files)
When I check the process tree, I can see only a single child process spawned (man pstree says that {} denotes a child process spawned by its parent).
---bash(10949)---python(12729)-+-python(12730)---{python}(12752)
                               `-python(12750)
What I am looking for is to spawn one process per tld directory, each populating the shared list files; that would be about 10-15 processes depending on the number of directories. What am I doing wrong?
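A side note on the loop above: because i.join() is called inside the same loop that calls i.start(), the main process waits for each child to finish before it starts the next one, which is exactly the sequential behaviour observed. A minimal sketch of the usual pattern (start every worker first, then join them all):

# Start all workers first, then wait for all of them;
# joining inside the start loop serializes the children.
for p in mp:
    p.start()
for p in mp:
    p.join()
print len(files)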
EDIT:
I used multiprocessing.Pool to create the workers; this time processes are spawned, but it errors out when I try to use multiprocessing.Pool.map(). I was referring to the following code from the Python documentation:
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))
Following this example, I rewrote the code as
import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
pool = multiprocessing.Pool(processes=len(tld))
print pool
files = manager.list()

def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)
It is forking multiple processes.
---bash(10949)---python(12890)-+-python(12967)
                               |-python(12968)
                               |-python(12970)
                               |-python(12971)
                               |-python(12972)
---snip---
But the code errors out, saying:
Process PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
AttributeError: 'module' object has no attribute 'get_files'

(the same traceback is printed by several pool workers, interleaved)
What am I doing wrong here, and why does the get_files() function error out?
This is simply because you instantiate the pool before defining the function get_files:
import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
files = manager.list()

def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

pool = multiprocessing.Pool(processes=len(tld))  # Instantiate the pool here
pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)
The general idea with processes is that at the very moment you start one, the memory of the main process is forked. So any definition made in the main process after the fork will not exist in the subprocess.
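A minimal sketch of that ordering effect (hypothetical function names; this assumes a fork-based start method such as the Linux default, and spawn-based platforms like Windows would also need an if __name__ == '__main__': guard):

import multiprocessing

def defined_before(x):
    return x * 2

pool = multiprocessing.Pool(2)  # workers fork here, with only defined_before in their memory

def defined_after(x):
    return x * 3

print pool.map(defined_before, [1, 2, 3])  # works: prints [2, 4, 6]
# pool.map(defined_after, [1, 2, 3])  # would fail with the AttributeError seen above
pool.close()
pool.join()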
If you want shared memory, you can use the threading library instead, but you will run into some other issues (see: the Global Interpreter Lock).
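A rough sketch of the same directory walk with threads instead (threads share the interpreter's memory, so a plain list is visible to all of them; the GIL serializes Python bytecode, but os.walk spends most of its time in I/O, so threads can still overlap useful work):

import os
import threading

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
files = []  # an ordinary list: threads share the main process's memory

def get_files(x):
    for root, dirs, filenames in os.walk(x):
        for name in filenames:
            files.append(os.path.join(root, name))

threads = [threading.Thread(target=get_files, args=(d,)) for d in tld]
for t in threads:
    t.start()
for t in threads:
    t.join()
print len(files)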