Large objects, `multiprocessing` pipes, and `send()`

Ray*_*Ray 3 python pipe multiprocessing

I recently found that if we create a pair of parent/child connection objects with `multiprocessing.Pipe`, and the object we try to send through the pipe is too large, my program hangs without raising an exception or doing anything at all. See the code below. (The code uses the numpy package to generate an array of floats.)

import multiprocessing as mp
import numpy as np

def big_array(conn, size=1200):
    a = np.random.rand(size)
    print "Child process trying to send array of %d floats." %size
    conn.send(a)
    return a

if __name__ == "__main__":
    print "Main process started."
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=big_array, args=[child_conn, 1200])
    proc.start()
    print "Child process started."
    proc.join()
    print "Child process joined."
    a = parent_conn.recv()
    print "Received the following object."
    print "Type: %s. Size: %d." %(type(a), len(a))

The output is as follows.

Main process started.
Child process started.
Child process trying to send array of 1200 floats.

It hangs there indefinitely. However, if instead of 1200 we send an array of 1000 floats, the program runs successfully and prints the following output as expected.

Main process started.
Child process started.
Child process trying to send array of 1000 floats.
Child process joined.
Received the following object.
Type: <type 'numpy.ndarray'>. Size: 1000.
Press any key to continue . . .

This looks like a bug to me. The documentation says the following.

send(obj) — Send an object to the other end of the connection, which should be read using recv().

The object must be picklable. Very large pickles (approximately 32 MB+, though it depends on the OS) may raise a ValueError exception.

But in my runs not even a ValueError is raised; the program just hangs. Moreover, a 1200-element numpy array is only about 9600 bytes, nowhere near 32 MB! This looks like a bug. Does anyone know how to work around it?
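
For what it's worth, here is a quick sketch to sanity-check that size estimate (assuming float64 values and pickle protocol 2; exact byte counts vary slightly by protocol and numpy version):

import pickle
import numpy as np

a = np.random.rand(1200)               # 1200 float64 values
print "raw data: %d bytes" % a.nbytes  # 9600 bytes of float data
print "pickled:  %d bytes" % len(pickle.dumps(a, protocol=2))  # roughly 9.7 KB, nowhere near 32 MB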

By the way, I'm using Windows 7, 64-bit.

Eci*_*ana 12

Try moving the join() below the recv():

import multiprocessing as mp

def big_array(conn, size=1200):
    a = "a" * size  # a large string stands in for the numpy array here
    print "Child process trying to send %d bytes." % size
    conn.send(a)
    return a

if __name__ == "__main__":
    print "Main process started."
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=big_array, args=[child_conn, 120000])
    proc.start()
    print "Child process started."
    a = parent_conn.recv()  # drain the pipe first so the child's send() can complete
    proc.join()             # now the join cannot deadlock
    print "Child process joined."
    print "Received the following object."
    print "Type: %s. Size: %d." %(type(a), len(a))

But I don't really understand why your example works for the small size. I would have thought that writing to the pipe and then joining, without first reading the data out of the pipe, would block the join; you should receive from the pipe first and then join. But apparently it doesn't block for small sizes...?
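
One plausible explanation (my assumption, not something stated in the question): a small payload fits entirely in the pipe's OS-level buffer, so send() returns immediately even though nothing has been received yet, whereas a larger payload overflows the buffer and send() blocks until the other end reads. A rough sketch to probe that behaviour:

import multiprocessing as mp

def sender(conn, nbytes):
    conn.send("x" * nbytes)   # blocks only if the payload overflows the pipe buffer

def try_size(nbytes):
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=sender, args=(child_conn, nbytes))
    proc.start()
    proc.join(timeout=2)      # give the child a chance to finish without any recv()
    print "%8d bytes: child still blocked in send()? %s" % (nbytes, proc.is_alive())
    parent_conn.recv()        # drain the pipe so the child can always exit
    proc.join()

if __name__ == "__main__":
    try_size(1000)              # small: fits in the buffer, join() would succeed
    try_size(10 * 1024 * 1024)  # large: send() blocks until the parent recv()s

If that is what is going on, the cutoff the asker sees between 1000 and 1200 floats would put the buffer on their Windows 7 machine at roughly 8 KB.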

Edit: From the docs (http://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming):

"将陷入僵局的一个例子如下:"

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()
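
The same section of the docs suggests the fix: swap the last two lines (or simply remove the p.join() line), so the queue is drained before joining. A corrected version of that example:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    obj = queue.get()           # drain the queue first...
    p.join()                    # ...then the join no longer deadlocks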