我无法使用数据帧读取:df_read_csv 访问 S3 上的文件。我收到错误:Exception: Unable to locate credentials
当我的 dask 分布式针对本地工作核心运行时,这可以正常工作。但是,当我导入具有附加工作服务器集群的客户端时,它会失败。我的集群是在 ubuntu 上使用 dask-ec2 创建的,标头服务器上有 1 个调度程序,3 个工作服务器(全部为 ubuntu)。
我假设失败是因为所有工作人员都需要访问 S3。我已经在所有这些设备上安装了 aws cli,并使用我的密钥进行连接,并且可以从 cli 列出 S3 存储桶。但是,由于某种原因,我的数据帧读取抛出一个 ubuntu 错误,指出 boto 无法找到凭据
我浏览了各种帖子,但似乎找不到任何有帮助的东西。这是错误的屏幕截图:

我有一个按数据框分组的转换管道。所有函数都获取 aDataframeGroupBy并计算一些特征。然后将这些特征存储在数据框中。数据帧的索引是相同的,因为所有特征都是由同一DataFrameGroupBy对象派生的。函数如下所示:
def function(group_by_df, features_df=None):
# actions to perform to group_by_df e.g
feature_max = group_by_df.column.max() # This is a series object with index the same as group_by_df
if features_df is not None:
features_df['feature_name'] = feature_max
else:
features_df = feature_max.to_frame(name='feature_name')
return features_df
Run Code Online (Sandbox Code Playgroud)
因此,由于这是迭代的,因此第一次 features_df 为 none,因此创建了数据帧。然后,当执行所有其他迭代时,feature_df 具有包含所有先前特征的列。在尝试将由生成的一系列分配给的一个步骤中,group_by_df我feature_df收到以下错误:
ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.
Run Code Online (Sandbox Code Playgroud)
奇怪的部分是运行以下代码:
featues_pandas = features_df.compute()
feature_series_with_issue_pandas = feature_series_with_issue_pandas.compute()
features_pandas['feature_name'] = feature_series_with_issue_pandas …Run Code Online (Sandbox Code Playgroud) 我使用具有dataframe.from_delayed以下列的方法创建了以下数据框
_id hour_timestamp http_method total_hits username hour weekday.
Run Code Online (Sandbox Code Playgroud)
有关源数据框的一些详细信息:
hits_rate_stats._meta.dtypes
_id object
hour_timestamp datetime64[ns]
http_method object
total_hits object
username object
hour int64
weekday int64
dtype: object
Run Code Online (Sandbox Code Playgroud)
元索引:
RangeIndex(start=0, stop=0, step=1)
Run Code Online (Sandbox Code Playgroud)
当我执行以下代码时
my_df_grouped = my_df.groupby(['username', 'http_method', 'weekday', 'hour'])
my_df_grouped.total_hits.sum().reset_index().compute()
Run Code Online (Sandbox Code Playgroud)
我得到以下异常:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-27-b24b24fc86db> in <module>()
----> 1 hits_rate_stats_grouped.total_hits.sum().reset_index().compute()
/home/avlach/virtualenvs/enorasys_sa_v2/local/lib/python2.7/site-packages/dask/base.pyc in compute(self, **kwargs)
141 dask.base.compute
142 """
--> 143 (result,) = compute(self, traverse=False, **kwargs)
144 return result
145
/home/avlach/virtualenvs/enorasys_sa_v2/local/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
390 postcomputes = …Run Code Online (Sandbox Code Playgroud) 我对使用 Dask Distributed 作为任务执行器很感兴趣。在 Celery 中,可以将任务分配给特定的工作人员。如何使用 Dask 分布式?
我使用的是 dask 1.1.1(最新版本),并且我已使用以下命令在命令行启动了 dask 调度程序:
$ dask-scheduler --port 9796 --bokeh-port 9797 --bokeh-prefix my_project
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://10.1.0.107:9796
distributed.scheduler - INFO - bokeh at: :9797
distributed.scheduler - INFO - Local Directory: /tmp/scheduler-pdnwslep
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Register tcp://10.1.25.4:36310
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.1.25.4:36310
distributed.core - INFO - Starting established connection
Run Code Online (Sandbox Code Playgroud)
然后...我尝试使用以下代码启动客户端连接到调度程序:
from dask.distributed import Client
c = …Run Code Online (Sandbox Code Playgroud) I'm performance testing Dask using "Distributed Pandas on a Cluster with Dask DataFrames" as a guide.
In Matthew's example, he has a 20GB file and 64 workers (8 physical nodes).
In my case, I have a 82GB file and 288 workers (12 physical nodes; there's a HDFS data node on each).
On all 12 nodes, I can access HDFS and execute a simple Python script that displays info on a file:
import pyarrow as pa
fs = pa.hdfs.connect([url], 8022)
print(str(fs.info('/path/to/file.csv'))) …Run Code Online (Sandbox Code Playgroud) 当执行“大量”任务时,我收到此错误:
考虑使用 client.scatter 提前分散大型对象,以减轻调度程序负担并保留工作人员的数据
我还收到了一堆这样的消息:
tornado.application - ERROR - Exception in callback <bound method BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado object at 0x7f20d25e10b8>>
Traceback (most recent call last):
File "/home/muammar/.local/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
return self.callback()
File "/home/muammar/.local/lib/python3.7/site-packages/bokeh/server/tornado.py", line 542, in _keep_alive
c.send_ping()
File "/home/muammar/.local/lib/python3.7/site-packages/bokeh/server/connection.py", line 80, in send_ping
self._socket.ping(codecs.encode(str(self._ping_count), "utf-8"))
File "/home/muammar/.local/lib/python3.7/site-packages/tornado/websocket.py", line 447, in ping
raise WebSocketClosedError()
tornado.websocket.WebSocketClosedError
tornado.application - ERROR - Exception in callback <bound method BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado object at 0x7f20d25e10b8>>
Traceback (most recent call last):
File "/home/muammar/.local/lib/python3.7/site-packages/tornado/ioloop.py", line …Run Code Online (Sandbox Code Playgroud) 我们\xe2\x80\x99使用dask来优化深度学习器(DL)架构,方法是生成设计,然后将它们发送给dask工作人员,而DASK工作人员又使用pytorch进行训练。我们观察到一些工作人员似乎没有开始,而那些完成评估 DL 的工作人员不会立即开始评估下一个等待的 DL。
\n\n我们\xe2\x80\x99在橡树岭国家实验室\xe2\x80\x99s Summit超级计算机上实现了这一点。对于我们的原型,我们提交了一个批处理作业,该作业分配 92 个节点,启动一个 dask 调度程序和 92 个 dask 工作线程,每个节点都有一个工作线程。每个节点有 6 个 Nvidia Volta V100、两个 IBM Power9 和 512 GB DDR4 + 96GB HMB@ 内存。然后,每个工作人员使用 pytorch 训练 DL 并将其验证准确度返回为 \xe2\x80\x9cfitness。\xe2\x80\x9d 但是,如果所提出的 DL 架构不可行,则会引发异常,并且相关的适应度将变为-MAXINT。
\n\n在只有两名工作人员的初始试运行中,我们注意到,如果一名工作人员评估了畸形的深度学习设计,那么它会立即被分配一个新的深度学习来评估。直到运行结束,两名工人都没有闲着。
\n\n这是实际代码的精简版本。
\n\nfrom dask.distributed import Client, as_completed\n\nclient = Client(scheduler_file=\xe2\x80\x99scheduler.json\xe2\x80\x99)\n\n# posed_dl_designs is a list of random DL architectures, \n# eval_dl is the entry point for the pytorch training\nworker_futures = client.map(eval_dl, posed_dl_designs)\n\nfor res in as_completed(worker_futures):\n evaluated_dl = res.result()\n\n # pool is …Run Code Online (Sandbox Code Playgroud) 我正在使用 Dask 分配一些函数的计算。我的总体布局如下所示:
from dask.distributed import Client, LocalCluster, as_completed
cluster = LocalCluster(processes=config.use_dask_local_processes,
n_workers=1,
threads_per_worker=1,
)
client = Client(cluster)
cluster.scale(config.dask_local_worker_instances)
work_futures = []
# For each group do work
for group in groups:
fcast_futures.append(client.submit(_work, group))
# Wait till the work is done
for done_work in as_completed(fcast_futures, with_results=False):
try:
result = done_work.result()
except Exception as error:
log.exception(error)
Run Code Online (Sandbox Code Playgroud)
我的问题是,对于大量工作,我往往会达到内存限制。我看到很多:
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? …Run Code Online (Sandbox Code Playgroud) 我在一台计算机上运行 Dask,在该计算机上运行.compute()对巨大 parquet 文件执行计算将导致 dask 耗尽系统上的所有 CPU 核心。
import dask as dd
df = dd.read_parquet(parquet_file) # very large file
print(df.names.unique().compute())
Run Code Online (Sandbox Code Playgroud)
是否可以将 dask 配置为使用特定数量的 CPU 核心并将其内存使用限制为 32 GB?使用Python 3.7.2和Dask 2.9.2。
dask-distributed ×10
dask ×7
python ×5
python-3.x ×3
dataframe ×2
pandas ×2
amazon-s3 ×1
hdfs ×1
ubuntu ×1