我正在使用 Dask 运行任务池,按照方法完成的顺序检索结果as_completed,并可能在每次返回时向池中提交新任务:
# Initial set of jobs
futures = [client.submit(job.run_simulation) for job in jobs]
pool = as_completed(futures, with_results=True)
while True:
# Wait for a job to finish
f, result = next(pool)
# Exit condition
if result == 'STOP':
break
# Do processing and maybe submit more jobs
more_jobs = process_result(f, result)
more_futures = [client.submit(job.run_simulation) for job in more_jobs]
pool.update(more_futures)
Run Code Online (Sandbox Code Playgroud)
这是我的问题:我提交的函数job.run_simulation有时会挂起很长时间,并且我想使该函数超时 - 如果运行时间超过一定的时间限制,则终止任务并继续。
理想情况下,我想做类似的事情client.submit(job.run_simulation, timeout=10),并且如果任务运行时间超过超时时间,则next(pool)返回。None
Dask 有什么办法可以帮助我暂停这样的工作吗?
到目前为止我尝试过的
我的第一反应是在函数本身内独立于 Dask 处理超时job.run_simulation。我已经看到了两种针对通用 Python 超时的建议(例如,此处)。
1)使用两个线程,一个用于函数本身,一个用于计时器。我的印象是这实际上不起作用,因为你无法杀死线程。即使计时器耗尽,两个线程也必须在任务完成之前完成。
2) 使用两个独立的进程(与multiprocessing模块一起),一个用于函数,一个用于定时器。这可行,但由于我已经处于 Dask 生成的守护程序子进程中,因此不允许我创建新的子进程。
第三种可能性是将代码块移动到我运行的单独脚本中subprocess.run并使用subprocess.run内置超时。我可以做到这一点,但这感觉像是最坏情况的后备方案,因为它需要在子进程之间传递大量繁琐的数据。
所以感觉我必须要完成Dask级别的超时。我的一个想法是在将任务提交给 Dask 的同时创建一个计时器作为子进程。然后,如果计时器耗尽,则用于Client.cancel()停止任务。这个计划的问题是,Dask 可能会等待工作线程释放后再开始任务,而我不希望计时器在任务实际运行之前运行。
您对问题的评估对我来说似乎是正确的,您所经历的解决方案与我会考虑的相同。一些注意事项:
Client.cancel如果函数已经启动,则无法停止该函数的运行。这些函数在线程池中运行,因此您会遇到“无法停止线程”的限制。Dask 工作线程只是 Python 进程,具有相同的能力和限制。您说您不能使用守护进程中的进程。解决此问题的一种方法是通过以下方式之一更改使用流程的方式:
如果您在单台机器上使用 dask.distributed 那么就不要使用进程
client = Client(processes=False)
Run Code Online (Sandbox Code Playgroud)multiprocessing-context配置设置为"spawn"而不是 fork 或 forkserver解决这个问题的最干净的方法是在你的函数内部解决它job.run_simulation。理想情况下,您将能够将此超时逻辑下推到该代码并使其干净利落。