分布式 dask 中的内存泄漏和/或数据持久性

mar*_*oba 5 distributed multiprocessing dask

我正在使用 dask/distributed 向多节点集群提交 100 多个函数的评估。每个 eval 都非常昂贵,大约需要 90 秒的 CPU 时间。我注意到虽然我正在评估的功能并不纯,但似乎存在内存泄漏并且所有工作人员随着时间的推移都在增长。以下是重现此行为的示例代码:

import numpy as np
from dask.distributed import Client

class Foo:
    def __init__(self):
        self.a = np.random.rand(2000, 2000)  # dummy data, not really used

    @staticmethod
    def myfun1(k):
        return np.random.rand(10000 + k, 100)

    def myfun2(self, k):
        return np.random.rand(10000 + k, 100)

client = Client('XXX-YYY:8786')
f = Foo()
tasks = client.map(f.myfun2, range(100), pure=False)
results = client.gather(tasks)
tasks = []
Run Code Online (Sandbox Code Playgroud)

如果调用 client.map() 来执行 f.myfun1()(这只是一个静态方法),worker 的大小不会增长。但是,如果调用 f.myfun2() ,则在上面仅调用一次 client.map() 之后,worker 的大小就会显着增加(例如 50mb -> 400mb)。client.close() 也不会减少工作人员的大小。

这是内存泄漏还是我没有正确使用 dask.distributed?我绝对不关心我的计算结果之后是否可用或在集群上共享。FWIW,使用分布式 v1.19.1 和 Python 3.5.4 进行测试

MRo*_*lin 1

很好的例子。

您的myfun2方法附加到您的f = Foo()对象,该对象带有一个相当大的属性(f.a)。因此,这种f.myfun2方法的移动实际上非常昂贵,而且您要创建 1000 个方法。如果可以的话,最好避免在分布式环境中使用大型对象的方法。相反,请考虑使用函数。