How do I get the amount of "work" left to be done by a Python multiprocessing Pool?

E.Z*_*.Z. 9 python parallel-processing pool process multiprocessing

So far, whenever I needed to use multiprocessing, I did it by manually creating a "process pool" and sharing a work queue with all of the child processes.

For example:

import logging
from multiprocessing import Process, Queue


class MyClass:

    def __init__(self, num_processes):
        self._log         = logging.getLogger()
        self.process_list = []
        self.work_queue   = Queue()
        for i in range(num_processes):
            p_name = 'CPU_%02d' % (i+1)
            self._log.info('Initializing process %s', p_name)
            # do_stuff is the worker function (defined elsewhere) that reads from work_queue
            p = Process(target = do_stuff,
                        args   = (self.work_queue, 'arg1'),
                        name   = p_name)
            self.process_list.append(p)
            p.start()

This way I can add items to the queue, and they are consumed by the child processes. I can then monitor the progress of the processing by checking Queue.qsize():

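    while True:
        qsize = self.work_queue.qsize()
        if qsize == 0:
            # an empty queue means every item has been taken by a worker,
            # not necessarily that the last items have finished processing
            self._log.info('Processing finished')
            break
        else:
            self._log.info('%d simulations still need to be calculated', qsize)
            time.sleep(1)  # avoid a busy loop (needs `import time`)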
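
For comparison, the manual approach above can be made runnable end to end. This is a minimal sketch, assuming a hypothetical do_stuff worker that stops when it reads a None sentinel (SENTINEL is also an illustrative name). It shows that qsize() reaching 0 only means every item has been taken, so the parent still has to stop and join the workers to know the last items are done.

```python
import logging
import time
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical "no more work" marker


def do_stuff(work_queue, tag):
    """Hypothetical worker: take items until the sentinel arrives."""
    while True:
        item = work_queue.get()
        if item is SENTINEL:
            break
        time.sleep(0.01)  # mock work on `item`


def run(num_processes=4, n_items=50):
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger()
    work_queue = Queue()
    processes = [Process(target=do_stuff, args=(work_queue, 'arg1'),
                         name='CPU_%02d' % (i + 1))
                 for i in range(num_processes)]
    for p in processes:
        p.start()
    for item in range(n_items):
        work_queue.put(item)

    # qsize() is approximate and raises NotImplementedError on macOS
    while True:
        qsize = work_queue.qsize()
        if qsize == 0:
            break
        log.info('%d simulations still need to be calculated', qsize)
        time.sleep(0.1)

    # An empty queue only means every item was taken: send one sentinel
    # per worker, then join them to wait until the last items are done.
    for _ in processes:
        work_queue.put(SENTINEL)
    for p in processes:
        p.join()
    log.info('Processing finished')
    return [p.exitcode for p in processes]


if __name__ == '__main__':
    run()
```

The sentinels are queued only after the monitoring loop, so they never inflate the count that qsize() reports.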

Now I think multiprocessing.Pool could simplify a lot of this code.

What I haven't been able to find is how to monitor the amount of "work" that still remains to be done.

Consider the following example:

from multiprocessing import Pool


class MyClass:

    def __init__(self, num_processes):
        self.process_pool = Pool(num_processes)
        # ...
        result_list = []
        for i in range(1000):
            result = self.process_pool.apply_async(do_stuff, ('arg1',))
            result_list.append(result)
        # ---> here: how do I monitor the Pool's processing progress?
        # ...?
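
With apply_async, as in the example above, each call returns an AsyncResult whose ready() method reports whether that task has finished, so one option is to count the results that are not ready yet. This is a minimal sketch, assuming a stand-in do_stuff and a helper count_remaining, both hypothetical; it is an alternative to the Manager-queue approach and needs no shared queue.

```python
import time
from multiprocessing import Pool


def do_stuff(arg):
    """Hypothetical stand-in for the question's worker function."""
    time.sleep(0.01)  # mock work
    return arg


def count_remaining(result_list):
    """Hypothetical helper: how many AsyncResult objects are not ready yet."""
    return sum(1 for r in result_list if not r.ready())


def run(num_processes=4, n_tasks=100):
    with Pool(num_processes) as pool:
        result_list = [pool.apply_async(do_stuff, ('arg1',))
                       for _ in range(n_tasks)]
        # poll until every task has finished, reporting the remaining count
        remaining = count_remaining(result_list)
        while remaining:
            print('%d simulations still need to be calculated' % remaining)
            time.sleep(0.1)
            remaining = count_remaining(result_list)
        # every result is ready, so get() returns without blocking
        return [r.get() for r in result_list]


if __name__ == '__main__':
    outputs = run()
```

Note that ready() is also True for a task that raised an exception; the exception is re-raised by get().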

Any ideas?

aar*_*ren 14

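Use a Manager queue. This is a queue that is shared among the worker processes: Manager().Queue() returns a proxy object that can be pickled and sent to each worker, and every copy of the proxy talks to the same queue living in the manager process. If you use an ordinary multiprocessing.Queue instead, it cannot be passed to the pool's workers as a task argument at all (pickling it raises a RuntimeError), so the workers would have no shared queue to update.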
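
Pool sends each task's arguments to the workers by pickling them, so the difference between the two kinds of queue can be checked directly. A small sketch with a hypothetical can_pickle helper:

```python
import pickle
from multiprocessing import Manager, Queue


def can_pickle(obj):
    """Hypothetical helper: True if obj survives pickle.dumps, as task arguments must."""
    try:
        pickle.dumps(obj)
    except (RuntimeError, TypeError):
        # multiprocessing.Queue raises RuntimeError outside of process spawning;
        # objects holding thread locks (e.g. queue.Queue) raise TypeError
        return False
    return True


if __name__ == '__main__':
    print('multiprocessing.Queue:', can_pickle(Queue()))
    with Manager() as m:
        print('Manager().Queue() proxy:', can_pickle(m.Queue()))
```

On CPython this prints False for the plain queue and True for the manager proxy.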

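Then have the workers put something on the queue as they finish, and monitor the size of the queue while they work. Use map_async for this: it returns an AsyncResult whose ready() method tells you when the whole result is available, so you know when to break out of the monitoring loop.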

Example:

import time
from multiprocessing import Pool, Manager


def play_function(args):
    """Mock function that takes a single argument consisting
    of (input, queue). Alternatively, you could use another function
    as a wrapper.
    """
    i, q = args
    time.sleep(0.1)  # mock work
    q.put(i)         # report one finished item
    return i


if __name__ == '__main__':
    # the guard is required on platforms that start workers with "spawn"
    with Pool() as p, Manager() as m:
        q = m.Queue()  # proxy queue, picklable and shared by all workers

        inputs = range(20)
        args = [(i, q) for i in inputs]
        result = p.map_async(play_function, args)

        # monitor loop: q.qsize() is the number of items finished so far
        while not result.ready():
            size = q.qsize()
            print(size)
            time.sleep(0.1)

        outputs = result.get()
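
If the only goal is a progress count, Pool.imap_unordered is another option (a different technique from the Manager queue used above): it yields each result as soon as a worker finishes it, so the parent can count completions itself. This is a minimal sketch, assuming a one-argument variant of play_function and a hypothetical run_with_progress helper.

```python
import time
from multiprocessing import Pool


def play_function(i):
    """Mock work; unlike the answer's version it takes no queue."""
    time.sleep(0.1)
    return i


def run_with_progress(inputs, processes=None):
    """Hypothetical helper: run play_function over inputs, printing progress."""
    inputs = list(inputs)
    total = len(inputs)
    outputs = []
    with Pool(processes) as pool:
        # results arrive in completion order, not input order
        for done, value in enumerate(pool.imap_unordered(play_function, inputs), 1):
            outputs.append(value)
            print('%d of %d done, %d remaining' % (done, total, total - done))
    return outputs


if __name__ == '__main__':
    outputs = run_with_progress(range(20))
```

Because results come back unordered, sort or re-key them if input order matters; map_async, as used in the answer, preserves order.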