标签: dask-distributed

Dask 分布式无法找到凭据

我无法使用数据帧读取:df_read_csv 访问 S3 上的文件。我收到错误:Exception: Unable to locate credentials

当我的 dask 分布式针对本地工作核心运行时,这可以正常工作。但是,当我导入具有附加工作服务器集群的客户端时,它会失败。我的集群是在 ubuntu 上使用 dask-ec2 创建的,标头服务器上有 1 个调度程序,3 个工作服务器(全部为 ubuntu)。

我假设失败是因为所有工作人员都需要访问 S3。我已经在所有这些设备上安装了 aws cli,并使用我的密钥进行连接,并且可以从 cli 列出 S3 存储桶。但是,由于某种原因,我的数据帧读取抛出一个 ubuntu 错误,指出 boto 无法找到凭据

我浏览了各种帖子,但似乎找不到任何有帮助的东西。这是错误的屏幕截图:

错误

ubuntu amazon-s3 dask-distributed

5
推荐指数
1
解决办法
1111
查看次数

Dask 按索引分配给数据帧列会引发 ValueError

我有一个按数据框分组的转换管道。所有函数都获取 aDataframeGroupBy并计算一些特征。然后将这些特征存储在数据框中。数据帧的索引是相同的,因为所有特征都是由同一DataFrameGroupBy对象派生的。函数如下所示:

def function(group_by_df, features_df=None):
    # actions to perform to group_by_df e.g
    feature_max = group_by_df.column.max() # This is a series object with index the same as group_by_df
    if features_df is not None:
        features_df['feature_name'] = feature_max
    else:
        features_df = feature_max.to_frame(name='feature_name')
    return features_df
Run Code Online (Sandbox Code Playgroud)

因此,由于这是迭代的,因此第一次 features_df 为 none,因此创建了数据帧。然后,当执行所有其他迭代时,feature_df 具有包含所有先前特征的列。在尝试将由生成的一系列分配给的一个步骤中,group_by_dffeature_df收到以下错误:

ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.
Run Code Online (Sandbox Code Playgroud)

奇怪的部分是运行以下代码:

featues_pandas = features_df.compute()
feature_series_with_issue_pandas = feature_series_with_issue_pandas.compute()
features_pandas['feature_name'] = feature_series_with_issue_pandas …
Run Code Online (Sandbox Code Playgroud)

python dataframe pandas dask dask-distributed

5
推荐指数
0
解决办法
654
查看次数

Dask groupby 具有多列问题

我使用具有dataframe.from_delayed以下列的方法创建了以下数据框

_id     hour_timestamp  http_method     total_hits  username    hour    weekday. 
Run Code Online (Sandbox Code Playgroud)

有关源数据框的一些详细信息:

hits_rate_stats._meta.dtypes
_id                       object
hour_timestamp    datetime64[ns]
http_method               object
total_hits                object
username                  object
hour                       int64
weekday                    int64
dtype: object
Run Code Online (Sandbox Code Playgroud)

元索引:

RangeIndex(start=0, stop=0, step=1)
Run Code Online (Sandbox Code Playgroud)

当我执行以下代码时

my_df_grouped = my_df.groupby(['username', 'http_method', 'weekday', 'hour'])
my_df_grouped.total_hits.sum().reset_index().compute()
Run Code Online (Sandbox Code Playgroud)

我得到以下异常:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-27-b24b24fc86db> in <module>()
----> 1 hits_rate_stats_grouped.total_hits.sum().reset_index().compute()

