Jon*_*han 128 python queue performance pipe multiprocessing
Python的多处理包中队列和管道之间的根本区别是什么?
在什么情况下应该选择一个而不是另一个?什么时候使用有利Pipe()
?什么时候使用有利Queue()
?
Mik*_*ton 242
何时使用它们
如果您需要两个以上的通信点,请使用Queue()
.
如果你需要绝对的性能,那么a Pipe()
会快得多,因为Queue()
它建立在它之上Pipe()
.
绩效基准
假设您想要生成两个进程并尽快在它们之间发送消息.这些是使用Pipe()
和的类似测试之间的拖拽竞赛的时间结果Queue()
......这是在运行Ubuntu 11.10和Python 2.7.2的ThinkpadT61上.
仅供参考,我把结果JoinableQueue()
作为奖金投入; 调用JoinableQueue()
任务时queue.task_done()
(它甚至不知道特定任务,它只计算队列中未完成的任务),因此queue.join()
知道工作已完成.
这个答案底部的每个代码......
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
Run Code Online (Sandbox Code Playgroud)
总结Pipe()
大约是a的三倍Queue()
.JoinableQueue()
除非你真的必须得到好处,否则不要考虑.
奖金材料2
多处理引入了信息流的细微变化,除非您知道一些快捷方式,否则会使调试变得困难.例如,在许多条件下通过字典索引时,您可能有一个可正常工作的脚本,但在某些输入中很少失败.
通常我们会在整个python进程崩溃时找到失败的线索; 但是,如果多处理功能崩溃,则不会将未经请求的崩溃回溯打印到控制台.追踪未知的多处理崩溃是很困难的,没有对流程崩溃的线索.
我发现追踪多处理崩溃信息的最简单方法是将整个多处理函数包装在try
/中except
并使用traceback.print_exc()
:
import traceback
def reader(args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print "FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
Run Code Online (Sandbox Code Playgroud)
现在,当您发现崩溃时,您会看到以下内容:
FATAL: reader([{'crash', 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(task_q, result_q)
File "foo.py", line 46, in run
raise ValueError
ValueError
Run Code Online (Sandbox Code Playgroud)
源代码:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close() # We are only reading
while True:
msg = p_output.recv() # Read from the output pipe and do nothing
if msg=='DONE':
break
def writer(count, p_input):
for ii in xrange(0, count):
p_input.send(ii) # Write 'count' numbers into the input pipe
p_input.send('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints: p_input ------> p_output
p_output, p_input = Pipe() # writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
p_output.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))
Run Code Online (Sandbox Code Playgroud)
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
Run Code Online (Sandbox Code Playgroud)
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join() # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))
Run Code Online (Sandbox Code Playgroud)
如果 - 像我一样 - 您想知道是否在程序中使用multiprocessing
构造 (Pipe
或Queue
)threading
来提高性能,我已经改编了Mike Pennington的脚本来与queue.Queue
和进行比较queue.SimpleQueue
:
Sending 10000 numbers to mp.Pipe(): 57.769 ms
Sending 10000 numbers to mp.Queue(): 74.844 ms
Sending 10000 numbers to mp.SimpleQueue(): 66.662 ms
Sending 10000 numbers to queue.Queue(): 8.253 ms
Sending 10000 numbers to queue.SimpleQueue(): 0.831 ms
Sending 100000 numbers to mp.Pipe(): 421.775 ms
Sending 100000 numbers to mp.Queue(): 812.989 ms
Sending 100000 numbers to mp.SimpleQueue(): 682.442 ms
Sending 100000 numbers to queue.Queue(): 82.091 ms
Sending 100000 numbers to queue.SimpleQueue(): 7.831 ms
Sending 1000000 numbers to mp.Pipe(): 4198.766 ms
Sending 1000000 numbers to mp.Queue(): 8302.404 ms
Sending 1000000 numbers to mp.SimpleQueue(): 6845.322 ms
Sending 1000000 numbers to queue.Queue(): 840.551 ms
Sending 1000000 numbers to queue.SimpleQueue(): 77.338 ms
Sending 10000000 numbers to mp.Pipe(): 43341.405 ms
Sending 10000000 numbers to mp.Queue(): 85868.946 ms
Sending 10000000 numbers to mp.SimpleQueue(): 71669.009 ms
Sending 10000000 numbers to queue.Queue(): 8463.520 ms
Sending 10000000 numbers to queue.SimpleQueue(): 773.727 ms
Run Code Online (Sandbox Code Playgroud)
这是在运行 Python 3.11.7 的 M1 MacBook Pro 上进行的。
毫不奇怪,queue
如果您拥有的只是线程,那么使用该包会产生更好的结果。也就是说,我queue.SimpleQueue
对它的性能感到惊讶。
"""
pipe_performance.py
"""
import threading as td
import queue
import multiprocessing as mp
import multiprocessing.connection as mp_connection
import time
import typing
def reader_pipe(p_out: mp_connection.Connection) -> None:
while True:
msg = p_out.recv()
if msg == "DONE":
break
def reader_queue(p_queue: "queue.Queue[typing.Union[str, int]]") -> None:
while True:
msg = p_queue.get()
if msg == "DONE":
break
def pretty_print(count: int, name: str, t: float) -> None:
text = f"Sending {count} numbers to {name}:"
t_text = f"{t*1e3:.3f} ms"
print(f"{text:<50}{t_text:>15}")
if __name__ == "__main__":
for count in [10**4, 10**5, 10**6, 10**7]:
# first: mp.pipe
p_mppipe_out, p_mppipe_in = mp.Pipe()
reader_p = td.Thread(target=reader_pipe, args=((p_mppipe_out),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_mppipe_in.send(ii)
p_mppipe_in.send("DONE")
reader_p.join()
pretty_print(count, "mp.Pipe()", time.time() - _start)
# second: mp.Queue
p_mpqueue = mp.Queue()
reader_p = td.Thread(target=reader_queue, args=((p_mpqueue),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_mpqueue.put(ii)
p_mpqueue.put("DONE")
reader_p.join()
pretty_print(count, "mp.Queue()", time.time() - _start)
# third: mp.SimpleQueue
p_mpsqueue = mp.SimpleQueue()
reader_p = td.Thread(target=reader_queue, args=((p_mpsqueue),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_mpsqueue.put(ii)
p_mpsqueue.put("DONE")
reader_p.join()
pretty_print(count, "mp.SimpleQueue()", time.time() - _start)
# fourth: queue.Queue
p_queue = queue.Queue()
reader_p = td.Thread(target=reader_queue, args=((p_queue),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_queue.put(ii)
p_queue.put("DONE")
reader_p.join()
pretty_print(count, "queue.Queue()", time.time() - _start)
# fifth: queue.SimpleQueue
p_squeue = queue.SimpleQueue()
reader_p = td.Thread(target=reader_queue, args=((p_squeue),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_squeue.put(ii)
p_squeue.put("DONE")
reader_p.join()
pretty_print(count, "queue.SimpleQueue()", time.time() - _start)
Run Code Online (Sandbox Code Playgroud)
Queue()
值得注意的一个附加功能是馈线。本节指出“当一个进程第一次将一个项目放入队列时,一个馈线线程将启动,它将对象从缓冲区传输到管道中。” 可以在Queue()
不调用queue.put()
阻塞的情况下插入无限数量的(或 maxsize)项。这允许您在 , 中存储多个项目Queue()
,直到您的程序准备好处理它们。
Pipe()
另一方面,对于已发送到一个连接但尚未从另一个连接接收的项目,它具有有限的存储量。在此存储空间用完后,调用connection.send()
将阻塞,直到有空间写入整个项目。这将停止执行写入的线程,直到其他线程从管道中读取。Connection
对象使您可以访问底层文件描述符。在 *nix 系统上,您可以connection.send()
使用该os.set_blocking()
函数防止调用阻塞。但是,如果您尝试发送不适合管道文件的单个项目,这将导致问题。最新版本的 Linux 允许您增加文件的大小,但允许的最大大小因系统配置而异。因此,您永远不应该依赖Pipe()
缓冲数据。调用connection.send
可能会阻塞,直到从其他管道读取数据。
总之,当您需要缓冲数据时,Queue 是比管道更好的选择。即使您只需要在两点之间进行通信。
归档时间: |
|
查看次数: |
61193 次 |
最近记录: |