如何确保多处理队列为空

alp*_*ric 4 python multiprocessing

下面的代码首先启动多个进程。然后它运行一个while True循环检查queue对象。最后,它迭代进程以检查是否有任何活动。所有过程完成后,breaks循环while。不幸的是,它发生在queue对象不为空时。在没有存储数据的情况下打破循环queue可能很容易监督数据丢失。如何修改代码逻辑以确保queue在中断循环之前对象为空?

import time, multiprocessing, os
logger = multiprocessing.log_to_stderr()

def foo(*args):
    for i in range(3):
        queue = args[0]
        queue.put(os.getpid())

items = dict()
for i in range(5):
    queue = multiprocessing.Queue()
    proc = multiprocessing.Process(target=foo, args=(queue,))
    items[proc] = queue
    proc.start()
    time.sleep(0.1)

while True:
    time.sleep(1)

    for proc, queue in items.items():
        if not queue.empty():
            print(queue.get()) 

    if not True in [proc.is_alive() for proc in items]:
        if not queue.empty():
            logger.warning('...not empty: %s' % queue.get()) 
        break 
Run Code Online (Sandbox Code Playgroud)

geo*_*xsh 6

又是同步问题。当您检查队列发现它是空的时,不能保证将来不会有新项目出现。

当子进程完成其工作时,您可以将哨兵放入队列中,以通知队列中不再有项目。父进程可以排空队列,直到获得哨兵。这也是 所使用的方法multiprocessing.Pool。你可以None在这里用作哨兵:

def foo(*args):
    for i in range(3):
        queue = args[0]
        queue.put(os.getpid())
    queue.put(None)

...

while items:
    for proc in tuple(items.keys()):
        queue = items[proc]
        if not queue.empty():
            r = queue.get()
            print(r)
            if r is None:
                proc.join()
                del items[proc]
    time.sleep(0.1)
Run Code Online (Sandbox Code Playgroud)