我的本地计算机上有一个很大的NumPy数组,我想与集群上的Dask.array并行化
import numpy as np
x = np.random.random((1000, 1000, 1000))
Run Code Online (Sandbox Code Playgroud)
但是,当我使用dask.array时,我发现调度程序开始占用大量RAM。为什么是这样?这些数据不应该交给工人吗?
import dask.array as da
x = da.from_array(x, chunks=(100, 100, 100))
from dask.distributed import Client
client = Client(...)
x = x.persist()
Run Code Online (Sandbox Code Playgroud)
每当您persist或computeDask集合将数据发送到调度程序,再从那里发送到工作程序。如果你想绕过存储在调度数据,那么你将不得不学习如何移动数据与分散。
您有三种选择:
最好的方法是将加载数据作为计算的一部分,而不是在本地进行。
之前x = load_array_from_file(fn) # load into a local numpy array
x = da.from_array(x, chunks=(100, 100, 100)) # split with dask
x = x.persist()
Run Code Online (Sandbox Code Playgroud)
后
x = dask.delayed(load_array_from_file)(fn)
x = da.from_delayed(x, shape=(1000, 1000, 1000), dtype=float)
x = x.rechunk((100, 100, 100))
x = x.persist()
Run Code Online (Sandbox Code Playgroud)
有关创建dask数组的更多信息,请参见:http ://dask.pydata.org/en/latest/array-creation.html
您可以将numpy数组直接散布到工人
future = client.scatter(x)
x = da.from_delayed(future, shape=x.shape, dtype=x.dtype)
x = x.rechunk((100, 100, 100))
x = x.persist()
Run Code Online (Sandbox Code Playgroud)
这会将您的数据直接移至工作人员,然后从那里分块。很好,因为它绕过了调度程序。但是,如果您的工作程序开始失败,您现在就有丢失数据的风险。仅在大型并行系统中才重要。
这也有些效率低下,因为您的所有数据都放在一个工作线程中,而不是散布出去。您可以致电client.rebalance或继续阅读。
您可以使用本地调度程序在本地对数据进行分块,然后分散到群集中。
x = da.from_array(x, chunks=(100, 100, 100))
x = x.persist(get=dask.threaded.get) # chunk locally
futures = client.scatter(dict(x.dask)) # scatter chunks
x.dask = x # re-attach scattered futures as task graph
Run Code Online (Sandbox Code Playgroud)
或者,您可以继续在本地使用dask,既可以使用线程调度程序,也可以只使用本地进程使用分布式调度程序。
client = Client(processes=False)
Run Code Online (Sandbox Code Playgroud)
这将停止在本地进程,调度程序和工作进程之间不必要的数据复制。它们现在都在您的本地流程中。
另请参阅:如何在Dask分布式环境中有效地提交带有大参数的任务?对于此答案的基于任务的版本