如何恢复传递给multiprocessing.Process的函数的返回值?

blz*_*blz 143 python python-multiprocessing

在下面的示例代码中,我想恢复函数的返回值worker.我该怎么做呢?这个值存储在哪里?

示例代码:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs
Run Code Online (Sandbox Code Playgroud)

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
Run Code Online (Sandbox Code Playgroud)

我似乎无法在存储的对象中找到相关属性jobs.

提前谢谢,blz

var*_*tec 139

使用共享变量进行通信.例如这样:

import multiprocessing

def worker(procnum, return_dict):
    '''worker function'''
    print str(procnum) + ' represent!'
    return_dict[procnum] = procnum


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print return_dict.values()
Run Code Online (Sandbox Code Playgroud)

  • 我建议使用[`multiprocessing.Queue`](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Queue),而不是这里的`Manager`.使用`Manager`需要产生一个全新的进程,当`Queue`会这样做时,这是一种过度杀伤. (37认同)
  • @vartec 使用 multipriocessing.Manager() 字典的一个缺点是它会pickle(序列化)它返回的对象,因此它有一个由pickle库给出的瓶颈,该库的返回对象的最大大小为2GiB。有没有其他方法可以避免返回对象的序列化? (5认同)
  • @Catbuilts您可以从每个进程返回一个元组,其中一个值是您关心的实际返回值,另一个是进程中的唯一标识符.但我也想知道为什么你需要知道哪个进程返回哪个值.如果您真正需要了解该过程,或者您需要在输入列表和输出列表之间进行关联?在这种情况下,我建议使用`multiprocessing.Pool.map`来处理您的工作项列表. (4认同)
  • **仅具有单个参数的函数的腔**:应当使用args =(my_function_argument,)。注意这里的逗号。否则,Python将抱怨“缺少位置参数”。花了我10分钟来弄清楚。还要检查[手动用法](https://docs.python.org/3/library/multiprocessing.html)(在“进程类”部分下)。 (3认同)
  • @dano:我想知道,如果我们使用 Queue() 对象,我们无法确定每个进程返回值时的顺序。我的意思是如果我们需要结果中的顺序,来做下一个工作。我们如何确定哪个输出来自哪个进程 (2认同)

Mar*_*ark 59

我认为@sega_sai建议的方法更好.但它确实需要一个代码示例,所以这里是:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))
Run Code Online (Sandbox Code Playgroud)

这将打印返回值:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]
Run Code Online (Sandbox Code Playgroud)

如果您熟悉map(内置的Python 2),这不应该太具有挑战性.否则看看sega_Sai的链接.

请注意需要的代码很少.(还要注意如何重复使用进程).

  • 有什么想法为什么我的“getpid()”返回所有相同的值吗?我正在运行Python3 (2认同)

Dav*_*len 19

此示例显示如何使用multiprocessing.Pipe实例列表从任意数量的进程返回字符串:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']
Run Code Online (Sandbox Code Playgroud)

该解决方案使用较少的资源比multiprocessing.Queue它使用

  • 管道
  • 至少一个锁
  • 一个缓冲区
  • 一个线程

multiprocessing.SimpleQueue其使用

  • 管道
  • 至少一个锁

查看每种类型的来源非常有益.

  • 如果返回的对象很大,这个答案会导致死锁。我不会先执行 proc.join(),而是先尝试 recv() 返回值,然后执行连接。 (4认同)

sud*_*udo 15

出于某种原因,我找不到一个如何在Queue任何地方执行此操作的一般示例(即使Python的doc示例也不会产生多个进程),所以这就是我在10次尝试之后的工作:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets
Run Code Online (Sandbox Code Playgroud)

Queue是一个阻塞的,线程安全的队列,可用于存储子进程的返回值.所以你必须将队列传递给每个进程.一些不太明显的是,你们必须get()从队列你之前joinProcessES否则队列已满,并且块一切.

针对面向对象的人的更新(在Python 3.4中测试):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
Run Code Online (Sandbox Code Playgroud)


Mat*_*sen 13

对于正在寻求如何从Process使用中获取价值的任何其他人Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print queue.get()  # Prints {"foo": True}
    p.join()
Run Code Online (Sandbox Code Playgroud)

  • 是的,它无限地悬挂在那里.我的工作人员全部完成(在工作人员职能部门内循环,之后打印声明,为所有工人打印).联接不做任何事情.如果我从我的函数中删除`Queue`,它确实让我传递`join()` (4认同)
  • 当我将某些内容放入工作进程的队列中时,我的加入永远不会到达。知道这会如何发生吗? (2认同)

Dav*_*len 9

您可以使用exit内置来设置进程的退出代码.它可以从exitcode过程的属性中获得:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result
Run Code Online (Sandbox Code Playgroud)

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Run Code Online (Sandbox Code Playgroud)

  • 请注意,这种方法可能会变得混乱.进程通常应该以退出代码0退出,它们是否完成而没有错误.如果您有任何监视系统进程退出代码的内容,那么您可能会将这些报告视为错误. (4认同)
  • 如果您只想在父进程中发生错误时引发异常,那么这是完美的选择。 (2认同)

eri*_*eed 9

石子包有一个很好的抽象杠杆multiprocessing.Pipe,这使得这个非常简单:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())
Run Code Online (Sandbox Code Playgroud)

示例来自:https : //pythonhosted.org/Pebble/#concurrent-decorators


Chr*_*ris 6

我想我会简化从上面复制的最简单的例子,在 Py3.6 上为我工作。最简单的是multiprocessing.Pool

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))
Run Code Online (Sandbox Code Playgroud)

您可以使用,例如,设置池中的进程数Pool(processes=5)。然而,它默认为 CPU 计数,因此对于 CPU 密集型任务将其留空。(无论如何,I/O 密集型任务通常适合线程,因为线程大部分时间都在等待,因此可以共享一个 CPU 内核。)Pool也适用于分块优化

(请注意,工作方法不能嵌套在方法中。我最初在调用 的方法中定义了我的工作方法pool.map,以保持其全部自包含,但随后进程无法导入它,并抛出“AttributeError :不能腌制本地对象outer_method..inner_method”。更多在这里。它可以在一个类中。)

(感谢原始问题指定的打印'represent!'而不是time.sleep(),但没有它,我认为某些代码在并发运行时不是。)


Py3ProcessPoolExecutor也是两行(.map返回一个生成器,所以你需要list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))
Run Code Online (Sandbox Code Playgroud)

使用简单的Processes:

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())
Run Code Online (Sandbox Code Playgroud)

SimpleQueue如果您只需要put和 ,请使用get。第一个循环启动所有进程,然后第二个循环进行阻塞queue.get调用。我认为也没有任何理由打电话p.join()