Dask 图执行和内存使用

Ada*_*ein 4 python dask dask-delayed

我正在 dask 中构建一个非常大的 DAG 以提交给分布式调度程序,其中节点对本身可能非常大的数据帧进行操作。一种模式是我有大约 50-60 个函数来加载数据并构建每个数百 MB 的 Pandas 数据帧(并且在逻辑上代表单个表的分区)。我想将这些连接到图中下游节点的单个 dask 数据帧中,同时最小化数据移动。我将任务链接如下:

dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs]
dfs = [dask.delayed(pandas_to_dask)(df) for df in dfs]
return dask.delayed(concat_all)(dfs)
Run Code Online (Sandbox Code Playgroud)

在哪里

def pandas_to_dask(df):
    return dask.dataframe.from_pandas(df).to_delayed()
Run Code Online (Sandbox Code Playgroud)

我尝试了各种concat_all实现,但这似乎是合理的:

def concat_all(dfs):
    dfs = [dask.dataframe.from_delayed(df) for df in dfs]
    return dask.dataframe.multi.concat(dfs, axis='index', join='inner')
Run Code Online (Sandbox Code Playgroud)

所有的熊猫数据帧在它们的索引上都是不相交的,并且是排序的/单调的。

但是,concat_all即使每个人的内存预算实际上相当大并且我不希望它移动数据,我也会因为这个功能而被杀死(集群管理器因为超过他们的内存预算而杀死他们)。我有理由肯定,compute()在使用 dask 数据帧的图形节点中调用之前,我总是将数据切片为合理的子集。

--memory-limit到目前为止,我正在玩没有成功。我至少正确地解决了这个问题吗?是否有我遗漏的考虑?

MRo*_*lin 5

给定计算到 Pandas 数据帧的延迟值列表

>>> dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs]
>>> type(dfs[0].compute())  # just checking that this is true
pandas.DataFrame
Run Code Online (Sandbox Code Playgroud)

将它们传递给dask.dataframe.from_delayed函数

>>> ddf = dd.from_delayed(dfs)
Run Code Online (Sandbox Code Playgroud)

默认情况下,这将运行第一次计算以确定元数据(列名、dtypes 等对 dask.dataframe 很重要)。您可以通过构建示例数据帧并将其传递给meta=关键字来避免这种情况。

>>> meta = pd.DataFrame({'value': [1.0], 'name': ['foo'], 'id': [0]})
>>> ddf = dd.from_delayed(dfs, meta=meta)
Run Code Online (Sandbox Code Playgroud)

这个示例笔记本也可能有帮助。

通常,您永远不需要从其他 dask 函数中调用 dask 函数(就像您通过延迟from_pandas调用所做的那样)。Dask.dataframe 函数本身已经是惰性的,不需要进一步延迟。