dask 可以使用无限的流输入吗

sam*_*ami 5 dask

我知道 dask 在这样的批处理模式下工作得很好

def load(filename):
    ...

def clean(data):
    ...

def analyze(sequence_of_data):
    ...

def store(result):
    with open(..., 'w') as f:
        f.write(result)

dsk = {'load-1': (load, 'myfile.a.data'),
       'load-2': (load, 'myfile.b.data'),
       'load-3': (load, 'myfile.c.data'),
       'clean-1': (clean, 'load-1'),
       'clean-2': (clean, 'load-2'),
       'clean-3': (clean, 'load-3'),
       'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
       'store': (store, 'analyze')}

from dask.multiprocessing import get
get(dsk, 'store')  # executes in parallel
Run Code Online (Sandbox Code Playgroud)
  1. 我们可以使用 dask 来处理块数量未知甚至无穷无尽的流通道吗?
  2. 它可以以增量方式执行计算吗?例如,上面的“分析”步骤可以处理正在进行的块吗?
  3. 我们是否必须在所有数据块都已知之后才调用“get”操作,我们可以在调用“get”后添加新块吗

MRo*_*lin 4

编辑:请参阅下面的新答案

dask 中当前的任务调度程序需要单个计算图。它不支持动态添加或删除该图表。调度程序旨在用少量内存评估大图;提前了解整个图表对此至关重要。

然而,这并不能阻止人们创建具有不同属性的其他调度程序。conncurrent.futures这里的一个简单的解决方案就是在单台机器或多台机器上使用模块distributed

其实,是

分布式调度程序现在完全异步运行,您可以在计算期间提交任务、等待其中一些任务、提交更多任务、取消任务、添加/删除工作人员等。有多种方法可以做到这一点,但最简单的可能是concurrent.futures这里简要描述的新界面:

http://dask.pydata.org/en/latest/futures.html