我对使用distributedpython 库非常陌生。我有 4 个工作线程,并且我已经为每个工作线程使用 14 个核心(在 16 个可用核心中)成功启动了一些并行运行,从而导致 4*14=56 个任务并行运行。
但是,如果我只想让每个工人同时执行一项任务,该怎么办?这样,我期望在工作线程上并行使用 14 个内核执行一项任务。
我读过大约 15 个 csv 文件:
df = dd.read_csv("gs://project/*.csv", blocksize=25e6,
storage_options={'token': fs.session.credentials})
Run Code Online (Sandbox Code Playgroud)
然后我保留了 Dataframe(它使用 7.33 GB 内存):
df = df.persist()
Run Code Online (Sandbox Code Playgroud)
我设置了一个新索引,因为我希望该字段上的分组尽可能高效:
df = df.set_index('column_a').persist()
Run Code Online (Sandbox Code Playgroud)
现在我有181个分区,180个分区。为了尝试我的分组速度有多快,我尝试了一个自定义应用函数,该函数仅打印组数据帧:
grouped_by_index = df.groupby('column_a').apply(lambda n: print(n)).compute()
Run Code Online (Sandbox Code Playgroud)
打印了一个包含正确列的数据框,但值为“1”、“foo”或“True”。例子:
column_b column_c column_d column_e column_f column_g \
index
a foo 1 foo 1 1 1
Run Code Online (Sandbox Code Playgroud)
我也收到警告:
/opt/conda/lib/python3.7/site-packages/ipykernel_launcher.py:1:UserWarning:
meta未指定,从部分数据推断。meta如果结果出乎意料,请提供。之前: .apply(func) 之后: .apply(func, meta={'x': 'f8', 'y': 'f8'}) 对于数据帧结果或: .apply(func, meta=('x' , 'f8'))
用于系列结果“”“启动 IPython 内核的入口点。
这里发生了什么?
我想在 Databricks 上使用 Dask。这应该是可能的(我不明白为什么不可以)。如果我导入它,会发生两种情况之一,要么我得到一个ImportError,但是当我安装distributed来解决这个问题时,DataBricks 只是说Cancelled没有抛出任何错误。
我试图用来dask.delayed计算一个大矩阵以供以后计算使用。我只在一台本地机器上运行代码。当我使用dask单机调度程序时,它工作正常,但有点慢。要访问更多选项和性能监视器以改进我想dask.distributed在单台机器上使用的代码。然而,在dask.distributed客户端上运行相同的代码会慢慢耗尽所有可用内存并崩溃而没有任何实现。
是否有不同的设置问题的方法可以让dask.distributed客户端以更好的内存效率完成?
stack.rechunk优化块大小,包括按行和列自动分块(行块在dask调度程序中似乎运行得更快)。compute()和persist()(相同的结果)。dask.distributed使用线程和进程调度程序启动客户端并调整工作人员的数量。threads在死亡之前更快地使用更多的 RAM。dask.distributed内存限制,但忽略了内存限制。cluster = distributed.LocalCluster(memory_limit = 8e9)nX及nY以下),dask.distributed客户端确实完成了任务,但是它仍然比dask调度程序需要更多的时间和内存。这个例子重现了这个问题:
import dask
import distributed
import numpy as np
import dask.array as da …Run Code Online (Sandbox Code Playgroud) 使用dask-jobqueue创建 dask 集群时是否可以手动指定仪表板的端口?当使用8787时,它会随机选择一个不同的端口,这意味着每次都需要设置不同的隧道。
from dask_jobqueue import PBSCluster
cluster = PBSCluster() # ideally here dashboard_port=
cluster.scale(10)
from dask.distributed import Client
client = Client(cluster) # Connect this local process to remote workers
Run Code Online (Sandbox Code Playgroud) 我是 Dask 的新手,我知道要启动 dask 集群,我通常必须 ssh 到我的 hpc 集群,然后启动 SLURMCluster() 来启动一些集群,然后在启动后我需要 Client('node_ip')我的本地计算机。我想知道是否可以在本地计算机上启动 dask 集群而无需先登录 HPC。
这只是在远程 Dask kubernetes 集群中重现内存泄漏问题的示例最小测试。
def load_geojson(pid):
import requests
import io
r = requests.get("https://github.com/datasets/geo-countries/raw/master/data/countries.geojson")
temp = r.json()
import sys
size_temp = sys.getsizeof(temp)
del temp
return size_temp
L_geojson = client.map(load_geojson, range(200))
del L_geojson
Run Code Online (Sandbox Code Playgroud)
观察:每次运行时工作内存(字节存储)稳定增加约 30 MB,并持续增加直到使用整个内存。我用 urllib 尝试的另一个测试,我观察到每次运行时内存随机增加和减少。
预期行为:删除引用 L_geojson 后应清理内存。
有人可以帮忙解决这个内存泄漏问题吗?
import joblib
from sklearn.externals.joblib import parallel_backend
with joblib.parallel_backend('dask'):
from dask_ml.model_selection import GridSearchCV
import xgboost
from xgboost import XGBRegressor
grid_search = GridSearchCV(estimator= XGBRegressor(), param_grid = param_grid, cv = 3, n_jobs = -1)
grid_search.fit(df2,df3)
Run Code Online (Sandbox Code Playgroud)
我使用两台本地机器创建了一个 dask 集群
client = dask.distributed.client('tcp://191.xxx.xx.xxx:8786')
Run Code Online (Sandbox Code Playgroud)
我正在尝试使用 dask gridsearchcv 找到最佳参数。我面临以下错误。
istributed.scheduler - ERROR - Couldn't gather keys {"('xgbregressor-fit-score-7cb7087b3aff75a31f487cfe5a9cedb0', 1202, 2)": ['tcp://127.0.0.1:3738']} state: ['processing'] workers: ['tcp://127.0.0.1:3738']
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:3738'], ('xgbregressor-fit-score-7cb7087b3aff75a31f487cfe5a9cedb0', 1202, 2)
NoneType: None
distributed.client - WARNING - Couldn't gather …Run Code Online (Sandbox Code Playgroud) dask.dataframe尽管import dask有效,但在尝试导入接口时,我收到问题标题中所述的错误。
我当前的 dask 版本是2022.7.0. 可能是什么问题?
我正在多节点分布式 Dask 集群上运行多个并行任务。然而,一旦任务完成,worker 仍然持有大量内存,集群很快就会被填满。
client.restart()我在每个任务之后都尝试过client.cancel(df),第一个任务会杀死工作人员并发送到CancelledError其他正在运行的任务,这很麻烦,第二个任务没有多大帮助,因为我们在 Dask 的函数中使用了很多自定义对象和函数map。添加del已知变量gc.collect()也没有多大帮助。
我确信大部分内存占用是由于自定义 python 函数和使用client.map(..).
我的问题是:
trigger worker restart if no tasks are running right now?dask-distributed ×10
dask ×9
python ×5
cpu-cores ×1
dask-delayed ×1
dask-ml ×1
databricks ×1
dataframe ×1
json ×1
pandas ×1