如何使提交给 Dask 的作业超时?

emi*_*a17 5 python dask

我正在使用 Dask 运行任务池,按照方法完成的顺序检索结果as_completed,并可能在每次返回时向池中提交新任务:

# Initial set of jobs
futures = [client.submit(job.run_simulation) for job in jobs]
pool = as_completed(futures, with_results=True)

while True:
    # Wait for a job to finish
    f, result = next(pool)

    # Exit condition
    if result == 'STOP':
        break

    # Do processing and maybe submit more jobs
    more_jobs = process_result(f, result)
    more_futures = [client.submit(job.run_simulation) for job in more_jobs]
    pool.update(more_futures)
Run Code Online (Sandbox Code Playgroud)

这是我的问题:我提交的函数job.run_simulation有时会挂起很长时间,并且我想使该函数超时 - 如果运行时间超过一定的时间限制,则终止任务并继续。

理想情况下,我想做类似的事情client.submit(job.run_simulation, timeout=10),并且如果任务运行时间超过超时时间,则next(pool)返回。None

Dask 有什么办法可以帮助我暂停这样的工作吗?

到目前为止我尝试过的

我的第一反应是在函数本身内独立于 Dask 处理超时job.run_simulation。我已经看到了两种针对通用 Python 超时的建议(例如,此处)。

1)使用两个线程,一个用于函数本身,一个用于计时器。我的印象是这实际上不起作用,因为你无法杀死线程。即使计时器耗尽,两个线程也必须在任务完成之前完成。

2) 使用两个独立的进程(与multiprocessing模块一起),一个用于函数,一个用于定时器。这可行,但由于我已经处于 Dask 生成的守护程序子进程中,因此不允许我创建新的子进程。

第三种可能性是将代码块移动到我运行的单独脚本中subprocess.run并使用subprocess.run内置超时。我可以做到这一点,但这感觉像是最坏情况的后备方案,因为它需要在子进程之间传递大量繁琐的数据。

所以感觉我必须要完成Dask级别的超时。我的一个想法是在将任务提交给 Dask 的同时创建一个计时器作为子进程。然后,如果计时器耗尽,则用于Client.cancel()停止任务。这个计划的问题是,Dask 可能会等待工作线程释放后再开始任务,而我不希望计时器在任务实际运行之前运行。

MRo*_*lin 0

您对问题的评估对我来说似乎是正确的,您所经历的解决方案与我会考虑的相同。一些注意事项:

  1. Client.cancel如果函数已经启动,则无法停止该函数的运行。这些函数在线程池中运行,因此您会遇到“无法停止线程”的限制。Dask 工作线程只是 Python 进程,具有相同的能力和限制。
  2. 您说您不能使用守护进程中的进程。解决此问题的一种方法是通过以下方式之一更改使用流程的方式:

    • 如果您在单台机器上使用 dask.distributed 那么就不要使用进程

      client = Client(processes=False)
      
      Run Code Online (Sandbox Code Playgroud)
    • 不要使用 Dask 的默认保姆进程,那么你的 Dask Worker 将是一个能够使用多处理的普通进程
    • 将 dask 的multiprocessing-context配置设置为"spawn"而不是 fork 或 forkserver

解决这个问题的最干净的方法是在你的函数内部解决它job.run_simulation。理想情况下,您将能够将此超时逻辑下推到该代码并使其干净利落。