所以我想我终于要发帖了;管理Process工人的正确方法是什么?我尝试使用 a Pool,但我注意到我无法获得每个已完成进程的返回值。我尝试使用回调,但也没有按预期工作。我应该自己管理它们active_children ()吗?
我的池代码:
from multiprocessing import *
import time
import random
SOME_LIST = []
def myfunc():
a = random.randint(0,3)
time.sleep(a)
return a
def cb(retval):
SOME_LIST.append(retval)
print("Starting...")
p = Pool(processes=8)
p.apply_async(myfunc, callback=cb)
p.close()
p.join()
print("Stopping...")
print(SOME_LIST)
Run Code Online (Sandbox Code Playgroud)
我希望有一个值列表;但我得到的只是工人作业中要完成的最后一项:
$ python multi.py
Starting...
Stopping...
[3]
Run Code Online (Sandbox Code Playgroud)
注意:答案不应使用threading模块;原因如下:
在 CPython 中,由于全局解释器锁,一次只有一个线程可以执行 Python 代码(即使某些面向性能的库可能会克服这一限制)。如果您希望您的应用程序更好地利用多核机器的计算资源,建议您使用多处理。
你误解了工作方式apply_async。它不会在Pool. 它只是在工作进程之一中调用该函数一次。所以你看到的结果是可以预料的。您有几种选择来获得所需的行为:
from multiprocessing import Pool
import time
import random
SOME_LIST = []
def myfunc():
a = random.randint(0,3)
time.sleep(a)
return a
def cb(retval):
SOME_LIST.append(retval)
print("Starting...")
p = Pool(processes=8)
for _ in range(p._processes):
p.apply_async(myfunc, callback=cb)
p.close()
p.join()
print("Stopping...")
print(SOME_LIST)
Run Code Online (Sandbox Code Playgroud)
或者
from multiprocessing import Pool
import time
import random
def myfunc():
a = random.randint(0,3)
time.sleep(a)
return a
print("Starting...")
p = Pool(processes=8)
SOME_LIST = p.map(myfunc, range(p._processes))
p.close()
p.join()
print("Stopping...")
print(SOME_LIST)
Run Code Online (Sandbox Code Playgroud)
请注意,您也可以打电话apply_async或map为更多比在游泳池的进程数。的想法Pool是,无论您提交多少任务,它num_processes都能保证进程在 的整个生命周期内都将运行Pool。因此,如果您创建一个Pool(8)并调用apply_async一次,则您的八个工人中的一个将获得一个任务,而其他七个将处于空闲状态。如果您创建 aPool(8)并调用apply_async80 次,则这 80 个任务将分配给您的 8 个工作人员,同时实际处理的任务不超过 8 个。
| 归档时间: |
|
| 查看次数: |
1106 次 |
| 最近记录: |