Dsc*_*oni 5 python queue python-multiprocessing
在 Python 2.7 中,我实现了一个具有多个队列和消费者的多处理场景。简化的想法是,我有一个作业的生产者,这些作业被提供给消费者,处理作业和一个错误处理程序,它负责所有的日志记录。非常简化,看起来都可以与之媲美:
import multiprocessing as mp
import Queue
job_queue = mp.Queue()
error_queue = mp.Queue()
for i in range(10):
job_queue.put(i)
def job_handler(job_queue, error_queue):
print 'Job handler'
while True:
try:
element = job_queue.get_nowait()
print element
except:
# t1
error_queue.put('Error')
error_queue.close()
error_queue.join_thread()
job_queue.close()
job_queue.join_thread()
# t2
return 1
def error_handler(error_queue):
result = error_queue.get()
if result == 'Error':
error_queue.close()
error_queue.join_thread()
if __name__ == '__main__':
print 'Starting'
p1 = mp.Process(target = error_handler, args = (error_queue, ))
p1.start()
p2 = mp.Process(target = job_handler, args = (job_queue, error_queue))
p2.start()
Run Code Online (Sandbox Code Playgroud)
这基本上有效,但在我更复杂的程序中,两个评论点t1和t2(大约 5 分钟)之间存在很长的时间差。所以我有两个问题:
close()和 join_thread()对所有使用队列对象,以表明它使用它们做了什么?我认为,当我结束它们时,子流程会隐含地这样做,例如通过返回此处所述:join_thread()加入后台线程。这只能在调用 close() 后使用。它会阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道中。
默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。进程可以调用cancel_join_thread() 使join_thread() 什么也不做。
致电.close()和.join_thread()是建议,但不是必须的。.close()当队列被垃圾回收时自动调用,并且.join_thread()在进程终止时自动调用。
不幸的是,我运行了你的代码,并在 5 秒后得到了一个漂亮的终止结果,并打印了 0-9。即使当我推送一个不可打印的字符时,我也没有收到任何延迟。该代码似乎运行流畅。
对于更复杂的程序,如果您通过队列传递大量数据,则可能会发生这种情况。队列用作IPC,这意味着数据在一侧编码,推入管道,并在另一侧解码。传递大量数据会导致速度减慢。由于它最终会自行解决,因此看起来并不像僵局。
尽管最好避免这种情况,但一个选择是使用共享内存而不是队列。这样,数据不需要在进程之间传递,而只需保留在两个进程共享的一个内存段中。
| 归档时间: |
|
| 查看次数: |
3513 次 |
| 最近记录: |