如何在不使用所有工作程序的情况下限制大量任务

Jef*_*eff 10 python dask

想象一下,我有一个拥有10名工人和40个核心总数的dask网格.这是一个共享网格,所以我不想用我的工作完全浸透它.我有1000个任务要做,我想一次提交(并且主动运行)最多20个任务.

具体来说,

from time import sleep
from random import random

def inc(x):
    from random import random
    sleep(random() * 2)
    return x + 1

def double(x):
    from random import random
    sleep(random())
    return 2 * x

>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>
Run Code Online (Sandbox Code Playgroud)

如果我设置了一个队列系统

>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)
Run Code Online (Sandbox Code Playgroud)

这将工作,但是,这将把我的所有任务转储到网格,使其饱和.理想情况下我可以:

e.scatter(input_q, max_submit=20)
Run Code Online (Sandbox Code Playgroud)

看来这里的文档示例允许我使用maxsize队列.但从用户的角度来看,我仍然需要处理背压.理想情况下 dask会自动处理此事.

MRo*_*lin 7

使用 maxsize=

你很近.所有的scatter,gather以及map采取相同的maxsize=关键字参数是Queue需要.因此,简单的工作流程可能如下:

from time import sleep

def inc(x):
    sleep(1)
    return x + 1

your_input_data = list(range(1000))

from queue import Queue              # Put your data into a queue
q = Queue()
for i in your_input_data:
    q.put(i)

from dask.distributed import Executor
e = Executor('127.0.0.1:8786')        # Connect to cluster


futures = e.map(inc, q, maxsize=20)  # Map inc over data
results = e.gather(futures)          # Gather results

L = []
while not q.empty() or not futures.empty() or not results.empty():
    L.append(results.get())  # this blocks waiting for all results
Run Code Online (Sandbox Code Playgroud)

所有的q,futures以及results是Python的队列对象.该qresults队列没有限制,所以他们会贪婪地拉多,因为他们可以.但是futures队列的最大大小为20,所以在任何给定时间它只允许20个期货在飞行中.一旦领先的未来完成,它将立即被收集功能使用,其结果将被放入results队列中.这释放了空间futures并导致提交另一个任务.

请注意,这并不是您想要的.这些队列是按顺序排列的,因此只有当它们位于队列前面时才会弹出期货.如果所有的飞行中的期货已经完成,除了第一个它们仍将留在队列中,占用空间.鉴于此约束,您可能希望选择maxsize=比您想要的20项目略多的项目.

扩展这一点

这里我们做一个简单的map->gather管道,中间没有逻辑.你也可以map在这里放置其他计算,甚至将期货拉出队列,并自己定制工作.很容易打破上面提供的模具.