从multiprocessing.Queue获得接近LIFO行为的干净方法?(甚至只是*不*近FIFO)

Kob*_*ohn 7 python multiprocessing

有没有人知道一种干净的方式来接近LIFO甚至不接近FIFO(例如随机)的行为multiprocessing.Queue

备选问题:有人能指出管理实际存储结构的线程的代码multiprocessing.Queue吗?似乎在提供大约LIFO访问权限的过程中是微不足道的,但我在试图找到它的兔子洞中迷路了.

笔记:

  1. 我相信multiprocessing.Queue 不保证订单.精细.但它接近FIFO,所以接近LIFO会很棒.
  2. 我可以将所有当前项目从队列中拉出来并在使用它们之前撤消订单,但我更愿意尽可能避免使用kludge.

(编辑)澄清:我正在进行CPU绑定模拟,multiprocessing因此不能使用专门的队列Queue.由于我几天没有看到任何答案,我在上面添加了替代问题.


如果是一个问题,下面multiprocessing.Queue是接近FIFO的轻微证据.它只是表明在一个简单的情况下(单个线程),它在我的系统上是完美的FIFO:

import multiprocessing as mp
import Queue

q = mp.Queue()

for i in xrange(1000):
    q.put(i)

deltas = []
while True:
    try:
        value1 = q.get(timeout=0.1)
        value2 = q.get(timeout=0.1)
        deltas.append(value2-value1)
    except Queue.Empty:
        break

#positive deltas would indicate the numbers are coming out in increasing order
min_delta, max_delta = min(deltas), max(deltas)
avg_delta = sum(deltas)/len(deltas)

print "min", min_delta
print "max", max_delta
print "avg", avg_delta
Run Code Online (Sandbox Code Playgroud)

打印:最小值,最大值和平均值正好为1(完美FIFO)

Blc*_*ght 3

我查看了Lib/multiprocessing/queues.py我的 Python 安装中的 Queue 类(Python 2.7,但与我简要检查过的 Python 3.2 版本没有明显的不同)。我是这样理解它的工作原理的:

队列对象维护两组对象。一组是由所有进程共享的多进程安全原语。其他的由每个进程单独创建和使用。

跨进程对象在方法中设置__init__

  1. 一个Pipe对象,其两端保存为self._readerself._writer
  2. 一个BoundedSemaphore对象,用于计算(并可选择限制)队列中的对象数量。
  3. 一个Lock用于读取 Pipe 的对象,在非 Windows 平台上另一个用于写入。(我认为这是因为在 Windows 上写入管道本质上是多进程安全的。)

_after_fork每个进程的对象在和方法中设置_start_thread

  1. collections.deque用于缓冲对 Pipe 的写入的对象。
  2. threading.condition用于在缓冲区不为空时发出信号的对象。
  3. threading.Thread进行实际写入的对象。它是惰性创建的,因此在给定进程中至少请求对队列进行一次写入之前,它不会存在。
  4. Finalize进程结束时清理内容的各种对象。

get队列中的A非常简单。您获取读锁,递减信号量,并从管道的读取端获取一个对象。

Aput比较复杂。它使用多个线程。调用者put获取条件的锁,然后将其对象添加到缓冲区并在解锁之前发出条件信号。它还会增加信号量并启动编写器线程(如果尚未运行)。

编写器线程在方法中永远循环(直到取消)_feed。如果缓冲区为空,则等待条件notempty。然后它从缓冲区中取出一个项目,获取写锁(如果存在)并将该项目写入管道。


那么,考虑到所有这些,您可以修改它以获得 LIFO 队列吗?看起来并不容易。管道本质上是 FIFO 对象,虽然队列不能保证整体 FIFO 行为(由于来自多个进程的写入的异步性质),但它始终主要是 FIFO。

如果您只有一个消费者,您可以get从队列中获取对象并将它们添加到您自己的进程本地堆栈中。尽管使用共享内存,有限大小的堆栈不会太难,但执行多使用者堆栈会更困难。您需要一个锁、一对条件(用于在满状态和空状态下阻塞/发信号)、一个共享整数值(用于保存的值的数量)和一个适当类型的共享数组(用于值本身)。