ely*_*ely 5 python asynchronous timeout python-3.x concurrent.futures
有关超时的文档非常concurrent.futures难以理解。在一个简单的情况下,我想在循环中使用ProcessPoolExecutorby 调用.submit来扫描工作职能列表。我希望每个Future对象都有 10 分钟的关联超时,但否则它们会异步完成。
我的第一种方法是尝试使用该as_completed函数,该函数生成 Future 对象的迭代器,并且仅在完成时才生成下一个对象。as_completed接受一个timeout参数,但文档说这个超时是相对于as_completed调用的第一时刻,而不一定是任何Future对象本身的生命周期。
例如,假设ProcessPoolExecutor只有 3 个工作进程,但对象列表Future包含 10 个项目。其中 7 个项目可能会处于未处理状态长达 10 分钟,而前 3 个项目将被处理。此后不久,即使每个人自己都达到了 10 分钟的限制as_completed,超时也会被触发,从而导致失败。Future
请注意,适用的相同限制as_completed也适用于此用例wait,并且wait由于它支持的返回选项有限,因此更难使用。
我的下一个想法是使用允许并调用我的期货列表中的每个(未来)的timeout参数。future.result然而,实际上没有一种方法可以在不以阻塞方式实际要求结果的情况下设置此超时。如果您迭代 future 列表并调用,则此调用会阻塞指定的超时时间。f.result(timeout=600)ff.result(...)
另一方面,你也不能f.result以as_completed一种天真的但看似正确的方式结合任何一个,比如
[f.result(timeout=600) for f in as_completed(futures_list)]
Run Code Online (Sandbox Code Playgroud)
...因为 的迭代as_completed在 future 完成时欺骗性地异步等待,并且仅在它们已经完成后将.result它们返回到已调用的状态。
鉴于此,生成一个列表的正确模式是什么,Future其中每个列表都有自己的单独超时,然后异步等待它们完成?
看来没有办法在这种异步上下文中提供 per-Future 超时。可用的 API 函数wait通过as_completed支持可迭代对象中所有任务的全局超时来采取更简单的方法Future,并且不会尝试测量从Future第一个任务开始主动处于正在处理状态的时间。
我选择了一种解决方法,将任务列表分成一组块并用于as_completed每个块。ProcessPoolExecutor块大小设置为与 my配置使用的工作人员数量相同,这样我就可以在某种程度上确定 的“全局”超时as_completed正在秘密地充当未来超时,因为所有任务都处于活动状态立即处理。缺点是利用率较低,因为当任务提前完成时,进程池无法自由地获取下一个 Future 任务;它必须等待整个下一批任务。对我来说这还可以,但这是concurrent.futures我必须选择的一个重大可用性缺陷。
这是一些示例代码。假设my_task_list已经包含带有通过functools.partial或其他方式绑定的部分或全部必需参数的函数。您可以修改它,以便在元组或字典的单独可迭代中提供参数并submit根据需要传递。
my_task_list = #... define your list of task functions
num_workers = #... set number of workers
my_timeout = #... define your timeout
with ProcessPoolExecutor(max_workers=num_workers) as pool:
all_results = []
for chunk_start in range(0, len(my_task_list), num_workers):
chunk = my_task_list[chunk_start:chunk_start + num_workers]
# could extract parameters to pass for this task chunk here.
futures = [pool.submit(task) for task in chunk]
all_results += [
f.result() for f in as_completed(futures, timeout=my_timeout)
]
return all_results
Run Code Online (Sandbox Code Playgroud)
请注意,如果您选择的num_workers数量高于 可用的处理器数量ProcessPoolExecutor,则最终任务数将多于给定块中的处理器数量,并返回到超时as_completed不会正确应用于每个任务的运行时的情况,可能会导致与仅使用as_completed或wait在整个任务列表上而不进行分块相同的超时错误。