dask 和 dask.distributed 之间的巨大内存使用差异

Nic*_* W. 6 python dask dask-delayed dask-distributed

我试图用来dask.delayed计算一个大矩阵以供以后计算使用。我只在一台本地机器上运行代码。当我使用dask单机调度程序时,它工作正常,但有点慢。要访问更多选项和性能监视器以改进我想dask.distributed在单台机器上使用的代码。然而,在dask.distributed客户端上运行相同的代码会慢慢耗尽所有可用内存并崩溃而没有任何实现。

是否有不同的设置问题的方法可以让dask.distributed客户端以更好的内存效率完成?

  • 我通读了 dask.delayed最佳实践指南,并认为我们正在正确使用它。
  • 我已经在本地 Win 10 PC (64GB RAM) 和 Azure Win Server 2012 VM (256 GB) 上运行它,结果相同。
  • 我试过手动设置块。
  • 我尝试使用stack.rechunk优化块大小,包括按行和列自动分块(行块在dask调度程序中似乎运行得更快)。
  • 我试过使用compute()persist()(相同的结果)。
  • 我尝试dask.distributed使用线程和进程调度程序启动客户端并调整工作人员的数量。threads在死亡之前更快地使用更多的 RAM。
  • 我已尝试根据此答案设置dask.distributed内存限制,但忽略了内存限制。cluster = distributed.LocalCluster(memory_limit = 8e9)
  • 如果我减少问题的大小(nXnY以下),dask.distributed客户端确实完成了任务,但是它仍然比dask调度程序需要更多的时间和内存。

这个例子重现了这个问题:

import dask
import distributed
import numpy as np
import dask.array as da

def calcRow(X,Y):
    Tx = np.transpose(X * (X + Y)) # Simplified work
    return (Tx)

# Specify size of (nY x nX) matrix
nX = 1000000 #  Distributed fails with nX >= 1000000 and nY >= 5000
nY = 5000

# Fill with random data
x = np.random.rand(nX,1)
y = np.random.rand(nY,1)

# Setup dask.distributed client.
# Comment out these two lines to use the standard dask scheduler,
# which does work 
client = distributed.Client()
client

# Build the matrix
row = dask.delayed(calcRow, pure=True)   # Build 1 row
makeRows = [row(x, y[ii]) for ii in range(nY)] # Loop for all nY rows
buildMat = [da.from_delayed(makeRow, dtype=float, shape=(10,nX))
            for makeRow in makeRows] # Build matrix
stack = da.vstack(buildMat)
my_matrix = stack.compute() # Calculate the matrix entries
Run Code Online (Sandbox Code Playgroud)

实际上,我的问题要大得多,而且calcRow计算本身就是一个庞大、缓慢、复杂的计算,但形状和矩阵构建步骤是相同的​​。

我了解最佳做法是scatter在调用之前将数据放入内存中compute,但我没有散布函数,只有一个delayed数组。

如果我注释掉 2 个dask.distributed客户端行,则上面的示例使用最多 0.25 GB 的 RAM 在 60 秒内运行。但是有了这些行,代码会在 3-4 分钟内攀升到完全内存使用 (64GB),并继续运行,直到系统变得不稳定。

如果我在其中构建矩阵,则dask可以启动一个dask.distributed客户端,并在以后的dask.distributed计算中毫无问题地使用该矩阵。只是构建导致问题的矩阵。

我几乎觉得这是一个错误,但不能确定我的代码不是罪魁祸首。我真的很重视可能使代码运行或证明错误的建议。

编辑 1: 我还尝试将装饰器应用于calcRow

@dask.delayed
def calcRow(X,Y):
Run Code Online (Sandbox Code Playgroud)

并使用:

makeRows = [calcRow(x, y[ii]) for ii in range(nY)]
Run Code Online (Sandbox Code Playgroud)

但这似乎是相同的?

编辑 2: 如果我开始distributed.client使用processes=False它会更快地消耗所有系统内存,但实际上会提供以下警告,这可能是诊断性的:

分布式工人 - 警告 - 内存使用量很高,但工人没有数据存储到磁盘。也许其他一些进程正在泄漏内存?进程内存:40.27 GB -- 工作内存限制:8.00 GB