我想提交具有大(千兆字节)参数的Dask函数.做这个的最好方式是什么?我想用不同的(小)参数多次运行这个函数.
这使用concurrent.futures接口.我们可以轻松地使用dask.delayed接口.
x = np.random.random(size=100000000) # 800MB array
params = list(range(100)) # 100 small parameters
def f(x, param):
pass
from dask.distributed import Client
c = Client()
futures = [c.submit(f, x, param) for param in params]
Run Code Online (Sandbox Code Playgroud)
但这比我预期的要慢或导致内存错误.
MRo*_*lin 18
好的,所以这里错的是每个任务都包含numpy数组x,这个数组很大.对于我们提交的100个任务中的每个任务,我们需要序列化x,将其发送到调度程序,将其发送给工作人员等.
相反,我们将一次将阵列发送到集群:
[future] = c.scatter([x])
Run Code Online (Sandbox Code Playgroud)
现在future是一个指向生成x在集群上的数组的标记.现在我们可以提交引用这个远程未来的任务,而不是本地客户端上的numpy数组.
# futures = [c.submit(f, x, param) for param in params] # sends x each time
futures = [c.submit(f, future, param) for param in params] # refers to remote x already on cluster
Run Code Online (Sandbox Code Playgroud)
现在速度更快,让Dask更有效地控制数据移动.
如果您希望最终需要将数组x移动到所有工作者,那么您可能希望广播该数组以启动
[future] = c.scatter([x], broadcast=True)
Run Code Online (Sandbox Code Playgroud)
期货与dask.delayed一起工作正常.这里没有性能优势,但有些人更喜欢这个界面:
# futures = [c.submit(f, future, param) for param in params]
from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3260 次 |
| 最近记录: |