为什么`multiprocessing.Queue.get`这么慢?

JBS*_*rro 8 message-queue python-3.x python-multiprocessing

我需要帮助才能理解multiprocessing.Queue.我遇到的问题是,queue.get(...)与调用queue.put(...)和队列缓冲区(deque)相比,获得结果是非常有趣的.

这种泄漏抽象导致我调查队列的内部.它直截了当的源代码只是指向deque实现,这似乎也很简单,我不能用它来解释我所看到的行为.另外我读到Queue使用管道,但我似乎无法在源代码中找到它.

我把它归结为一个重现问题的最小例子,我在下面指定了一个可能的输出.

import threading
import multiprocessing
import queue

q = None
def enqueue(item):
    global q
    if q is None:
        q = multiprocessing.Queue()
        process = threading.Thread(target=worker, args=(q,))  # or multiprocessing.Process Doesn't matter
        process.start()
    q.put(item)
    print(f'len putted item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')


def worker(local_queue):
    while True:
        try:
            while True:  # get all items
                item = local_queue.get(block=False)
                print(f'len got item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
        except queue.Empty:
            print('empty')


if __name__ == '__main__':
    for i in range(1, 100000, 1000):
        enqueue(list(range(i)))
Run Code Online (Sandbox Code Playgroud)

输出:

empty
empty
empty
len putted item: 1. qsize: 1. buffer len: 1
len putted item: 1001. qsize: 2. buffer len: 2
len putted item: 2001. qsize: 3. buffer len: 1
len putted item: 3001. qsize: 4. buffer len: 2
len putted item: 4001. qsize: 5. buffer len: 3
len putted item: 5001. qsize: 6. buffer len: 4
len putted item: 6001. qsize: 7. buffer len: 5
len putted item: 7001. qsize: 8. buffer len: 6
len putted item: 8001. qsize: 9. buffer len: 7
len putted item: 9001. qsize: 10. buffer len: 8
len putted item: 10001. qsize: 11. buffer len: 9
len putted item: 11001. qsize: 12. buffer len: 10
len putted item: 12001. qsize: 13. buffer len: 11
len putted item: 13001. qsize: 14. buffer len: 12
len putted item: 14001. qsize: 15. buffer len: 13
len putted item: 15001. qsize: 16. buffer len: 14
len got item: 1. qsize: 15. buffer len: 14
len putted item: 16001. qsize: 16. buffer len: 15
len putted item: 17001. qsize: 17. buffer len: 16
len putted item: 18001. qsize: 18. buffer len: 17
len putted item: 19001. qsize: 19. buffer len: 18
len putted item: 20001. qsize: 20. buffer len: 19
len putted item: 21001. qsize: 21. buffer len: 20
len putted item: 22001. qsize: 22. buffer len: 21
len putted item: 23001. qsize: 23. buffer len: 22
len putted item: 24001. qsize: 24. buffer len: 23
len putted item: 25001. qsize: 25. buffer len: 24
len putted item: 26001. qsize: 26. buffer len: 25
len putted item: 27001. qsize: 27. buffer len: 26
len putted item: 28001. qsize: 28. buffer len: 27
len got item: 1001. qsize: 27. buffer len: 27
empty
len putted item: 29001. qsize: 28. buffer len: 28
empty
empty
empty
len got item: 2001. qsize: 27. buffer len: 27
empty
len putted item: 30001. qsize: 28. buffer len: 28
Run Code Online (Sandbox Code Playgroud)

我希望您注意以下结果:插入元素28001后,工作人员发现队列中没有剩余元素,而有几十个元素.由于同步,我只能获得除了少数几个以外的所有内容.但它只设法找到两个!

这种模式还在继续.

这似乎与我放在队列中的对象的大小有关.对于小对象,例如i相反list(range(i)),此问题不会出现.但是正在谈论的对象的大小仍然是千字节,并不足以使这种显着的延迟高贵(在我的现实世界非最小的例子中这很容易花费几分钟)

我的具体问题是:如何在Python中的进程之间共享(不是这样)大量数据?另外,我想知道在Queue的内部实现中,这种迟缓是从哪里来的

wei*_*114 6

我也遇到了这个问题.我正在发送大型numpy数组(~300MB),而且它在mp.queue.get()时非常慢.

在看了一下mp.Queue的python2.7源代码后,我发现最慢的部分(在unix类系统上)_conn_recvall()socket_connection.c中,但我看起来并不深.

为了解决这个问题,我构建了一个实验包FMQ.

这个项目的灵感来自于使用multiprocessing.Queue(mp.Queue).由于管道的速度限制(在类Unix系统上),mp.Queue对大数据项来说很慢.

使用mp.Queue处理进程间传输,FMQ实现了一个窃取程序线程,一旦任何项目可用,它就会从mp.Queue中窃取一个项目,并将其放入Queue.Queue.然后,使用者进程可以立即从Queue.Queue获取数据.

加速是基于生产者和消费者过程都是计算密集型(因此需要多处理)并且数据很大(例如> 50 227×227图像)的假设.否则使用多处理的mp.Queue或带线程的Queue.Queue就足够了.

fmq.Queue很容易像mp.Queue一样使用.

请注意,仍有一些已知问题,因为该项目尚处于初期阶段.


Vin*_*s M 5

对于未来的读者,您还可以尝试使用:

q = multiprocessing.Manager().Queue()
Run Code Online (Sandbox Code Playgroud)

而不是仅仅

q = multiprocessing.Queue()
Run Code Online (Sandbox Code Playgroud)

我还没有完全提炼和理解这种行为背后的机制,但我读过的一个来源声称它是关于:

“当将大项目推入队列时,尽管队列的 put 函数立即返回,但这些项目基本上是缓冲的。”

作者继续解释更多关于它的内容和修复方法,但对我来说,添加 Manager 使这个技巧变得简单而干净。

更新:我相信这个 StackOverflow 答案有助于解释这个问题。

接受的答案中提到的 FMQ 也是 Python2 独有的,这也是我觉得这个答案有一天可能会帮助更多人的原因之一。

  • 尽管我只将三个整数的元组放入其中,但我的队列很慢。 (2认同)
  • 一个相关的问题:/sf/answers/3166572391/ (2认同)