标签: dask-distributed

Dask Distributed - 如何为每个工作线程运行一个任务,使该任务在工作线程可用的所有核心上运行?

我对使用distributedpython 库非常陌生。我有 4 个工作线程,并且我已经为每个工作线程使用 14 个核心(在 16 个可用核心中)成功启动了一些并行运行,从而导致 4*14=56 个任务并行运行。

但是,如果我只想让每个工人同时执行一项任务,该怎么办?这样,我期望在工作线程上并行使用 14 个内核执行一项任务。

python cpu-cores dask-distributed

6
推荐指数
1
解决办法
2774
查看次数

为什么 Dask 在我的 Dataframe 中填写“foo”和 1

我读过大约 15 个 csv 文件:

df = dd.read_csv("gs://project/*.csv", blocksize=25e6,
                 storage_options={'token': fs.session.credentials})
Run Code Online (Sandbox Code Playgroud)

然后我保留了 Dataframe(它使用 7.33 GB 内存):

df = df.persist()
Run Code Online (Sandbox Code Playgroud)

我设置了一个新索引,因为我希望该字段上的分组尽可能高效:

df = df.set_index('column_a').persist()
Run Code Online (Sandbox Code Playgroud)

现在我有181个分区,180个分区。为了尝试我的分组速度有多快,我尝试了一个自定义应用函数,该函数仅打印组数据帧:

grouped_by_index = df.groupby('column_a').apply(lambda n: print(n)).compute()
Run Code Online (Sandbox Code Playgroud)

打印了一个包含正确列的数据框,但值为“1”、“foo”或“True”。例子:

column_b  column_c column_d  column_e  column_f  column_g  \
index                                                                   
a          foo           1      foo        1           1           1
Run Code Online (Sandbox Code Playgroud)

我也收到警告:

/opt/conda/lib/python3.7/site-packages/ipykernel_launcher.py:1:UserWarning:meta未指定,从部分数据推断。meta如果结果出乎意料,请提供。之前: .apply(func) 之后: .apply(func, meta={'x': 'f8', 'y': 'f8'}) 对于数据帧结果或: .apply(func, meta=('x' , 'f8'))
用于系列结果“”“启动 IPython 内核的入口点。

这里发生了什么?

dataframe dask dask-distributed

6
推荐指数
1
解决办法
2115
查看次数

如何在 Databricks 上使用 Dask

我想在 Databricks 上使用 Dask。这应该是可能的(我不明白为什么不可以)。如果我导入它,会发生两种情况之一,要么我得到一个ImportError,但是当我安装distributed来解决这个问题时,DataBricks 只是说Cancelled没有抛出任何错误。

dask databricks dask-distributed azure-databricks

6
推荐指数
3
解决办法
5444
查看次数

dask 和 dask.distributed 之间的巨大内存使用差异

我试图用来dask.delayed计算一个大矩阵以供以后计算使用。我只在一台本地机器上运行代码。当我使用dask单机调度程序时,它工作正常,但有点慢。要访问更多选项和性能监视器以改进我想dask.distributed在单台机器上使用的代码。然而,在dask.distributed客户端上运行相同的代码会慢慢耗尽所有可用内存并崩溃而没有任何实现。

是否有不同的设置问题的方法可以让dask.distributed客户端以更好的内存效率完成?

  • 我通读了 dask.delayed最佳实践指南,并认为我们正在正确使用它。
  • 我已经在本地 Win 10 PC (64GB RAM) 和 Azure Win Server 2012 VM (256 GB) 上运行它,结果相同。
  • 我试过手动设置块。
  • 我尝试使用stack.rechunk优化块大小,包括按行和列自动分块(行块在dask调度程序中似乎运行得更快)。
  • 我试过使用compute()persist()(相同的结果)。
  • 我尝试dask.distributed使用线程和进程调度程序启动客户端并调整工作人员的数量。threads在死亡之前更快地使用更多的 RAM。
  • 我已尝试根据此答案设置dask.distributed内存限制,但忽略了内存限制。cluster = distributed.LocalCluster(memory_limit = 8e9)
  • 如果我减少问题的大小(nXnY以下),dask.distributed客户端确实完成了任务,但是它仍然比dask调度程序需要更多的时间和内存。

这个例子重现了这个问题:

import dask
import distributed
import numpy as np
import dask.array as da …
Run Code Online (Sandbox Code Playgroud)

python dask dask-delayed dask-distributed

6
推荐指数
0
解决办法
1492
查看次数

指定 das 的仪表板端口

使用dask-jobqueue创建 dask 集群时是否可以手动指定仪表板的端口?当使用8787时,它会随机选择一个不同的端口,这意味着每次都需要设置不同的隧道。