/home/avlach/virtualenvs/enorasys_sa_v2/local/lib/python2.7/site-packages/dask/base.pyc in compute(self, **kwargs)
    141         dask.base.compute
    142         """
--> 143         (result,) = compute(self, traverse=False, **kwargs)
    144         return result
    145 

/home/avlach/virtualenvs/enorasys_sa_v2/local/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
    390     postcomputes = …
Run Code Online (Sandbox Code Playgroud)

python dataframe dask dask-distributed

5
推荐指数
0
解决办法
1065
查看次数

如何在 Dask.Distributed 中将任务分配给特定工作人员

我对使用 Dask Distributed 作为任务执行器很感兴趣。在 Celery 中,可以将任务分配给特定的工作人员。如何使用 Dask 分布式?

dask-distributed

5
推荐指数
1
解决办法
1570
查看次数

Dask 客户端无法连接到 dask-scheduler

我使用的是 dask 1.1.1(最新版本),并且我已使用以下命令在命令行启动了 dask 调度程序:

$ dask-scheduler --port 9796 --bokeh-port 9797 --bokeh-prefix my_project
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://10.1.0.107:9796
distributed.scheduler - INFO -       bokeh at:                     :9797
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-pdnwslep
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Register tcp://10.1.25.4:36310
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.1.25.4:36310
distributed.core - INFO - Starting established connection
Run Code Online (Sandbox Code Playgroud)

然后...我尝试使用以下代码启动客户端连接到调度程序:

from dask.distributed import Client
c = …
Run Code Online (Sandbox Code Playgroud)

ssl-certificate python-3.x dask-distributed

5
推荐指数
1
解决办法
2036
查看次数

Dask Distributed: Reading .csv from HDFS

I'm performance testing Dask using "Distributed Pandas on a Cluster with Dask DataFrames" as a guide.

In Matthew's example, he has a 20GB file and 64 workers (8 physical nodes).

In my case, I have a 82GB file and 288 workers (12 physical nodes; there's a HDFS data node on each).

On all 12 nodes, I can access HDFS and execute a simple Python script that displays info on a file:

import pyarrow as pa
fs = pa.hdfs.connect([url], 8022)
print(str(fs.info('/path/to/file.csv'))) …
Run Code Online (Sandbox Code Playgroud)

python hdfs dask dask-distributed

5
推荐指数
1
解决办法
2509
查看次数

如何在 Dask 中正确使用 client.scatter

当执行“大量”任务时,我收到此错误:

考虑使用 client.scatter 提前分散大型对象,以减轻调度程序负担并保留工作人员的数据

我还收到了一堆这样的消息:

tornado.application - ERROR - Exception in callback <bound method BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado object at 0x7f20d25e10b8>>
Traceback (most recent call last):
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/home/muammar/.local/lib/python3.7/site-packages/bokeh/server/tornado.py", line 542, in _keep_alive
    c.send_ping()
  File "/home/muammar/.local/lib/python3.7/site-packages/bokeh/server/connection.py", line 80, in send_ping
    self._socket.ping(codecs.encode(str(self._ping_count), "utf-8"))
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/websocket.py", line 447, in ping
    raise WebSocketClosedError()
tornado.websocket.WebSocketClosedError
tornado.application - ERROR - Exception in callback <bound method BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado object at 0x7f20d25e10b8>>
Traceback (most recent call last):
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/ioloop.py", line …
Run Code Online (Sandbox Code Playgroud)

parallel-processing python-3.x dask dask-distributed

5
推荐指数
1
解决办法
6548
查看次数

当有更多任务时,许多分布式 Dask Worker 在一次评估后就会闲置,或者从未收到任何工作

我们\xe2\x80\x99使用dask来优化深度学习器(DL)架构,方法是生成设计,然后将它们发送给dask工作人员,而DASK工作人员又使用pytorch进行训练。我们观察到一些工作人员似乎没有开始,而那些完成评估 DL 的工作人员不会立即开始评估下一个等待的 DL。

\n\n

我们\xe2\x80\x99在橡树岭国家实验室\xe2\x80\x99s Summit超级计算机上实现了这一点。对于我们的原型,我们提交了一个批处理作业,该作业分配 92 个节点,启动一个 dask 调度程序和 92 个 dask 工作线程,每个节点都有一个工作线程。每个节点有 6 个 Nvidia Volta V100、两个 IBM Power9 和 512 GB DDR4 + 96GB HMB@ 内存。然后,每个工作人员使用 pytorch 训练 DL 并将其验证准确度返回为 \xe2\x80\x9cfitness。\xe2\x80\x9d 但是,如果所提出的 DL 架构不可行,则会引发异常,并且相关的适应度将变为-MAXINT。

\n\n

在只有两名工作人员的初始试运行中,我们注意到,如果一名工作人员评估了畸形的深度学习设计,那么它会立即被分配一个新的深度学习来评估。直到运行结束,两名工人都没有闲着。

\n\n

这是实际代码的精简版本。

\n\n
from dask.distributed import Client, as_completed\n\nclient = Client(scheduler_file=\xe2\x80\x99scheduler.json\xe2\x80\x99)\n\n# posed_dl_designs is a list of random DL architectures, \n# eval_dl is the entry point for the pytorch training\nworker_futures = client.map(eval_dl, posed_dl_designs)\n\nfor res in as_completed(worker_futures):\n    evaluated_dl = res.result()\n\n    # pool is …
Run Code Online (Sandbox Code Playgroud)

python dask dask-distributed

5
推荐指数
1
解决办法
596
查看次数

触发Dask Worker释放内存

我正在使用 Dask 分配一些函数的计算。我的总体布局如下所示:


    from dask.distributed import Client, LocalCluster, as_completed

    cluster = LocalCluster(processes=config.use_dask_local_processes,
                           n_workers=1,
                           threads_per_worker=1,
                           )
    client = Client(cluster)
    cluster.scale(config.dask_local_worker_instances)

    work_futures = []

    # For each group do work
    for group in groups:
        fcast_futures.append(client.submit(_work, group))

    # Wait till the work is done
    for done_work in as_completed(fcast_futures, with_results=False):
        try:
            result = done_work.result()
        except Exception as error:
            log.exception(error)

Run Code Online (Sandbox Code Playgroud)

我的问题是,对于大量工作,我往往会达到内存限制。我看到很多:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory? …
Run Code Online (Sandbox Code Playgroud)

dask dask-distributed

5
推荐指数
1
解决办法
5942
查看次数

限制Dask CPU和内存使用(单节点)

我在一台计算机上运行 Dask,在该计算机上运行.compute()对巨大 parquet 文件执行计算将导致 dask 耗尽系统上的所有 CPU 核心。

import dask as dd

df = dd.read_parquet(parquet_file)  # very large file
print(df.names.unique().compute())
Run Code Online (Sandbox Code Playgroud)

是否可以将 dask 配置为使用特定数量的 CPU 核心并将其内存使用限制为 32 GB?使用Python 3.7.2和Dask 2.9.2。

python python-3.x pandas dask dask-distributed

5
推荐指数
1
解决办法
2678
查看次数