我试图了解 Dask 在本地计算机上的使用模式。
具体来说,
Pandas 通过单个核心执行这些操作,这些操作对我来说需要几个小时。我的机器上有 8 个核心,因此,我想使用 Dask 尽可能地并行化这些操作。
我的问题如下: Dask 中执行此操作的两种方式有什么区别:
import pandas as pd
from sklearn.datasets import load_iris
iris = load_iris()
Run Code Online (Sandbox Code Playgroud)
(1)
import dask.dataframe as dd
df = dd.from_pandas(
pd.DataFrame(iris.data, columns=iris.feature_names),
npartitions=2
)
df.mean().compute()
Run Code Online (Sandbox Code Playgroud)
(2)
import dask.dataframe as dd
from distributed import Client
client = Client()
df = client.persist(
dd.from_pandas(
pd.DataFrame(iris.data, columns=iris.feature_names),
npartitions=2
)
)
df.mean().compute()
Run Code Online (Sandbox Code Playgroud)
一种使用模式相对于另一种使用模式有什么好处?为什么我应该使用其中一种而不是另一种?
有没有办法直接将 Spark 数据帧转换为 Dask 数据帧?
我目前正在使用 Spark 的 .toPandas()函数将其转换为 pandas 数据帧,然后转换为 dask 数据帧。我相信这是低效的操作,并且没有利用dask的分布式处理能力,因为pandas永远是瓶颈。
以下LocalCluster配置有dask.distributed什么区别?
Client(n_workers=4, processes=False, threads_per_worker=1)
Run Code Online (Sandbox Code Playgroud)
相对
Client(n_workers=1, processes=True, threads_per_worker=4)
Run Code Online (Sandbox Code Playgroud)
它们都有四个线程处理任务图,但第一个线程有四个工作线程。那么,与具有多个线程的单个工作人员相比,让多个工作人员充当线程有什么好处?
编辑:只是澄清一下,我知道进程、线程和共享内存之间的区别,所以这个问题更多地针对这两个客户端的配置差异。
我对使用distributedpython 库非常陌生。我有 4 个工作线程,并且我已经为每个工作线程使用 14 个核心(在 16 个可用核心中)成功启动了一些并行运行,从而导致 4*14=56 个任务并行运行。
但是,如果我只想让每个工人同时执行一项任务,该怎么办?这样,我期望在工作线程上并行使用 14 个内核执行一项任务。
我读过大约 15 个 csv 文件:
df = dd.read_csv("gs://project/*.csv", blocksize=25e6,
storage_options={'token': fs.session.credentials})
Run Code Online (Sandbox Code Playgroud)
然后我保留了 Dataframe(它使用 7.33 GB 内存):
df = df.persist()
Run Code Online (Sandbox Code Playgroud)
我设置了一个新索引,因为我希望该字段上的分组尽可能高效:
df = df.set_index('column_a').persist()
Run Code Online (Sandbox Code Playgroud)
现在我有181个分区,180个分区。为了尝试我的分组速度有多快,我尝试了一个自定义应用函数,该函数仅打印组数据帧:
grouped_by_index = df.groupby('column_a').apply(lambda n: print(n)).compute()
Run Code Online (Sandbox Code Playgroud)
打印了一个包含正确列的数据框,但值为“1”、“foo”或“True”。例子:
column_b column_c column_d column_e column_f column_g \
index
a foo 1 foo 1 1 1
Run Code Online (Sandbox Code Playgroud)
我也收到警告:
/opt/conda/lib/python3.7/site-packages/ipykernel_launcher.py:1:UserWarning:
meta未指定,从部分数据推断。meta如果结果出乎意料,请提供。之前: .apply(func) 之后: .apply(func, meta={'x': 'f8', 'y': 'f8'}) 对于数据帧结果或: .apply(func, meta=('x' , 'f8'))
用于系列结果“”“启动 IPython 内核的入口点。
这里发生了什么?
我想在 Databricks 上使用 Dask。这应该是可能的(我不明白为什么不可以)。如果我导入它,会发生两种情况之一,要么我得到一个ImportError,但是当我安装distributed来解决这个问题时,DataBricks 只是说Cancelled没有抛出任何错误。
使用dask-jobqueue创建 dask 集群时是否可以手动指定仪表板的端口?当使用8787时,它会随机选择一个不同的端口,这意味着每次都需要设置不同的隧道。
from dask_jobqueue import PBSCluster
cluster = PBSCluster() # ideally here dashboard_port=
cluster.scale(10)
from dask.distributed import Client
client = Client(cluster) # Connect this local process to remote workers
Run Code Online (Sandbox Code Playgroud) 这只是在远程 Dask kubernetes 集群中重现内存泄漏问题的示例最小测试。
def load_geojson(pid):
import requests
import io
r = requests.get("https://github.com/datasets/geo-countries/raw/master/data/countries.geojson")
temp = r.json()
import sys
size_temp = sys.getsizeof(temp)
del temp
return size_temp
L_geojson = client.map(load_geojson, range(200))
del L_geojson
Run Code Online (Sandbox Code Playgroud)
观察:每次运行时工作内存(字节存储)稳定增加约 30 MB,并持续增加直到使用整个内存。我用 urllib 尝试的另一个测试,我观察到每次运行时内存随机增加和减少。
预期行为:删除引用 L_geojson 后应清理内存。
有人可以帮忙解决这个内存泄漏问题吗?
import joblib
from sklearn.externals.joblib import parallel_backend
with joblib.parallel_backend('dask'):
from dask_ml.model_selection import GridSearchCV
import xgboost
from xgboost import XGBRegressor
grid_search = GridSearchCV(estimator= XGBRegressor(), param_grid = param_grid, cv = 3, n_jobs = -1)
grid_search.fit(df2,df3)
Run Code Online (Sandbox Code Playgroud)
我使用两台本地机器创建了一个 dask 集群
client = dask.distributed.client('tcp://191.xxx.xx.xxx:8786')
Run Code Online (Sandbox Code Playgroud)
我正在尝试使用 dask gridsearchcv 找到最佳参数。我面临以下错误。
istributed.scheduler - ERROR - Couldn't gather keys {"('xgbregressor-fit-score-7cb7087b3aff75a31f487cfe5a9cedb0', 1202, 2)": ['tcp://127.0.0.1:3738']} state: ['processing'] workers: ['tcp://127.0.0.1:3738']
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:3738'], ('xgbregressor-fit-score-7cb7087b3aff75a31f487cfe5a9cedb0', 1202, 2)
NoneType: None
distributed.client - WARNING - Couldn't gather …Run Code Online (Sandbox Code Playgroud) dask.dataframe尽管import dask有效,但在尝试导入接口时,我收到问题标题中所述的错误。
我当前的 dask 版本是2022.7.0. 可能是什么问题?
dask-distributed ×10
dask ×9
python ×5
pandas ×2
cpu-cores ×1
dask-ml ×1
data-science ×1
databricks ×1
dataframe ×1
json ×1
pyspark ×1