在 Dask 中,如何根据全局(而不是工作线程)资源约束来限制任务的调度?

dbe*_*h24 2 python database performance constraints dask

我有一个使用 Dask 编写的大型数据提取作业,其中每个任务将从数十个数据库的大量表中查询一个表。对于每个数据库实例,我想限制一次连接的任务数量(即限制)。例如,我可能有 100 个任务连接到数据库 A,100 个任务连接到数据库 B,100 个任务连接到数据库 C,等等,并且我希望确保在任何给定时间连接到任何数据库的任务不超过 20 个。

我发现 Dask 提供了基于工作线程资源(CPU、MEM、GPU 等)的约束,但是数据库资源是“全局”的,因此对于任何 Dask 工作线程来说都不是特定的。Dask 是否提供任何方法来对任务并发的此类约束进行建模?

dbe*_*h24 5

在阅读了几个小时的文档后,我找到了自己问题的答案。Dask 提供分布式信号量,可以限制对数据库等资源的并发访问。有关更多信息,请参阅:

https://docs.dask.org/en/latest/futures.html#id1

例子

import time
from dask.distributed import Client, Semaphore

client = Client(...)

def do_task(x, sem):
    with sem:
        time.sleep(5)
        return x

# allow no more than 5 tasks to run concurrently
sem = Semaphore(max_leases=5, name="Limiter")

# submit jobs that use the semaphore
futures = client.map(do_task, range(20), sem=sem)

# collect results
results = client.gather(futures)
Run Code Online (Sandbox Code Playgroud)