Is multiprocessing.Pipe slower than multiprocessing.Queue?

zax*_*liu 9 python

I'm trying to benchmark the speedup of Pipe over Queue from the multiprocessing package. I thought Pipe would be faster, since Queue uses a Pipe internally.

Strangely, Pipe turns out to be slower than Queue when sending large numpy arrays. What am I missing here?
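(I did check that Queue is indeed built on top of a Pipe under the hood; the attributes below are private CPython implementation details, so this is only an informal check.)

from multiprocessing import Queue

# CPython's Queue.__init__ creates its two ends with Pipe(duplex=False),
# so the reader and writer are ordinary Connection objects.
q = Queue()
print(type(q._reader), type(q._writer))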

Pipe:

import sys
import time
from multiprocessing import Process, Pipe
import numpy as np

NUM = 1000


def worker(conn):
    for task_nbr in range(NUM):
        conn.send(np.random.rand(400, 400, 3))
    sys.exit(1)


def main():
    parent_conn, child_conn = Pipe(duplex=False)
    Process(target=worker, args=(child_conn,)).start()
    for num in range(NUM):
        message = parent_conn.recv()


if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

# Took 10.86s.

Queue:

import sys
import time
from multiprocessing import Process
from multiprocessing import Queue
import numpy as np

NUM = 1000

def worker(q):
    for task_nbr in range(NUM):
        q.put(np.random.rand(400, 400, 3))
    sys.exit(1)

def main():
    recv_q = Queue()
    Process(target=worker, args=(recv_q,)).start()
    for num in range(NUM):
        message = recv_q.get()

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

# Took 6.86s.

biv*_*ac0 8

You can experiment by substituting the following into the Pipe code above.

def worker(conn):
    for task_nbr in range(NUM):
        data = np.random.rand(400, 400, 3)
    sys.exit(1)

def main():
    parent_conn, child_conn = Pipe(duplex=False)
    p = Process(target=worker, args=(child_conn,))
    p.start()
    p.join()

This gives you the time it takes just to create the data for the test. On my system this takes about 2.9 seconds.

Under the hood, the Queue object implements a buffer and a threaded send. The thread is still in the same process, but by using it, data creation doesn't have to wait for the system I/O to complete; it effectively runs the two operations in parallel. Try the Pipe code modified with some simple threading (disclaimer: the code here is for testing only and is not production-ready).

import sys
import time
import threading
from multiprocessing import Process, Pipe, Lock
import numpy as np

NUM = 1000

def worker(conn):
    _conn = conn
    _buf = []
    _wlock = Lock()
    _sentinel = object() # signal that we're done
    def thread_worker():
        while 1:
            if _buf:
                _wlock.acquire()
                obj = _buf.pop(0)
                if obj is _sentinel:
                    _wlock.release()
                    return
                _conn.send(obj)  # send the array that was just popped off the buffer
                _wlock.release()
    t = threading.Thread(target=thread_worker)
    t.start()
    for task_nbr in range(NUM):
        data = np.random.rand(400, 400, 3)
        data[0][0][0] = task_nbr    # just for integrity check
        _wlock.acquire()
        _buf.append(data)
        _wlock.release()
    _wlock.acquire()
    _buf.append(_sentinel)
    _wlock.release()
    t.join()
    sys.exit(1)

def main():
    parent_conn, child_conn = Pipe(duplex=False)
    Process(target=worker, args=(child_conn,)).start()
    for num in range(NUM):
        message = parent_conn.recv()
        assert num == message[0][0][0], 'Data was corrupted'        

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

On my machine this runs in about 3.4 seconds, which is almost exactly the same as the Queue version above.
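For comparison, the same buffering idea can be written with the thread-safe queue.Queue from the standard library instead of a hand-rolled list plus Lock; this is roughly what multiprocessing.Queue's feeder thread does for you. A minimal sketch, assuming Python 3 (the module is spelled Queue on Python 2):

import sys
import threading
import queue
from multiprocessing import Process, Pipe
import numpy as np

NUM = 1000
SENTINEL = None  # marks the end of the stream


def worker(conn):
    buf = queue.Queue()  # thread-safe buffer, no explicit Lock needed

    def feeder():
        # Drain the buffer and push each array into the pipe.
        while True:
            obj = buf.get()
            if obj is SENTINEL:
                return
            conn.send(obj)

    t = threading.Thread(target=feeder)
    t.start()
    for task_nbr in range(NUM):
        buf.put(np.random.rand(400, 400, 3))  # put() returns immediately
    buf.put(SENTINEL)
    t.join()
    sys.exit(1)


def main():
    parent_conn, child_conn = Pipe(duplex=False)
    Process(target=worker, args=(child_conn,)).start()
    for num in range(NUM):
        message = parent_conn.recv()


if __name__ == "__main__":
    main()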

From https://docs.python.org/2/library/threading.html:

In Cython, due to the Global Interpreter Lock, only one thread can execute Python code at once... However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.

This Queue/Pipe difference is definitely an odd implementation detail until you dig into it.
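To make the buffering visible from the producer's side, here is a rough sketch (message size and count are arbitrary) that times only the sending loop in the parent process: send() blocks until each pickled array has actually gone through the pipe, while put() just hands the array to Queue's feeder thread and returns almost immediately.

import time
from multiprocessing import Process, Pipe, Queue
import numpy as np

N = 200


def consumer_pipe(conn):
    for _ in range(N):
        conn.recv()


def consumer_queue(q):
    for _ in range(N):
        q.get()


if __name__ == "__main__":
    data = np.random.rand(100, 100, 3)  # roughly 240 KB per message

    # Pipe: send() blocks until the bytes have been written to the pipe.
    recv_conn, send_conn = Pipe(duplex=False)
    p = Process(target=consumer_pipe, args=(recv_conn,))
    p.start()
    start = time.time()
    for _ in range(N):
        send_conn.send(data)
    print("send() loop: %.3fs" % (time.time() - start))
    p.join()

    # Queue: put() only appends to an in-process buffer; the feeder thread
    # does the pickling and writing, so this loop finishes long before the
    # data has been delivered to the consumer.
    q = Queue()
    p = Process(target=consumer_queue, args=(q,))
    p.start()
    start = time.time()
    for _ in range(N):
        q.put(data)
    print("put() loop:  %.3fs" % (time.time() - start))
    q.close()
    q.join_thread()  # wait for the feeder thread to flush the buffer
    p.join()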

  • @bivouac0 In the documentation quote it should be "CPython", not "Cython". (2 upvotes)

Fab*_*ese 5

I assume from your print statements that you are using Python 2. However, the odd behaviour cannot be reproduced with Python 3, where Pipe is actually faster than Queue.

import sys
import time
from multiprocessing import Process, Pipe, Queue
import numpy as np

NUM = 20000


def worker_pipe(conn):
    for task_nbr in range(NUM):
        conn.send(np.random.rand(40, 40, 3))
    sys.exit(1)


def main_pipe():
    parent_conn, child_conn = Pipe(duplex=False)
    Process(target=worker_pipe, args=(child_conn,)).start()
    for num in range(NUM):
        message = parent_conn.recv()


def pipe_test():
    start_time = time.time()
    main_pipe()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration
    print("Pipe")
    print("Duration: " + str(duration))
    print("Messages Per Second: " + str(msg_per_sec))

def worker_queue(q):
    for task_nbr in range(NUM):
        q.put(np.random.rand(40, 40, 3))
    sys.exit(1)

def main_queue():
    recv_q = Queue()
    Process(target=worker_queue, args=(recv_q,)).start()
    for num in range(NUM):
        message = recv_q.get()

def queue_test():
    start_time = time.time()
    main_queue()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration
    print("Queue")
    print("Duration: " + str(duration))
    print("Messages Per Second: " + str(msg_per_sec))


if __name__ == "__main__":
    for i in range(2):
        queue_test()
        pipe_test()

The results are:

Queue
Duration: 3.44321894646
Messages Per Second: 5808.51822408
Pipe
Duration: 2.69065594673
Messages Per Second: 7433.13169575
Queue
Duration: 3.45295906067
Messages Per Second: 5792.13354361
Pipe
Duration: 2.78426194191
Messages Per Second: 7183.23218766

