Multiprocessing - Pipe vs Queue

Jon*_*han 128 python queue performance pipe multiprocessing

What are the fundamental differences between queues and pipes in Python's multiprocessing package?

In what scenarios should one choose one over the other? When is it advantageous to use Pipe()? When is it advantageous to use Queue()?

Mik*_*ton 242

  • A Pipe() can only have two endpoints.

  • A Queue() can have multiple producers and consumers.

When to use them

If you need more than two points to communicate, use a Queue().

If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().
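
As a minimal sketch of that structural difference (hypothetical worker functions, separate from the benchmark below): one Pipe() connects exactly two processes, while several producer processes can share a single Queue():

"""
pipe_vs_queue_sketch.py
"""
from multiprocessing import Process, Pipe, Queue

def pipe_worker(conn):
    # Exactly one other endpoint talks to us over this connection.
    conn.send('hello from the other endpoint')
    conn.close()

def queue_producer(q, name):
    # Any number of producers (and consumers) may share the same queue.
    q.put('item from {0}'.format(name))

if __name__ == '__main__':
    # Pipe(): two endpoints, one held by each process.
    parent_conn, child_conn = Pipe()
    p = Process(target=pipe_worker, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()

    # Queue(): three producer processes feed one consumer (this process).
    q = Queue()
    producers = [Process(target=queue_producer, args=(q, 'producer-{0}'.format(ii)))
                 for ii in range(3)]
    for proc in producers:
        proc.start()
    for _ in producers:
        print(q.get())
    for proc in producers:
        proc.join()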

Performance benchmark

Let's assume you want to spawn two processes and send messages between them as quickly as possible. These are the timing results of a drag race between similar tests using Pipe() and Queue()... This is on a Thinkpad T61 running Ubuntu 11.10 and Python 2.7.2.

FYI, results for JoinableQueue() are thrown in as a bonus; JoinableQueue() accounts for tasks when queue.task_done() is called (it doesn't even know about the specific task, it just counts unfinished tasks in the queue), so that queue.join() knows the work is finished.

The code for each is at the bottom of this answer...

mpenning@mpenning-T61:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpenning@mpenning-T61:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$

In summary, a Pipe() is about three times faster than a Queue(). Don't even consider the JoinableQueue() unless you really must have the benefits.

Bonus material 2

Multiprocessing introduces subtle changes in information flow that make debugging hard unless you know some shortcuts. For instance, you might have a script that works fine when indexing through a dict under many conditions, but infrequently fails with certain inputs.

Normally we get clues to the failure when the entire python process crashes; however, unsolicited crash tracebacks do not get printed to the console when the multiprocessing function crashes. Tracking down unknown multiprocessing crashes is hard without a clue to what crashed the process.

The simplest way I have found to track down multiprocessing crash information is to wrap the entire multiprocessing function in a try / except and use traceback.print_exc():

import traceback
def reader(args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except:
        print "FATAL: reader({0}) exited while multiprocessing".format(args) 
        traceback.print_exc()

Now, when you find a crash, you see something like:

FATAL: reader([{'crash', 'this'}]) exited while multiprocessing
Traceback (most recent call last):
  File "foo.py", line 19, in __init__
    self.run(task_q, result_q)
  File "foo.py", line 46, in run
    raise ValueError
ValueError

Source code:


"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time

def reader_proc(pipe):
    ## Read from the pipe; this will be spawned as a separate Process
    p_output, p_input = pipe
    p_input.close()    # We are only reading
    while True:
        msg = p_output.recv()    # Read from the output pipe and do nothing
        if msg=='DONE':
            break

def writer(count, p_input):
    for ii in xrange(0, count):
        p_input.send(ii)             # Write 'count' numbers into the input pipe
    p_input.send('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        # This Pipe() is used one-way here:  p_input ------> p_output
        p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
        reader_p = Process(target=reader_proc, args=((p_output, p_input),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process

        p_output.close()       # We no longer need this part of the Pipe()
        _start = time.time()
        writer(count, p_input) # Send a lot of stuff to reader_proc()
        p_input.close()
        reader_p.join()
        print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
            (time.time() - _start)))
"""
multi_queue.py
"""

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        queue.task_done()

def writer(count, queue):
    for ii in xrange(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
        # reader_proc() reads from jqueue as a different process...
        reader_p = Process(target=reader_proc, args=((jqueue),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process
        _start = time.time()
        writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
        jqueue.join()         # Wait for the reader to finish
        print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count, 
            (time.time() - _start)))

  • Excellent! Great answer, and nice that you provided benchmarks! I only have two tiny quibbles: (1) "orders of magnitude faster" is a bit of an overstatement. The difference is 3x, which is about a third of one order of magnitude. Just saying. ;-); (2) a fairer comparison would be running N workers, each communicating with the main thread via a point-to-point pipe, versus running N workers all fed from a single point-to-multipoint queue. (11 upvotes)
  • @JJC To quibble with your quibble, 3x is about half an order of magnitude, not a third - sqrt(10) = ~3. (9 upvotes)
  • Regarding your "bonus material"... yes. If you are subclassing Process, put the bulk of the 'run' method in a try block. This is also a useful way to log exceptions. To replicate the normal exception output: sys.stderr.write(''.join(traceback.format_exception(*(sys.exc_info())))) (a sketch follows below these comments) (3 upvotes)
  • @Jonathan "In summary, a Pipe() is about three times faster than a Queue()" (2 upvotes)
  • @alexpinho98 - but then you need some out-of-band data, and an associated signalling mode, to indicate that what you are sending is not regular data but error data. Given that the originating process is already in an unpredictable state, this may be too much to ask. (2 upvotes)
  • @Mike, just wanted to say you're awesome. This answer helped me a lot. (2 upvotes)
  • In my tests, sending small packets (up to about 500 integer values per send/put, recv/get) was faster with Queue than with Pipe (even one-way with duplex=False). So if you want absolute performance, check with representative data sizes before settling on either. For example, sending messages of only 3 ints was about 40% faster with Queue. (2 upvotes)
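
Picking up the comment above about subclassing Process, a minimal sketch (with hypothetical task logic) of wrapping run() in a try block and reproducing the normal traceback output on stderr might look like this:

"""
process_subclass_sketch.py
"""
import sys
import traceback
from multiprocessing import Process

class Worker(Process):
    def run(self):
        try:
            # Hypothetical task logic goes here...
            raise ValueError("simulated failure inside the child process")
        except Exception:
            # Replicate the normal exception output on stderr so the crash
            # is not silently swallowed by multiprocessing.
            sys.stderr.write(''.join(traceback.format_exception(*sys.exc_info())))

if __name__ == '__main__':
    w = Worker()
    w.start()
    w.join()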

Tob*_*ter 8

If - like me - you are wondering whether using the multiprocessing constructs (Pipe or Queue) in your threading-based program would improve performance, I have adapted Mike Pennington's script to compare them against queue.Queue and queue.SimpleQueue:

Sending 10000 numbers to mp.Pipe():                     57.769 ms
Sending 10000 numbers to mp.Queue():                    74.844 ms
Sending 10000 numbers to mp.SimpleQueue():              66.662 ms
Sending 10000 numbers to queue.Queue():                  8.253 ms
Sending 10000 numbers to queue.SimpleQueue():            0.831 ms
Sending 100000 numbers to mp.Pipe():                   421.775 ms
Sending 100000 numbers to mp.Queue():                  812.989 ms
Sending 100000 numbers to mp.SimpleQueue():            682.442 ms
Sending 100000 numbers to queue.Queue():                82.091 ms
Sending 100000 numbers to queue.SimpleQueue():           7.831 ms
Sending 1000000 numbers to mp.Pipe():                 4198.766 ms
Sending 1000000 numbers to mp.Queue():                8302.404 ms
Sending 1000000 numbers to mp.SimpleQueue():          6845.322 ms
Sending 1000000 numbers to queue.Queue():              840.551 ms
Sending 1000000 numbers to queue.SimpleQueue():         77.338 ms
Sending 10000000 numbers to mp.Pipe():               43341.405 ms
Sending 10000000 numbers to mp.Queue():              85868.946 ms
Sending 10000000 numbers to mp.SimpleQueue():        71669.009 ms
Sending 10000000 numbers to queue.Queue():            8463.520 ms
Sending 10000000 numbers to queue.SimpleQueue():       773.727 ms

This was run on an M1 MacBook Pro with Python 3.11.7.

Unsurprisingly, using the queue package yields much better results if all you have are threads. That said, I was surprised by how performant queue.SimpleQueue is.


"""
pipe_performance.py
"""
import threading as td
import queue
import multiprocessing as mp
import multiprocessing.connection as mp_connection
import time
import typing


def reader_pipe(p_out: mp_connection.Connection) -> None:
    while True:
        msg = p_out.recv()
        if msg == "DONE":
            break


def reader_queue(p_queue: "queue.Queue[typing.Union[str, int]]") -> None:
    while True:
        msg = p_queue.get()
        if msg == "DONE":
            break


def pretty_print(count: int, name: str, t: float) -> None:
    text = f"Sending {count} numbers to {name}:"
    t_text = f"{t*1e3:.3f} ms"
    print(f"{text:<50}{t_text:>15}")


if __name__ == "__main__":
    for count in [10**4, 10**5, 10**6, 10**7]:
        # first: mp.pipe
        p_mppipe_out, p_mppipe_in = mp.Pipe()
        reader_p = td.Thread(target=reader_pipe, args=((p_mppipe_out),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_mppipe_in.send(ii)
        p_mppipe_in.send("DONE")
        reader_p.join()
        pretty_print(count, "mp.Pipe()", time.time() - _start)

        # second: mp.Queue
        p_mpqueue = mp.Queue()
        reader_p = td.Thread(target=reader_queue, args=((p_mpqueue),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_mpqueue.put(ii)
        p_mpqueue.put("DONE")
        reader_p.join()
        pretty_print(count, "mp.Queue()", time.time() - _start)

        # third: mp.SimpleQueue
        p_mpsqueue = mp.SimpleQueue()
        reader_p = td.Thread(target=reader_queue, args=((p_mpsqueue),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_mpsqueue.put(ii)
        p_mpsqueue.put("DONE")
        reader_p.join()
        pretty_print(count, "mp.SimpleQueue()", time.time() - _start)

        # fourth: queue.Queue
        p_queue = queue.Queue()
        reader_p = td.Thread(target=reader_queue, args=((p_queue),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_queue.put(ii)
        p_queue.put("DONE")
        reader_p.join()
        pretty_print(count, "queue.Queue()", time.time() - _start)

        # fifth: queue.SimpleQueue
        p_squeue = queue.SimpleQueue()
        reader_p = td.Thread(target=reader_queue, args=((p_squeue),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_squeue.put(ii)
        p_squeue.put("DONE")
        reader_p.join()
        pretty_print(count, "queue.SimpleQueue()", time.time() - _start)


ski*_*dge 7

A notable additional feature of Queue() is the feeder thread. The documentation notes that "When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe." An infinite number of (or maxsize) items can be inserted into Queue() without any calls to queue.put() blocking. This allows you to store multiple items in a Queue() until your program is ready to process them.
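
As a minimal sketch of that buffering behaviour (hypothetical item count), the puts below return immediately even though no consumer is running yet, and the items remain available to drain later:

"""
queue_buffering_sketch.py
"""
from multiprocessing import Queue

if __name__ == '__main__':
    q = Queue()   # no maxsize, so effectively unbounded

    # No consumer exists yet, but none of these puts block: the feeder thread
    # moves items from the queue's internal buffer into the pipe as space
    # allows, so the producing process is never stalled.
    for ii in range(10000):
        q.put(ii)
    print("queued 10000 items without blocking")

    # Later, when the program is ready, the buffered items can be drained.
    total = sum(q.get() for _ in range(10000))
    print("drained the queue, sum = {0}".format(total))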

Pipe(), on the other hand, has a finite amount of storage for items that have been sent to one connection but not yet received from the other. After this storage is used up, calls to connection.send() will block until there is space to write the entire item. This stalls the thread doing the writing until some other thread reads from the pipe. Connection objects give you access to the underlying file descriptor. On *nix systems, you can prevent connection.send() calls from blocking using the os.set_blocking() function. However, this will cause problems if you try to send a single item that does not fit in the pipe's file. Recent versions of Linux allow you to increase the size of the file, but the maximum allowed size varies with system configuration. You should therefore never rely on Pipe() to buffer data. Calls to connection.send could block until data gets read from the pipe somewhere else.
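
A minimal sketch of the file-descriptor approach described above (hypothetical item count; whether the failed write surfaces as BlockingIOError, and how a partially written message is handled, depends on the platform):

"""
pipe_nonblocking_sketch.py
"""
import os
from multiprocessing import Pipe

if __name__ == '__main__':
    recv_conn, send_conn = Pipe(duplex=False)

    fd = send_conn.fileno()     # Connection exposes the underlying file descriptor
    os.set_blocking(fd, False)  # switch the write end to non-blocking mode (*nix)

    try:
        # Nothing reads from recv_conn, so the OS pipe buffer eventually fills.
        # With a blocking fd this loop would simply stall; with a non-blocking
        # fd the write fails instead, which is why you should never rely on a
        # Pipe() to buffer data.
        for ii in range(10**6):
            send_conn.send(ii)
    except BlockingIOError:
        print("pipe buffer filled after roughly {0} items".format(ii))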

In summary, a Queue is a better choice than a Pipe when you need to buffer data, even if you only need communication between two points.