Dr.*_*all 7 python python-multithreading python-3.x
我有以下已简化的代码:
import concurrent.futures
pool = concurrent.futures.ThreadPoolExecutor(8)
def _exec(x):
return x + x
myfuturelist = pool.map(_exec,[x for x in range(5)])
# How do I wait for my futures to finish?
for result in myfuturelist:
# Is this how it's done?
print(result)
#... stuff that should happen only after myfuturelist is
#completely resolved.
# Documentation says pool.map is asynchronous
Run Code Online (Sandbox Code Playgroud)
关于 ThreadPoolExecutor.map 的文档很薄弱。帮助会很棒。
谢谢!
mwa*_*way 12
要将呼叫ThreadPoolExecutor.map
也不会阻塞,直到它的所有任务已完成。使用等待来做到这一点。
from concurrent.futures import wait, ALL_COMPLETED
...
futures = [pool.submit(fn, args) for args in arg_list]
wait(futures, timeout=whatever, return_when=ALL_COMPLETED) # ALL_COMPLETED is actually the default
do_other_stuff()
Run Code Online (Sandbox Code Playgroud)
您还可以调用list(results)
返回的生成器pool.map
以强制评估(这就是您在原始示例中所做的)。但是,如果您实际上没有使用从任务返回的值,wait
则是要走的路。
mil*_*ice 10
确实Executor.map()
不会等到所有的未来都结束。因为它返回一个像 @MisterMiyagi 所说的惰性迭代器。
但我们可以通过使用以下方法来实现这一点with
:
import time
from concurrent.futures import ThreadPoolExecutor
def hello(i):
time.sleep(i)
print(i)
with ThreadPoolExecutor(max_workers=2) as executor:
executor.map(hello, [1, 2, 3])
print("finish")
# output
# 1
# 2
# 3
# finish
Run Code Online (Sandbox Code Playgroud)
如您所见,finish
是在 后打印的1,2,3
。它之所以有效,是因为Executor
有一个__exit__()
方法,代码是
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False
Run Code Online (Sandbox Code Playgroud)
的方法是shutdown
ThreadPoolExecutor
def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
self._shutdown = True
if cancel_futures:
# Drain all work items from the queue, and then cancel their
# associated futures.
while True:
try:
work_item = self._work_queue.get_nowait()
except queue.Empty:
break
if work_item is not None:
work_item.future.cancel()
# Send a wake-up to prevent threads calling
# _work_queue.get(block=True) from permanently blocking.
self._work_queue.put(None)
if wait:
for t in self._threads:
t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__
Run Code Online (Sandbox Code Playgroud)
因此,通过使用with
,我们可以获得等待所有 future 完成的能力。
地图和提交之间的区别
Executor.map将并行运行作业并等待 future 完成、收集结果并返回生成器。它已经为您完成了等待。如果设置超时,它将等到超时并在生成器中抛出异常。
映射(func,*可迭代,超时=无,块大小= 1)
- 可迭代对象是立即收集的,而不是延迟收集的;
- func 是异步执行的,并且可以同时对 func 进行多次调用。
要获取期货列表并手动等待,您可以使用:
myfuturelist = [pool.submit(_exec, x) for x in range(5)]
Run Code Online (Sandbox Code Playgroud)
Executor.submit将返回一个future对象,调用result
future 将显式等待它完成:
myfuturelist[0].result() # wait the 1st future to finish and return the result
Run Code Online (Sandbox Code Playgroud)
编辑 2023-02-24
尽管原始答案已被接受,但请检查 mway 和 milkice 的。我将尝试在这里添加一些细节。
wait是更好的方法,它可以让你通过参数return_when控制如何等待未来:
它返回已完成的 future 和未完成的 future 的元组:
# wait first one to finish
finished_set, unfinished_set = wait(myfuturelist, return_when=FIRST_COMPLETED)
# wait all
wait(myfuturelist, return_when=ALL_COMPLETED)
Run Code Online (Sandbox Code Playgroud)
使用with
很优雅,但请注意:
归档时间: |
|
查看次数: |
11287 次 |
最近记录: |