from dask_jobqueue import PBSCluster
cluster = PBSCluster() # ideally here dashboard_port=
cluster.scale(10)         

from dask.distributed import Client
client = Client(cluster)  # Connect this local process to remote workers
Run Code Online (Sandbox Code Playgroud)

python dask dask-distributed

6
推荐指数
1
解决办法
1730
查看次数

是否可以从本地计算机远程启动 HPC(slurm)上的 dask 集群?

我是 Dask 的新手,我知道要启动 dask 集群,我通常必须 ssh 到我的 hpc 集群,然后启动 SLURMCluster() 来启动一些集群,然后在启动后我需要 Client('node_ip')我的本地计算机。我想知道是否可以在本地计算机上启动 dask 集群而无需先登录 HPC。

dask dask-distributed

6
推荐指数
0
解决办法
205
查看次数

json和请求的Dask内存泄漏问题

这只是在远程 Dask kubernetes 集群中重现内存泄漏问题的示例最小测试。

def load_geojson(pid):
    import requests
    import io
    r = requests.get("https://github.com/datasets/geo-countries/raw/master/data/countries.geojson")
    temp = r.json()
    import sys
    size_temp = sys.getsizeof(temp)
    del temp
    return size_temp

L_geojson = client.map(load_geojson, range(200))

del L_geojson
Run Code Online (Sandbox Code Playgroud)

观察:每次运行时工作内存(字节存储)稳定增加约 30 MB,并持续增加直到使用整个内存。我用 urllib 尝试的另一个测试,我观察到每次运行时内存随机增加和减少。

预期行为:删除引用 L_geojson 后应清理内存。

有人可以帮忙解决这个内存泄漏问题吗?

json python-requests dask dask-distributed dask-kubernetes

6
推荐指数
1
解决办法
360
查看次数

Daskdistributed.scheduler - 错误 - 无法收集密钥

import joblib

from sklearn.externals.joblib import parallel_backend
with joblib.parallel_backend('dask'):
 
    from dask_ml.model_selection import GridSearchCV
    import xgboost
    from xgboost import XGBRegressor
    grid_search = GridSearchCV(estimator= XGBRegressor(), param_grid = param_grid, cv = 3, n_jobs = -1)
    grid_search.fit(df2,df3)
Run Code Online (Sandbox Code Playgroud)

我使用两台本地机器创建了一个 dask 集群

client = dask.distributed.client('tcp://191.xxx.xx.xxx:8786')
Run Code Online (Sandbox Code Playgroud)

我正在尝试使用 dask gridsearchcv 找到最佳参数。我面临以下错误。

istributed.scheduler - ERROR - Couldn't gather keys {"('xgbregressor-fit-score-7cb7087b3aff75a31f487cfe5a9cedb0', 1202, 2)": ['tcp://127.0.0.1:3738']} state: ['processing'] workers: ['tcp://127.0.0.1:3738']
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:3738'], ('xgbregressor-fit-score-7cb7087b3aff75a31f487cfe5a9cedb0', 1202, 2)
NoneType: None
distributed.client - WARNING - Couldn't gather …
Run Code Online (Sandbox Code Playgroud)

python dask dask-distributed dask-ml

6
推荐指数
1
解决办法
464
查看次数

AttributeError:导入 Dask 时模块“pandas.core.strings”没有属性“StringMethods”

dask.dataframe尽管import dask有效,但在尝试导入接口时,我收到问题标题中所述的错误。

我当前的 dask 版本是2022.7.0. 可能是什么问题?

pandas dask dask-distributed dask-dataframe

6
推荐指数
1
解决办法
1万
查看次数

Dask 工作线程的内存清理

我正在多节点分布式 Dask 集群上运行多个并行任务。然而,一旦任务完成,worker 仍然持有大量内存,集群很快就会被填满。

client.restart()我在每个任务之后都尝试过client.cancel(df),第一个任务会杀死工作人员并发送到CancelledError其他正在运行的任务,这很麻烦,第二个任务没有多大帮助,因为我们在 Dask 的函数中使用了很多自定义对象和函数map。添加del已知变量gc.collect()也没有多大帮助。

我确信大部分内存占用是由于自定义 python 函数和使用client.map(..).

我的问题是:

  1. 有没有一种从命令行或其他方式类似的方法trigger worker restart if no tasks are running right now
  2. 如果不是,这个问题有哪些可能的解决方案?我不可能避免 Dask 任务中的自定义对象和纯 python 函数。

python dask dask-distributed

5
推荐指数
1
解决办法
3097
查看次数