如何使用 multiprocessing 和 pool.map 跟踪状态?

Foo*_*Bar 7 python multiprocessing

我是第一次设置多处理模块,基本上,我打算做一些类似的事情

from multiprocessing import pool
pool = Pool(processes=102)
results = pool.map(whateverFunction, myIterable)
print 1
Run Code Online (Sandbox Code Playgroud)

据我所知,1将在所有过程都返回并且结果完成后立即打印。我想对这些进行一些状态更新。实现它的最佳方法是什么?

我有点犹豫要不要whateverFunction()打印。特别是如果有大约 200 个值,我将打印 200 次类似 'process done' 这样的东西,这不是很有用。

我希望输出像

10% of myIterable done
20% of myIterable done
Run Code Online (Sandbox Code Playgroud)

unu*_*tbu 10

pool.map阻塞直到所有并发函数调用完成。 pool.apply_async不阻塞。此外,您可以使用其callback参数来报告进度。回调函数 ,log_result每次foo完成都会调用一次。它传递由 返回的值foo

from __future__ import division
import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x

def log_result(retval):
    results.append(retval)
    if len(results) % (len(data)//10) == 0:
        print('{:.0%} done'.format(len(results)/len(data)))

if __name__ == '__main__':
    pool = mp.Pool()
    results = []
    data = range(200)
    for item in data:
        pool.apply_async(foo, args=[item], callback=log_result)
    pool.close()
    pool.join()
    print(results)
Run Code Online (Sandbox Code Playgroud)

产量

10% done
20% done
30% done
40% done
50% done
60% done
70% done
80% done
90% done
100% done
[0, 1, 2, 3, ..., 197, 198, 199]
Run Code Online (Sandbox Code Playgroud)

log_result上面的函数修改了全局变量results并访问了全局变量data。您不能将这些变量传递给 , log_result因为 中指定的回调函数pool.apply_async总是只使用一个参数调用,即 的返回值foo

但是,您可以创建一个闭包,这至少可以明确哪些变量 log_result取决于:

from __future__ import division
import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x

def make_log_result(results, len_data):
    def log_result(retval):
        results.append(retval)
        if len(results) % (len_data//10) == 0:
            print('{:.0%} done'.format(len(results)/len_data))
    return log_result

if __name__ == '__main__':
    pool = mp.Pool()
    results = []
    data = range(200)
    for item in data:
        pool.apply_async(foo, args=[item], callback=make_log_result(results, len(data)))
    pool.close()
    pool.join()
    print(results)
Run Code Online (Sandbox Code Playgroud)