标签: dask-distributed

将numpy解决方案转换为dask(numpy索引在dask中不起作用)

我正在尝试将我的蒙特卡罗模拟转换numpydask,因为有时数组太大,无法适应内存.因此,我在云中设置了一组计算机:我的dask集群由24个内核和94 GB内存组成.我为这个问题准备了我的代码的简化版本.

我的原始numpy代码如下所示:

def numpy_way(sim_count, sim_days, hist_days):
   historical_data = np.random.normal(111.51, 10, hist_days)
   historical_multidim = np.empty(shape=(1, 1, sim_count, hist_days))
   historical_multidim[:, :, :, :] = historical_data


   random_days_panel = np.random.randint(low=1,
                                      high=hist_days,
                                      size=(1, 1, sim_count, sim_days))
   future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
                                      np.arange(1)[:, np.newaxis, np.newaxis],
                                      np.arange(sim_count)[:, np.newaxis],
                                      random_days_panel]
   return future_panel.shape
Run Code Online (Sandbox Code Playgroud)

注意:我只是在这里返回numpy数组的形状(但是因为它是numpy,所以future_panel的元素在内存中是有意义的.

关于功能的一些话:

  • 我正在创建一个随机数组historical_data- 这只是1D
  • 然后将该数组"广播"为4D数组(historical_multidim).这里不使用前两个维度(但它们在我的最终应用程序中)
    • 第三维表示完成了多少次模拟
    • 第四维度是forecasted未来的天数
  • random_days_panel- 只是一个ndarray随机选择的日子.所以shape这个数组的最后一个是:1,1,sim_count,sim_days(在上一点解释)
  • future_panelndarray随机选取的值historical_multidim.即从具有预期形状的历史数据生成的数组(1,1,sim_count,sim_days)

现在,问题是,其中一些步骤没有在dask中实现:

  • historical_multidim[:, …

python numpy dask dask-distributed

16
推荐指数
1
解决办法
677
查看次数

在当前进程完成其引导阶段之前尝试启动一个新进程

我是 dask 的新手,我发现拥有一个可以轻松实现并行化的模块真是太好了。我正在做一个项目,我可以在一台机器上并行化一个循环,正如你在这里看到的。但是,我想转移到dask.distributed. 我对上面的类应用了以下更改:

diff --git a/mlchem/fingerprints/gaussian.py b/mlchem/fingerprints/gaussian.py
index ce6a72b..89f8638 100644
--- a/mlchem/fingerprints/gaussian.py
+++ b/mlchem/fingerprints/gaussian.py
@@ -6,7 +6,7 @@ from sklearn.externals import joblib
 from .cutoff import Cosine
 from collections import OrderedDict
 import dask
-import dask.multiprocessing
+from dask.distributed import Client
 import time


@@ -141,13 +141,14 @@ class Gaussian(object):
         for image in images.items():
             computations.append(self.fingerprints_per_image(image))

+        client = Client()
         if self.scaler is None:
-            feature_space = dask.compute(*computations, scheduler='processes',
+            feature_space = dask.compute(*computations, scheduler='distributed',
                                          num_workers=self.cores)
             feature_space = OrderedDict(feature_space)
         else:
             stacked_features …
Run Code Online (Sandbox Code Playgroud)

python dask dask-distributed

16
推荐指数
2
解决办法
1万
查看次数

设置dask worker数量的最佳实践

在集群上设置worker时,我对dask和dask.distributed中使用的不同术语感到有些困惑.

我遇到的术语是:线程,进程,处理器,节点,工作者,调度程序.

我的问题是如何设置每个的数量,以及其中任何一个之间是否存在严格或建议的关系.例如:

  • 每个节点1个工作器,n个进程用于节点上的n个核心
  • 线程和进程是同一个概念?在dask-mpi中,我必须设置nthreads,但它们在客户端中显示为进程

还有其他建议吗?

dask dask-distributed

14
推荐指数
1
解决办法
1279
查看次数

distribution.worker 内存使用率很高,但worker没有数据可存储到磁盘

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 3.91 GB -- Worker memory limit: 2.00 GB
distributed.worker - WARNING - Worker is at 41% memory usage. Resuming worker. Process memory: 825.12 MB -- Worker memory limit: 2.00 GB
Run Code Online (Sandbox Code Playgroud)

当我尝试运行一段将算法应用于我拥有的数据集的代码时,会出现上述错误。阅读了https://distributed.dask.org/en/latest/worker.html上的文档后 ,我仍然不清楚此错误对该应用程序的结果有何影响。这是否只会影响该代码的速度或效率,还是会影响我的结果?

dask dask-distributed

9
推荐指数
1
解决办法
7225
查看次数

我们如何在dask分布式中为每个工作人员选择--nthreads和--nprocs?

我们如何在Dask分布式中为每个工作人员选择--nthreads和--nprocs?我有3个工作线程,每个工作线程有2个线程,每个内核有4个内核,每个内核有1个线程(根据每个工作线程上'lscpu'Linux命令的输出)

distributed-computing dask dask-distributed

8
推荐指数
1
解决办法
1384
查看次数

如何最好地将 NetCDF 文件集合重新分块到 Zarr 数据集

我正在尝试重新整理 NetCDF 文件集合并在 AWS S3 上创建 Zarr 数据集。我有 168 个原始 NetCDF4 经典文件,其维度数组time: 1, y: 3840, x: 4608分块为chunks={'time':1, 'y':768, 'x':922}.

我想将此输出写入 Zarr,并且我想针对时间序列提取进行优化,因此在我的块中包含更多时间记录。我想我会使用 xarray 来帮助完成工作,因为我有很多处理器可以利用 Dask,而 xarrayxr.open_mfdatasetds.to_zarr.

我第一次尝试rechunking来chunks={'time':24, 'y':768, 'x':922}匹配输入NetCDF4在分块xy,但是当我试图写Zarr它抱怨,因为它需要在两个均匀的块大小xy,只允许非均匀大小沿的最后一块time尺寸(不幸在x维度中,总大小 4608 不是块大小 922 的倍数。

然后我尝试chunks={'time':168, 'y':384, 'x':288}并开始工作,并且非常快速地进行了几分钟,然后变得越来越慢。最终在 50 分钟后,集群死亡:

4072 distributed.core - INFO - Event loop was unresponsive in Worker for 1.41s.  This is often caused by long-running …
Run Code Online (Sandbox Code Playgroud)

python python-xarray netcdf4 dask-distributed zarr

8
推荐指数
1
解决办法
1198
查看次数

在 Dask 中排序

我想在 dask 中找到pandas.dataframe.sort_value函数的替代方法。
我是通过set_index 来的,但它会按单列排序。

如何对 Dask 数据框的多列进行排序?

sorting dask dask-delayed dask-distributed

8
推荐指数
1
解决办法
4462
查看次数

如何使用Dask使用所有cpu核心?

我有一个超过35000行的熊猫系列。我想使用dask使其更有效率。但是,我的dask代码和pandas代码同时使用。最初,“ ser”是熊猫系列,而fun1fun2是在系列的各个行中执行模式匹配的基本功能。
大熊猫
ser = ser.apply(fun1).apply(fun2)

达斯克
ser = dd.from_pandas(ser, npartitions = 16) ser = ser.apply(fun1).apply(fun2)

在检查cpu核心的状态时,我发现并不是所有的核心都被使用了。只有一个内核已经习惯了100%。

有没有什么方法可以使用dask加快序列代码的速度,或者在串行执行dask操作时利用cpu的所有核心?

dask dask-delayed dask-distributed

8
推荐指数
1
解决办法
4600
查看次数

什么是关闭Dask LocalCluster的"正确"方法?

我正在尝试使用LocalCluster在我的笔记本电脑上使用dask-distributed,但我仍然没有找到一种方法让我的应用程序关闭而不会引发一些警告或触发matplotlib的一些奇怪的迭代(我正在使用tkAgg后端).

例如,如果我按此顺序关闭客户端和群集,则tk无法以适当的方式从内存中删除图像,我收到以下错误:

Traceback (most recent call last):
  File "/opt/Python-3.6.0/lib/python3.6/tkinter/__init__.py", line 3501, in __del__
    self.tk.call('image', 'delete', self.name)
RuntimeError: main thread is not in main loop
Run Code Online (Sandbox Code Playgroud)

例如,以下代码生成此错误:

from time import sleep
import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    cluster = LocalCluster(
        n_workers=2,
        processes=True,
        threads_per_worker=1
    )
    client = Client(cluster)

    x = np.linspace(0, 1, 100)
    y = x * x
    plt.plot(x, y)

    print('Computation complete! Stopping workers...')
    client.close()
    sleep(1)
    cluster.close()

    print('Execution complete!')
Run Code Online (Sandbox Code Playgroud)

sleep(1)行使问题更容易出现,因为它不会在每次执行时出现. …

python dask dask-distributed

8
推荐指数
2
解决办法
605
查看次数

任务之间保留的 dask-worker 内存

介绍

我正在使用dask.distributed(令人尴尬的并行任务)并行化一些代码。

  • 我有一个指向不同图像的路径列表,这些图像散布给工作人员。
  • 每个工作人员加载和过滤图像(3D 堆栈)并运行一些过滤。使用 scipy 进行 3D 过滤可节省中间体输出。
  • 每个过滤后的图像在磁盘上保存为 npy 和/或 png。
  • 我在集群上运行之前在本地进行测试,我的设置是:

.

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2, threads_per_worker=1,memory_limit =8e9)
client = Client(cluster)
Run Code Online (Sandbox Code Playgroud)

问题:

  • 当我只处理两个图像(1 个图像/工人)时,一切都很好
  • 当我为每个工作人员散布多个图像时,我会收到此警告,其中进程内存值增加。

.

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.
Perhaps some other process is leaking memory?  Process memory: 6.21 GB -- Worker memory limit: 8.00 GB
Run Code Online (Sandbox Code Playgroud)

暗示工作人员使用的部分 RAM 不在freed不同文件之间(我猜是剩余的过滤中间体......)

有没有办法在开始处理下一个图像之前释放工人的内存?我必须garbage collector在运行任务之间运行一个循环吗?

编辑 …

python parallel-processing dask dask-distributed

8
推荐指数
1
解决办法
659
查看次数