我如何等待 ThreadPoolExecutor.map 完成

Dr.*_*all 7 python python-multithreading python-3.x

我有以下已简化的代码:

import concurrent.futures

pool = concurrent.futures.ThreadPoolExecutor(8)

def _exec(x):
    return x + x

myfuturelist = pool.map(_exec,[x for x in range(5)])

# How do I wait for my futures to finish?

for result in myfuturelist:
    # Is this how it's done?
    print(result)

#... stuff that should happen only after myfuturelist is
#completely resolved.
# Documentation says pool.map is asynchronous
Run Code Online (Sandbox Code Playgroud)

关于 ThreadPoolExecutor.map 的文档很薄弱。帮助会很棒。

谢谢!

mwa*_*way 12

要将呼叫ThreadPoolExecutor.map不会阻塞,直到它的所有任务已完成。使用等待来做到这一点。

from concurrent.futures import wait, ALL_COMPLETED
...

futures = [pool.submit(fn, args) for args in arg_list]
wait(futures, timeout=whatever, return_when=ALL_COMPLETED)  # ALL_COMPLETED is actually the default
do_other_stuff()
Run Code Online (Sandbox Code Playgroud)

您还可以调用list(results)返回的生成器pool.map以强制评估(这就是您在原始示例中所做的)。但是,如果您实际上没有使用从任务返回的值,wait则是要走的路。


mil*_*ice 10

确实Executor.map()不会等到所有的未来都结束。因为它返回一个像 @MisterMiyagi 所说的惰性迭代器。

但我们可以通过使用以下方法来实现这一点with

import time

from concurrent.futures import ThreadPoolExecutor

def hello(i):
    time.sleep(i)
    print(i)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(hello, [1, 2, 3])
print("finish")

# output
# 1
# 2
# 3
# finish
Run Code Online (Sandbox Code Playgroud)

如您所见,finish是在 后打印的1,2,3。它之所以有效,是因为Executor有一个__exit__()方法,代码

def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False
Run Code Online (Sandbox Code Playgroud)

的方法是shutdown ThreadPoolExecutor

def shutdown(self, wait=True, *, cancel_futures=False):
    with self._shutdown_lock:
        self._shutdown = True
        if cancel_futures:
            # Drain all work items from the queue, and then cancel their
            # associated futures.
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.cancel()

        # Send a wake-up to prevent threads calling
        # _work_queue.get(block=True) from permanently blocking.
        self._work_queue.put(None)
    if wait:
        for t in self._threads:
            t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__
Run Code Online (Sandbox Code Playgroud)

因此,通过使用with,我们可以获得等待所有 future 完成的能力。


Cth*_*Sky 4

地图和提交之间的区别

Executor.map将并行运行作业并等待 future 完成、收集结果并返回生成器。它已经为您完成了等待。如果设置超时,它将等到超时并在生成器中抛出异常。

映射(func,*可迭代,超时=无,块大小= 1)

  • 可迭代对象是立即收集的,而不是延迟收集的;
  • func 是异步执行的,并且可以同时对 func 进行多次调用。

要获取期货列表并手动等待,您可以使用:

myfuturelist = [pool.submit(_exec, x) for x in range(5)]
Run Code Online (Sandbox Code Playgroud)

Executor.submit将返回一个future对象,调用resultfuture 将显式等待它完成:

myfuturelist[0].result() # wait the 1st future to finish and return the result
Run Code Online (Sandbox Code Playgroud)

编辑 2023-02-24

尽管原始答案已被接受,但请检查 mway 和 milkice 的。我将尝试在这里添加一些细节。

wait是更好的方法,它可以让你通过参数return_when控制如何等待未来:

  • FIRST_COMPLETED,等待第一个完成
  • FIRST_EXCEPTION,等到第一个引发异常或全部完成
  • ALL_COMPLETED,等待全部完成

它返回已完成的 future 和未完成的 future 的元组:

# wait first one to finish
finished_set, unfinished_set = wait(myfuturelist, return_when=FIRST_COMPLETED)
# wait all 
wait(myfuturelist, return_when=ALL_COMPLETED)
Run Code Online (Sandbox Code Playgroud)

使用with很优雅,但请注意:

  • 您无法直接访问这些返回值(不过您可以解决方法,例如非本地或全局变量)
  • 你需要关闭池,这意味着你不能重用它,以节省线程创建和销毁的成本

  • “立即收集可迭代对象”指的是可迭代对象*参数*(传递给映射函数的可迭代参数)。`map` 立即提交任务,但在所有任务完成之前它不会阻塞。 (6认同)