Python - apply_async不执行函数

Shu*_*iao 7 python multiprocessing

您好,我正在尝试使用多处理来加速我的代码。但是,apply_async 对我不起作用。我尝试做一个简单的例子,例如:

from multiprocessing.pool import Pool
t = [0, 1, 2, 3, 4, 5]
def cube(x):
    t[x] = x**3
pool = Pool(processes=4)
for i in range(6):
    pool.apply_async(cube, args=(i, ))
for x in t:
    print(x)
Run Code Online (Sandbox Code Playgroud)

它并没有t像我预期的那样真正改变。

我的真实代码是这样的:

from multiprocessing.pool import Pool
def func(a, b, c, d):
    #some calculations
    #save result to files
    #no return value
lt = #list of possible value of a
#set values to b, c, d
p = Pool()
for i in lt:
    p.apply_async(func, args=(i, b, c, d, ))
Run Code Online (Sandbox Code Playgroud)

这里的问题出在哪里呢?

谢谢你!


更新:感谢评论和答案,现在我明白为什么我的简单示例不起作用。但是,我的真实代码仍然遇到麻烦。我已经检查过我的func不依赖于任何全局变量,所以它似乎与我的示例代码不是同一个问题。

按照建议,我向我的 中添加了一个返回值func,现在我的代码是:

f = Flux("reactor")
d = Detector("Ge")
mv = arange(-6, 1.5, 0.5)
p = Pool()
lt = ["uee", "dee"]
for i in lt:
    re = p.apply_async(res, args=(i, d, f, mv, ))
    print(re.get())
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)

现在我收到以下错误:

Traceback (most recent call last):
  File "/Users/Shu/Documents/Programming/Python/Research/debug.py", line 35, in <module>
print(re.get())
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 608, in get
raise self._value
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 385, in _handle_tasks
put(task)
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'Flux.__init__.<locals>.<lambda>'
Run Code Online (Sandbox Code Playgroud)

nox*_*fox 7

编辑:您提供的第一个示例将不起作用,原因很简单:进程不共享内存。因此,更改t[x] = x**3不会应用于父进程,列表的值保持t不变。

您需要实际返回计算中的值并从中构建一个新列表。

def cube(x):
    return x**3

t = [0, 1, 2, 3, 4, 5]

p = Pool()
t = p.map(cube, t)

print(t)  
Run Code Online (Sandbox Code Playgroud)

如果,正如您在第二个示例中声称的那样,结果不应该返回,而是独立存储在文件中,并且这种情况没有发生,我建议检查函数的返回值,看看函数本身是否是是否引发异常。

我建议您获取实际结果并看看会发生什么:

p = Pool()
for i in lt:
    res = p.apply_async(func, args=(i, b, c, d, ))
    print(res.get())  # this will raise an exception if it happens within func

p.close()  # do not accept any more tasks
p.join()  # wait for the completion of all scheduled jobs
Run Code Online (Sandbox Code Playgroud)