multiprocessing.Pool.map_async 似乎没有...做任何事情?

Sch*_*ote 3 python python-multiprocessing

所以,我正在开发一个应用程序,每次启动时都必须根据哈希列表检查 ~50 GB 的数据。显然这需要并行化,我不希望应用程序在“加载...”屏幕上挂一分半钟。

我正在使用multiprocessing.Pool'smap_async来处理这个问题;主线程调用map_async(checkfiles, path_hash_pairs, callback)并提供一个回调,告诉它在发现不匹配时发出警告。

问题是……什么都没发生。使用我的任务管理器查看 Python 进程显示它们生成然后立即终止而不做任何工作。他们从不打印任何东西,当然也永远不会完成并调用回调。

这个缩小的例子也表现出同样的问题:

def printme(x):
    time.sleep(1)
    print(x)
    return x**2

if __name__ == "__main__":
    l = list(range(0,512))

    def print_result(res):
        print(res)

    with multiprocessing.Pool() as p:
        p.map_async(printme, l, callback=print_result)
    p.join()
    time.sleep(10)
Run Code Online (Sandbox Code Playgroud)

运行它,然后......没有任何反应。交换map_asyncmap工程完全按照预期。

我只是犯了一个愚蠢的错误还是什么?

Sra*_*raw 6

让我们看看会发生什么:

您正在使用上下文管理器自动“关闭” Pool,但是,重要的是,如果您检查Pool.__exit__的源代码,您会发现:

def __exit__(self, exc_type, exc_val, exc_tb):
    self.terminate()
Run Code Online (Sandbox Code Playgroud)

它只是调用terminate而不是close. 所以,你仍然需要明确地关闭Pool,然后join它。

with multiprocessing.Pool() as p:
    p.map_async(printme, l, callback=print_result)
    p.close()
    p.join()
Run Code Online (Sandbox Code Playgroud)

但在这种情况下,使用上下文管理器是没有意义的,只需使用正常形式:

p = multiprocessing.Pool()
p.map_async(printme, l, callback=print_result)
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)

为什么它适用于map?因为map会阻塞util,所有的工作都完成了。