使用多处理启动大量异步进程

Dus*_*rea 0 python multiprocessing high-volume

如果我调用apply_async 10,000次,假设OOM杀手没有干扰,多处理会同时启动它们,还是会批量启动它们.例如..每100次启动,等待90次完成启动再开始之前?

达斯汀

Tim*_*ers 6

apply_async()是一种multiprocessing.Pool对象的方法,并将所有工作传递给您在创建时指定的进程数Pool.只有那么多任务可以同时运行.其余的由多处理机器保存在队列(或管道)中,并在完成已分配的任务时自动发送到进程.几乎相同的是真正的所有Pool给你喂多个工作项的方法.

更多澄清: apply_async不创建或启动任何流程.您调用时创建了进程Pool().这些过程只是坐在那里等到你调用Pool方法(比如apply_async())要求完成一些真正的工作.

玩这个:

MAX = 100000

from time import sleep
def f(i):
    sleep(0.01)
    return i

def summer(summand):
    global SUM, FINISHED
    SUM += summand
    FINISHED += 1

if __name__ == "__main__":
    import multiprocessing as mp
    SUM = 0
    FINISHED = 0
    pool = mp.Pool(4)

    print "queuing", MAX, "work descriptions"
    for i in xrange(MAX):
        pool.apply_async(f, args=(i,), callback=summer)
        if i % 1000 == 0:
            print "{}/{}".format(FINISHED, i),
    print

    print "closing pool"
    pool.close()

    print "waiting for processes to end"
    pool.join()

    print "verifying result"
    print "got", SUM, "expected", sum(xrange(MAX))
Run Code Online (Sandbox Code Playgroud)

输出如下:

queuing 100000 work descriptions
0/0 12/1000 21/2000 33/3000 42/4000
... stuff chopped for brevity ...
1433/95000 1445/96000 1456/97000 1466/98000 1478/99000
closing pool
waiting for processes to end
... and it waits here "for a long time" ...
verifying result
got 4999950000 expected 4999950000
Run Code Online (Sandbox Code Playgroud)

您只需观察其行为即可回答大部分问题.工作项目排队很快.当我们看到"关闭池"时,所有工作项都已排队,但是已经完成了1478个,大约98000个仍在等待某些进程处理它们.

如果你把sleep(0.01)出来的f(),它揭示要少得多,因为结果回来几乎一样快,工作项目进行排队.

不管你怎么运行它,内存使用仍然是微不足道的.这里的工作项(函数("f")的名称及其pickle整数参数)很小.