Python 等待多处理池中的进程完成而不关闭池或使用 map()

use*_*481 1 python multiprocessing

我有一个像下面这样的代码段

pool = multiprocessing.Pool(10)
for i in range(300):
    for m in range(500):
        data = do_some_calculation(resource)
        pool.apply_async(paralized_func, data, call_back=update_resource)
    # need to wait for all processes finish
    # {...}
    # Summarize resource
    do_something_with_resource(resource)
Run Code Online (Sandbox Code Playgroud)

所以基本上我有2个循环。我在循环外初始化进程池以避免过热。在第二个循环结束时,我想总结所有过程的结果。

问题是我不能pool.map()因为data输入的变化而等待。我不能使用pool.join()andpool.close()或者因为我仍然需要pool在第一个循环的下一次迭代中使用。

在这种情况下等待进程完成的好方法是什么?

我尝试在第二个循环结束时检查 pool._cache。

while len(process_pool._cache) > 0:
    sleep(0.001)
Run Code Online (Sandbox Code Playgroud)

这种方式有效,但看起来很奇怪。有一个更好的方法吗?

Sra*_*raw 7

apply_async将返回一个AsyncResult对象。这个对象有一个方法wait([timeout]),你可以使用它。

例子:

pool = multiprocessing.Pool(10)
for i in range(300):
    results = []
    for m in range(500):
        data = do_some_calculation(resource)
        result = pool.apply_async(paralized_func, data, call_back=update_resource)
        results.append(result)
    [result.wait() for result in results]
    # need to wait for all processes finish
    # {...}
    # Summarize resource
    do_something_with_resource(resource)
Run Code Online (Sandbox Code Playgroud)

我没有检查这段代码,因为它不是可执行的,但它应该可以工作。

  • 你不必担心。首先,`AsyncResult` 将隐式存储在池中,因此您不会节省任何空间。其次,`AsyncResult` 只是一个非常小的对象,它不会花费你太多。 (3认同)