blz*_*blz 143 python python-multiprocessing
In the example code below, I want to recover the return value of the function worker. How can I do this? Where is the value stored?
Example code:
import multiprocessing

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(jobs)
Output:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
I can't seem to find the relevant attribute in the objects stored in jobs.
Thanks in advance, blz
var*_*tec 139
Use a shared variable to communicate. For example, like this:
import multiprocessing

def worker(procnum, return_dict):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return_dict[procnum] = procnum

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())
Mar*_*ark 59
I think the approach suggested by @sega_sai is the better one. But it really needs a code example, so here goes:
import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    print(pool.map(worker, range(5)))
Which will print the return values:
I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]
If you are familiar with map (the Python 2 built-in), this should not be too challenging. Otherwise have a look at sega_sai's link.
Note how little code is needed. (Also note how processes are reused.)
Dav*_*len 19
This example shows how to use a list of multiprocessing.Pipe instances to return strings from an arbitrary number of processes:
import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print(result)
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print(result_list)

if __name__ == '__main__':
    main()
Output:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']
This solution uses fewer resources than a multiprocessing.Queue, which uses a pipe, at least one lock, a buffer, and a thread, or a multiprocessing.SimpleQueue, which uses a pipe and at least one lock. It is very instructive to look at the source for each of these types.
sud*_*udo 15
For some reason, I couldn't find a general example of how to do this with a Queue anywhere (even Python's doc examples don't spawn multiple processes), so here's what I got working after about 10 tries:
from multiprocessing import Process, Queue

def add_helper(queue, arg1, arg2):  # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add():  # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get()  # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets
Queue is a blocking, thread-safe queue that you can use to store the return values from the child processes. So you have to pass the queue to each process. Something less obvious here is that you have to get() from the queue before you join the Processes, or else the queue fills up and blocks everything.
Update for those who are object-oriented (tested in Python 3.4):
from multiprocessing import Process, Queue

class Multiprocessor:

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc):  # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait()  # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
Mat*_*sen 13
For anyone else who is looking for how to get a value from a Process using a Queue:
import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print(queue.get())  # Prints {'foo': True}
    p.join()
seg*_*sai 12
It seems like you should use the multiprocessing.Pool class and its methods .apply(), .apply_async(), and .map():
http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult
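For example, here is a minimal sketch of .apply_async() applied to the question's worker (the code is illustrative, not from the original answer; AsyncResult.get() blocks until the result is ready):

import multiprocessing

def worker(procnum):
    return procnum

if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        # apply_async returns an AsyncResult immediately instead of blocking
        results = [pool.apply_async(worker, (i,)) for i in range(5)]
        print([res.get() for res in results])  # [0, 1, 2, 3, 4]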
You can use the exit built-in to set the exit code of a process and read it back from the process's exitcode attribute. Note that exit codes are limited to small integers (typically 0-255), so this trick only works for correspondingly small return values:
import multiprocessing

def worker(procnum):
    print(str(procnum) + ' represent!')
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print(result)
Output:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
The pebble package has a nice abstraction leveraging multiprocessing.Pipe which makes this quite straightforward:
from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)
print(future.result())
Example from: https://pythonhosted.org/Pebble/#concurrent-decorators
I thought I'd simplify the simplest examples copied from above, working for me on Py3.6. The simplest is multiprocessing.Pool:
import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))
You can set the number of processes in the pool with, e.g., Pool(processes=5). However, it defaults to the CPU count, so leave it blank for CPU-bound tasks. (I/O-bound tasks often suit threads anyway, since the threads are mostly waiting and can share a CPU core.) Pool also applies chunking optimizations; both knobs appear in the sketch below.
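For illustration, a minimal sketch of those two parameters (the processes=4 and chunksize=5 values are arbitrary, not from the original answer):

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

if __name__ == '__main__':
    # Cap the pool at 4 worker processes and hand each worker its tasks in
    # chunks of 5, which reduces inter-process communication overhead when
    # there are many small tasks.
    with multiprocessing.Pool(processes=4) as pool:
        print(pool.map(worker, range(20), chunksize=5))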
(Note that the worker function cannot be nested within another function. I initially defined my worker inside the function that calls pool.map, to keep it all self-contained, but then the processes couldn't import it and threw "AttributeError: Can't pickle local object outer_method.<locals>.inner_method". It can be inside a class, though.)
(Kudos to the original question for printing 'represent!' rather than just calling time.sleep(); without the sleep, though, I thought some code was running concurrently when it wasn't.)
Py3's ProcessPoolExecutor is also two lines (.map returns a generator, so you need list()):
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))
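If you want a future per task instead, executor.submit with as_completed also works (a minimal sketch, not part of the original answer; note that results then arrive in completion order, not submission order):

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def worker(x):
    time.sleep(1)
    return x

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        # submit returns a Future per task; as_completed yields them as they finish
        futures = [executor.submit(worker, i) for i in range(10)]
        print([f.result() for f in as_completed(futures)])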
Using plain Processes:
import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue)).start()

for _ in tasks:
    print(queue.get())
Use SimpleQueue if all you need is put and get. The first loop starts all the processes, then the second loop makes the blocking queue.get calls. I don't see any reason to call p.join() either.