我正在使用 Dask 运行任务池,按照方法完成的顺序检索结果as_completed,并可能在每次返回时向池中提交新任务:
# Initial set of jobs
futures = [client.submit(job.run_simulation) for job in jobs]
pool = as_completed(futures, with_results=True)
while True:
# Wait for a job to finish
f, result = next(pool)
# Exit condition
if result == 'STOP':
break
# Do processing and maybe submit more jobs
more_jobs = process_result(f, result)
more_futures = [client.submit(job.run_simulation) for job in more_jobs]
pool.update(more_futures)
Run Code Online (Sandbox Code Playgroud)
这是我的问题:我提交的函数job.run_simulation有时会挂起很长时间,并且我想使该函数超时 - 如果运行时间超过一定的时间限制,则终止任务并继续。
理想情况下,我想做类似的事情client.submit(job.run_simulation, timeout=10),并且如果任务运行时间超过超时时间,则next(pool)返回。None
Dask 有什么办法可以帮助我暂停这样的工作吗?
到目前为止我尝试过的
我的第一反应是在函数本身内独立于 Dask 处理超时 …