如何使用Dask.array有效地将大型numpy数组发送到集群

MRo*_*lin 3 numpy dask

我的本地计算机上有一个很大的NumPy数组,我想与集群上的Dask.array并行化

import numpy as np
x = np.random.random((1000, 1000, 1000))
Run Code Online (Sandbox Code Playgroud)

但是,当我使用dask.array时,我发现调度程序开始占用大量RAM。为什么是这样?这些数据不应该交给工人吗?

import dask.array as da
x = da.from_array(x, chunks=(100, 100, 100))

from dask.distributed import Client
client = Client(...)
x = x.persist()
Run Code Online (Sandbox Code Playgroud)

MRo*_*lin 5

每当您persistcomputeDask集合将数据发送到调度程序,再从那里发送到工作程序。如果你想绕过存储在调度数据,那么你将不得不学习如何移动数据分散

您有三种选择:

  1. 不要在客户端计算机上加载数据
  2. 散布然后大块
  3. 大块散点图

不要在客户端计算机上加载数据

最好的方法是将加载数据作为计算的一部分,而不是在本地进行。

之前
x = load_array_from_file(fn)  # load into a local numpy array
x = da.from_array(x, chunks=(100, 100, 100))  # split with dask
x = x.persist()
Run Code Online (Sandbox Code Playgroud)
x = dask.delayed(load_array_from_file)(fn)
x = da.from_delayed(x, shape=(1000, 1000, 1000), dtype=float)
x = x.rechunk((100, 100, 100))
x = x.persist()
Run Code Online (Sandbox Code Playgroud)

有关创建dask数组的更多信息,请参见:http ://dask.pydata.org/en/latest/array-creation.html

然后散布

您可以将numpy数组直接散布到工人

future = client.scatter(x)
x = da.from_delayed(future, shape=x.shape, dtype=x.dtype)
x = x.rechunk((100, 100, 100))
x = x.persist()
Run Code Online (Sandbox Code Playgroud)

这会将您的数据直接移至工作人员,然后从那里分块。很好,因为它绕过了调度程序。但是,如果您的工作程序开始失败,您现在就有丢失数据的风险。仅在大型并行系统中才重要。

这也有些效率低下,因为您的所有数据都放在一个工作线程中,而不是散布出去。您可以致电client.rebalance或继续阅读。

块然后散布

您可以使用本地调度程序在本地对数据进行分块,然后分散到群集中。

x = da.from_array(x, chunks=(100, 100, 100))
x = x.persist(get=dask.threaded.get)  # chunk locally
futures = client.scatter(dict(x.dask))  # scatter chunks
x.dask = x  # re-attach scattered futures as task graph
Run Code Online (Sandbox Code Playgroud)

留在本地

或者,您可以继续在本地使用dask,既可以使用线程调度程序,也可以只使用本地进程使用分布式调度程序。

client = Client(processes=False)
Run Code Online (Sandbox Code Playgroud)

这将停止在本地进程,调度程序和工作进程之间不必要的数据复制。它们现在都在您的本地流程中。

另请参阅:如何在Dask分布式环境中有效地提交带有大参数的任务?对于此答案的基于任务的版本