Kob*_*ohn 7 python multiprocessing
有没有人知道一种干净的方式来接近LIFO甚至不接近FIFO(例如随机)的行为multiprocessing.Queue
?
备选问题:有人能指出管理实际存储结构的线程的代码multiprocessing.Queue
吗?似乎在提供大约LIFO访问权限的过程中是微不足道的,但我在试图找到它的兔子洞中迷路了.
笔记:
multiprocessing.Queue
不保证订单.精细.但它接近FIFO,所以接近LIFO会很棒.(编辑)澄清:我正在进行CPU绑定模拟,multiprocessing
因此不能使用专门的队列Queue
.由于我几天没有看到任何答案,我在上面添加了替代问题.
如果是一个问题,下面multiprocessing.Queue
是接近FIFO的轻微证据.它只是表明在一个简单的情况下(单个线程),它在我的系统上是完美的FIFO:
import multiprocessing as mp
import Queue
q = mp.Queue()
for i in xrange(1000):
q.put(i)
deltas = []
while True:
try:
value1 = q.get(timeout=0.1)
value2 = q.get(timeout=0.1)
deltas.append(value2-value1)
except Queue.Empty:
break
#positive deltas would indicate the numbers are coming out in increasing order
min_delta, max_delta = min(deltas), max(deltas)
avg_delta = sum(deltas)/len(deltas)
print "min", min_delta
print "max", max_delta
print "avg", avg_delta
Run Code Online (Sandbox Code Playgroud)
打印:最小值,最大值和平均值正好为1(完美FIFO)
我查看了Lib/multiprocessing/queues.py
我的 Python 安装中的 Queue 类(Python 2.7,但与我简要检查过的 Python 3.2 版本没有明显的不同)。我是这样理解它的工作原理的:
队列对象维护两组对象。一组是由所有进程共享的多进程安全原语。其他的由每个进程单独创建和使用。
跨进程对象在方法中设置__init__
:
Pipe
对象,其两端保存为self._reader
和self._writer
。BoundedSemaphore
对象,用于计算(并可选择限制)队列中的对象数量。Lock
用于读取 Pipe 的对象,在非 Windows 平台上另一个用于写入。(我认为这是因为在 Windows 上写入管道本质上是多进程安全的。)_after_fork
每个进程的对象在和方法中设置_start_thread
:
collections.deque
用于缓冲对 Pipe 的写入的对象。threading.condition
用于在缓冲区不为空时发出信号的对象。threading.Thread
进行实际写入的对象。它是惰性创建的,因此在给定进程中至少请求对队列进行一次写入之前,它不会存在。Finalize
进程结束时清理内容的各种对象。get
队列中的A非常简单。您获取读锁,递减信号量,并从管道的读取端获取一个对象。
Aput
比较复杂。它使用多个线程。调用者put
获取条件的锁,然后将其对象添加到缓冲区并在解锁之前发出条件信号。它还会增加信号量并启动编写器线程(如果尚未运行)。
编写器线程在方法中永远循环(直到取消)_feed
。如果缓冲区为空,则等待条件notempty
。然后它从缓冲区中取出一个项目,获取写锁(如果存在)并将该项目写入管道。
那么,考虑到所有这些,您可以修改它以获得 LIFO 队列吗?看起来并不容易。管道本质上是 FIFO 对象,虽然队列不能保证整体 FIFO 行为(由于来自多个进程的写入的异步性质),但它始终主要是 FIFO。
如果您只有一个消费者,您可以get
从队列中获取对象并将它们添加到您自己的进程本地堆栈中。尽管使用共享内存,有限大小的堆栈不会太难,但执行多使用者堆栈会更困难。您需要一个锁、一对条件(用于在满状态和空状态下阻塞/发信号)、一个共享整数值(用于保存的值的数量)和一个适当类型的共享数组(用于值本身)。
归档时间: |
|
查看次数: |
2186 次 |
最近记录: |