我正在尝试将我的蒙特卡罗模拟转换numpy为dask,因为有时数组太大,无法适应内存.因此,我在云中设置了一组计算机:我的dask集群由24个内核和94 GB内存组成.我为这个问题准备了我的代码的简化版本.
我的原始numpy代码如下所示:
def numpy_way(sim_count, sim_days, hist_days):
historical_data = np.random.normal(111.51, 10, hist_days)
historical_multidim = np.empty(shape=(1, 1, sim_count, hist_days))
historical_multidim[:, :, :, :] = historical_data
random_days_panel = np.random.randint(low=1,
high=hist_days,
size=(1, 1, sim_count, sim_days))
future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
np.arange(1)[:, np.newaxis, np.newaxis],
np.arange(sim_count)[:, np.newaxis],
random_days_panel]
return future_panel.shape
Run Code Online (Sandbox Code Playgroud)
注意:我只是在这里返回numpy数组的形状(但是因为它是numpy,所以future_panel的元素在内存中是有意义的.
关于功能的一些话:
historical_data- 这只是1Dhistorical_multidim).这里不使用前两个维度(但它们在我的最终应用程序中)
forecasted未来的天数random_days_panel- 只是一个ndarray随机选择的日子.所以shape这个数组的最后一个是:1,1,sim_count,sim_days(在上一点解释)future_panel是ndarray随机选取的值historical_multidim.即从具有预期形状的历史数据生成的数组(1,1,sim_count,sim_days)现在,问题是,其中一些步骤没有在dask中实现:
historical_multidim[:, …我是 dask 的新手,我发现拥有一个可以轻松实现并行化的模块真是太好了。我正在做一个项目,我可以在一台机器上并行化一个循环,正如你在这里看到的。但是,我想转移到dask.distributed. 我对上面的类应用了以下更改:
diff --git a/mlchem/fingerprints/gaussian.py b/mlchem/fingerprints/gaussian.py
index ce6a72b..89f8638 100644
--- a/mlchem/fingerprints/gaussian.py
+++ b/mlchem/fingerprints/gaussian.py
@@ -6,7 +6,7 @@ from sklearn.externals import joblib
from .cutoff import Cosine
from collections import OrderedDict
import dask
-import dask.multiprocessing
+from dask.distributed import Client
import time
@@ -141,13 +141,14 @@ class Gaussian(object):
for image in images.items():
computations.append(self.fingerprints_per_image(image))
+ client = Client()
if self.scaler is None:
- feature_space = dask.compute(*computations, scheduler='processes',
+ feature_space = dask.compute(*computations, scheduler='distributed',
num_workers=self.cores)
feature_space = OrderedDict(feature_space)
else:
stacked_features …Run Code Online (Sandbox Code Playgroud) 在集群上设置worker时,我对dask和dask.distributed中使用的不同术语感到有些困惑.
我遇到的术语是:线程,进程,处理器,节点,工作者,调度程序.
我的问题是如何设置每个的数量,以及其中任何一个之间是否存在严格或建议的关系.例如:
还有其他建议吗?
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 3.91 GB -- Worker memory limit: 2.00 GB
distributed.worker - WARNING - Worker is at 41% memory usage. Resuming worker. Process memory: 825.12 MB -- Worker memory limit: 2.00 GB
Run Code Online (Sandbox Code Playgroud)
当我尝试运行一段将算法应用于我拥有的数据集的代码时,会出现上述错误。阅读了https://distributed.dask.org/en/latest/worker.html上的文档后 ,我仍然不清楚此错误对该应用程序的结果有何影响。这是否只会影响该代码的速度或效率,还是会影响我的结果?
我们如何在Dask分布式中为每个工作人员选择--nthreads和--nprocs?我有3个工作线程,每个工作线程有2个线程,每个内核有4个内核,每个内核有1个线程(根据每个工作线程上'lscpu'Linux命令的输出)
我正在尝试重新整理 NetCDF 文件集合并在 AWS S3 上创建 Zarr 数据集。我有 168 个原始 NetCDF4 经典文件,其维度数组time: 1, y: 3840, x: 4608分块为chunks={'time':1, 'y':768, 'x':922}.
我想将此输出写入 Zarr,并且我想针对时间序列提取进行优化,因此在我的块中包含更多时间记录。我想我会使用 xarray 来帮助完成工作,因为我有很多处理器可以利用 Dask,而 xarrayxr.open_mfdataset和ds.to_zarr.
我第一次尝试rechunking来chunks={'time':24, 'y':768, 'x':922}匹配输入NetCDF4在分块x和y,但是当我试图写Zarr它抱怨,因为它需要在两个均匀的块大小x和y,只允许非均匀大小沿的最后一块time尺寸(不幸在x维度中,总大小 4608 不是块大小 922 的倍数。
然后我尝试chunks={'time':168, 'y':384, 'x':288}并开始工作,并且非常快速地进行了几分钟,然后变得越来越慢。最终在 50 分钟后,集群死亡:
4072 distributed.core - INFO - Event loop was unresponsive in Worker for 1.41s. This is often caused by long-running …Run Code Online (Sandbox Code Playgroud) 我想在 dask 中找到pandas.dataframe.sort_value函数的替代方法。
我是通过set_index 来的,但它会按单列排序。
如何对 Dask 数据框的多列进行排序?
我有一个超过35000行的熊猫系列。我想使用dask使其更有效率。但是,我的dask代码和pandas代码同时使用。最初,“ ser”是熊猫系列,而fun1和fun2是在系列的各个行中执行模式匹配的基本功能。
大熊猫
ser = ser.apply(fun1).apply(fun2)
达斯克
ser = dd.from_pandas(ser, npartitions = 16)
ser = ser.apply(fun1).apply(fun2)
在检查cpu核心的状态时,我发现并不是所有的核心都被使用了。只有一个内核已经习惯了100%。
有没有什么方法可以使用dask加快序列代码的速度,或者在串行执行dask操作时利用cpu的所有核心?
我正在尝试使用LocalCluster在我的笔记本电脑上使用dask-distributed,但我仍然没有找到一种方法让我的应用程序关闭而不会引发一些警告或触发matplotlib的一些奇怪的迭代(我正在使用tkAgg后端).
例如,如果我按此顺序关闭客户端和群集,则tk无法以适当的方式从内存中删除图像,我收到以下错误:
Traceback (most recent call last):
File "/opt/Python-3.6.0/lib/python3.6/tkinter/__init__.py", line 3501, in __del__
self.tk.call('image', 'delete', self.name)
RuntimeError: main thread is not in main loop
Run Code Online (Sandbox Code Playgroud)
例如,以下代码生成此错误:
from time import sleep
import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import Client, LocalCluster
if __name__ == '__main__':
cluster = LocalCluster(
n_workers=2,
processes=True,
threads_per_worker=1
)
client = Client(cluster)
x = np.linspace(0, 1, 100)
y = x * x
plt.plot(x, y)
print('Computation complete! Stopping workers...')
client.close()
sleep(1)
cluster.close()
print('Execution complete!')
Run Code Online (Sandbox Code Playgroud)
该sleep(1)行使问题更容易出现,因为它不会在每次执行时出现. …
我正在使用dask.distributed(令人尴尬的并行任务)并行化一些代码。
.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2, threads_per_worker=1,memory_limit =8e9)
client = Client(cluster)
Run Code Online (Sandbox Code Playgroud)
.
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.
Perhaps some other process is leaking memory? Process memory: 6.21 GB -- Worker memory limit: 8.00 GB
Run Code Online (Sandbox Code Playgroud)
暗示工作人员使用的部分 RAM 不在freed不同文件之间(我猜是剩余的过滤中间体......)
有没有办法在开始处理下一个图像之前释放工人的内存?我必须garbage collector在运行任务之间运行一个循环吗?