标签: dask-distributed

dask 的本地使用:到 Client() 还是不到 Client()?

我试图了解 Dask 在本地计算机上的使用模式。

具体来说,

  • 我有一个适合内存的数据集
  • 我想做一些熊猫操作
    • 通过...分组...
    • 日期解析
    • ETC。

Pandas 通过单个核心执行这些操作,这些操作对我来说需要几个小时。我的机器上有 8 个核心,因此,我想使用 Dask 尽可能地并行化这些操作。

我的问题如下: Dask 中执行此操作的两种方式有什么区别:

import pandas as pd
from sklearn.datasets import load_iris

iris = load_iris()
Run Code Online (Sandbox Code Playgroud)

(1)

import dask.dataframe as dd

df = dd.from_pandas(
    pd.DataFrame(iris.data, columns=iris.feature_names),
    npartitions=2
)

df.mean().compute()
Run Code Online (Sandbox Code Playgroud)

(2)

import dask.dataframe as dd
from distributed import Client

client = Client()

df = client.persist(
    dd.from_pandas(
        pd.DataFrame(iris.data, columns=iris.feature_names),
        npartitions=2
    )
)

df.mean().compute()
Run Code Online (Sandbox Code Playgroud)

一种使用模式相对于另一种使用模式有什么好处?为什么我应该使用其中一种而不是另一种?

python dask data-science dask-distributed

7
推荐指数
1
解决办法
2950
查看次数

将 Spark 数据帧转换为 dask 数据帧

有没有办法直接将 Spark 数据帧转换为 Dask 数据帧?

我目前正在使用 Spark 的 .toPandas()函数将其转换为 pandas 数据帧,然后转换为 dask 数据帧。我相信这是低效的操作,并且没有利用dask的分布式处理能力,因为pandas永远是瓶颈。

pandas pyspark dask dask-distributed

7
推荐指数
1
解决办法
3701
查看次数

dask.distributed LocalCluster 与线程与进程之间的区别

以下LocalCluster配置有dask.distributed什么区别?

Client(n_workers=4, processes=False, threads_per_worker=1)
Run Code Online (Sandbox Code Playgroud)

相对

Client(n_workers=1, processes=True, threads_per_worker=4)
Run Code Online (Sandbox Code Playgroud)

它们都有四个线程处理任务图,但第一个线程有四个工作线程。那么,与具有多个线程的单个工作人员相比,让多个工作人员充当线程有什么好处?

编辑:只是澄清一下,我知道进程、线程和共享内存之间的区别,所以这个问题更多地针对这两个客户端的配置差异。

python dask dask-distributed

7
推荐指数
3
解决办法
2461
查看次数

Dask Distributed - 如何为每个工作线程运行一个任务,使该任务在工作线程可用的所有核心上运行?

我对使用distributedpython 库非常陌生。我有 4 个工作线程,并且我已经为每个工作线程使用 14 个核心(在 16 个可用核心中)成功启动了一些并行运行,从而导致 4*14=56 个任务并行运行。

但是,如果我只想让每个工人同时执行一项任务,该怎么办?这样,我期望在工作线程上并行使用 14 个内核执行一项任务。

python cpu-cores dask-distributed

6
推荐指数
1
解决办法
2774
查看次数

为什么 Dask 在我的 Dataframe 中填写“foo”和 1

我读过大约 15 个 csv 文件:

df = dd.read_csv("gs://project/*.csv", blocksize=25e6,
                 storage_options={'token': fs.session.credentials})
Run Code Online (Sandbox Code Playgroud)

然后我保留了 Dataframe(它使用 7.33 GB 内存):

df = df.persist()
Run Code Online (Sandbox Code Playgroud)

我设置了一个新索引,因为我希望该字段上的分组尽可能高效:

df = df.set_index('column_a').persist()
Run Code Online (Sandbox Code Playgroud)

现在我有181个分区,180个分区。为了尝试我的分组速度有多快,我尝试了一个自定义应用函数,该函数仅打印组数据帧:

grouped_by_index = df.groupby('column_a').apply(lambda n: print(n)).compute()
Run Code Online (Sandbox Code Playgroud)

打印了一个包含正确列的数据框,但值为“1”、“foo”或“True”。例子:

column_b  column_c column_d  column_e  column_f  column_g  \
index                                                                   
a          foo           1      foo        1           1           1
Run Code Online (Sandbox Code Playgroud)

