我正在使用 dask/distributed 向多节点集群提交 100 多个函数的评估。每个 eval 都非常昂贵,大约需要 90 秒的 CPU 时间。我注意到虽然我正在评估的功能并不纯,但似乎存在内存泄漏并且所有工作人员随着时间的推移都在增长。以下是重现此行为的示例代码:
import numpy as np
from dask.distributed import Client
class Foo:
def __init__(self):
self.a = np.random.rand(2000, 2000) # dummy data, not really used
@staticmethod
def myfun1(k):
return np.random.rand(10000 + k, 100)
def myfun2(self, k):
return np.random.rand(10000 + k, 100)
client = Client('XXX-YYY:8786')
f = Foo()
tasks = client.map(f.myfun2, range(100), pure=False)
results = client.gather(tasks)
tasks = []
Run Code Online (Sandbox Code Playgroud)
如果调用 client.map() 来执行 f.myfun1()(这只是一个静态方法),worker 的大小不会增长。但是,如果调用 f.myfun2() ,则在上面仅调用一次 client.map() 之后,worker 的大小就会显着增加(例如 50mb -> 400mb)。client.close() 也不会减少工作人员的大小。 …