Python多处理队列可靠性,队列与SimpleQueue和JoinableQueue

dmg*_*dmg 10 python process multiprocessing

直接来自Python 文档:

class multiprocessing.Queue([maxsize])

...

qsize()返回队列的大致大小.由于多线程/多处理语义,这个数字不可靠.

empty()如果​​队列为空则返回True,否则返回False.由于多线程/多处理语义,这是不可靠的.

而且我凭经验发现这是非常正确的Queue,特别是对于empty().

在我的代码中,我有一堆进程(每个进程都是相同主进程的子进程),并且每个进程在其run方法中都有以下内容:

while self.active:
    if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0):
        try:
            self.exclusive_queue.put(self.general_queue.get(timeout=self.queue_timeout))
        except Queue.Empty as empty_queue:
            continue
    else:
        task = self.exclusive_queue.get()
        self.compute(task)
Run Code Online (Sandbox Code Playgroud)

基本上,该过程等待general_queue工作,但首先检查它exclusive_queue.主进程可以将任务放在进程的常规队列或独占队列中.现在,在if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0)我,我首先使用了一个self.exclusive_queue.empty()导致相当奇怪的行为(qsize()30+和empty() = True).

所以我在这里的方向是 - 因为multiprocessing.queues.SimpleQueue在文档中写道:

empty()如果​​队列为空则返回True,否则返回False.

根本没有提到可靠性.SimpleQueue.empty()可靠吗?

其次是multiprocessing.JoinableQueue可靠的或"比较"可靠的比Queue,因为的task_done()机制?

这样的方法可以被认为是正确的,或者回调的方法(通过子节点之间的共享管道端点)更合适吗?

Mid*_*ter 7

不是直接的答案,但我已经开始越来越依赖于使用保护条件迭代输入队列。multiprocessing 模块的文档中有一个示例:

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)
Run Code Online (Sandbox Code Playgroud)

因此,当您对队列的输入完成后,您只需将与启动进程put一样多的STOP字符串或您选择的任何守卫加入队列即可。