我也收到警告:

/opt/conda/lib/python3.7/site-packages/ipykernel_launcher.py:1:UserWarning:meta未指定,从部分数据推断。meta如果结果出乎意料,请提供。之前: .apply(func) 之后: .apply(func, meta={'x': 'f8', 'y': 'f8'}) 对于数据帧结果或: .apply(func, meta=('x' , 'f8'))
用于系列结果“”“启动 IPython 内核的入口点。

这里发生了什么?

dataframe dask dask-distributed

6
推荐指数
1
解决办法
2115
查看次数

如何在 Databricks 上使用 Dask

我想在 Databricks 上使用 Dask。这应该是可能的(我不明白为什么不可以)。如果我导入它,会发生两种情况之一,要么我得到一个ImportError,但是当我安装distributed来解决这个问题时,DataBricks 只是说Cancelled没有抛出任何错误。

dask databricks dask-distributed azure-databricks

6
推荐指数
3
解决办法
5444
查看次数

指定 das 的仪表板端口

使用dask-jobqueue创建 dask 集群时是否可以手动指定仪表板的端口?当使用8787时,它会随机选择一个不同的端口,这意味着每次都需要设置不同的隧道。

from dask_jobqueue import PBSCluster
cluster = PBSCluster() # ideally here dashboard_port=
cluster.scale(10)         

from dask.distributed import Client
client = Client(cluster)  # Connect this local process to remote workers
Run Code Online (Sandbox Code Playgroud)

python dask dask-distributed

6
推荐指数
1
解决办法
1730
查看次数

json和请求的Dask内存泄漏问题

这只是在远程 Dask kubernetes 集群中重现内存泄漏问题的示例最小测试。

def load_geojson(pid):
    import requests
    import io
    r = requests.get("https://github.com/datasets/geo-countries/raw/master/data/countries.geojson")
    temp = r.json()
    import sys
    size_temp = sys.getsizeof(temp)
    del temp
    return size_temp

L_geojson = client.map(load_geojson, range(200))

del L_geojson
Run Code Online (Sandbox Code Playgroud)

观察:每次运行时工作内存(字节存储)稳定增加约 30 MB,并持续增加直到使用整个内存。我用 urllib 尝试的另一个测试,我观察到每次运行时内存随机增加和减少。

预期行为:删除引用 L_geojson 后应清理内存。

有人可以帮忙解决这个内存泄漏问题吗?

json python-requests dask dask-distributed dask-kubernetes

6
推荐指数
1
解决办法
360
查看次数

Daskdistributed.scheduler - 错误 - 无法收集密钥

import joblib

from sklearn.externals.joblib import parallel_backend
with joblib.parallel_backend('dask'):
 
    from dask_ml.model_selection import GridSearchCV
    import xgboost
    from xgboost import XGBRegressor
    grid_search = GridSearchCV(estimator= XGBRegressor(), param_grid = param_grid, cv = 3, n_jobs = -1)
    grid_search.fit(df2,df3)
Run Code Online (Sandbox Code Playgroud)

我使用两台本地机器创建了一个 dask 集群

client = dask.distributed.client('tcp://191.xxx.xx.xxx:8786')
Run Code Online (Sandbox Code Playgroud)

我正在尝试使用 dask gridsearchcv 找到最佳参数。我面临以下错误。

istributed.scheduler - ERROR - Couldn't gather keys {"('xgbregressor-fit-score-7cb7087b3aff75a31f487cfe5a9cedb0', 1202, 2)": ['tcp://127.0.0.1:3738']} state: ['processing'] workers: ['tcp://127.0.0.1:3738']
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:3738'], ('xgbregressor-fit-score-7cb7087b3aff75a31f487cfe5a9cedb0', 1202, 2)
NoneType: None
distributed.client - WARNING - Couldn't gather …
Run Code Online (Sandbox Code Playgroud)

python dask dask-distributed dask-ml

6
推荐指数
1
解决办法
464
查看次数

AttributeError:导入 Dask 时模块“pandas.core.strings”没有属性“StringMethods”

dask.dataframe尽管import dask有效,但在尝试导入接口时,我收到问题标题中所述的错误。

我当前的 dask 版本是2022.7.0. 可能是什么问题?

pandas dask dask-distributed dask-dataframe

6
推荐指数
1
解决办法
1万
查看次数